Source code for bambi.diagnostics

import numpy as np
import pandas as pd
from scipy.signal import fftconvolve

# code adapted from pymc3.diagnostics:
# https://github.com/pymc-devs/pymc3/blob/master/pymc3/diagnostics.py


def autocorr(x):
    """Compute autocorrelation using FFT for every lag for the input array.

    https://en.wikipedia.org/wiki/Autocorrelation#Efficient_computation

    Args:
        x (array-like): An array containing MCMC samples.

    Returns:
        np.ndarray: An array of the same size as the input array.
    """
    y = x - x.mean()
    n = len(y)
    result = fftconvolve(y, y[::-1])
    acorr = result[len(result) // 2:]
    acorr /= np.arange(n, 0, -1)
    acorr /= acorr[0]
    return acorr

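# Usage sketch (illustrative, not part of the original module): for
# independent draws, the normalized autocorrelation is exactly 1 at lag 0
# and hovers near 0 at higher lags. The `draws` name is assumed here.
#
#     np.random.seed(0)
#     draws = np.random.normal(size=1000)
#     acorr = autocorr(draws)
#     # acorr[0] == 1.0 by construction; np.abs(acorr[1:20]) stays small
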
def autocov(x):
    """Compute autocovariance estimates for every lag for the input array.

    Args:
        x (array-like): An array containing MCMC samples.

    Returns:
        np.ndarray: An array of the same size as the input array.
    """
    acorr = autocorr(x)
    varx = np.var(x, ddof=1) * (len(x) - 1) / len(x)
    acov = acorr * varx
    return acov

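# Consistency sketch (illustrative): because autocov rescales the normalized
# autocorrelation by the biased sample variance, the lag-0 term recovers
# np.var(x). Reuses `draws` from the sketch above.
#
#     acov = autocov(draws)
#     # np.allclose(acov[0], np.var(draws))  -> True
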
def _vhat_w(mcmc):
    # Calculate between-chain variance
    B = mcmc.n_samples * mcmc.data.mean(axis=0).var(axis=0, ddof=1)

    # Calculate within-chain variance
    W = mcmc.data.var(axis=0, ddof=1).mean(axis=0)

    # Estimate of marginal posterior variance
    Vhat = W * (mcmc.n_samples - 1) / mcmc.n_samples + B / mcmc.n_samples

    return Vhat, W

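# Shape sketch (assumptions, not original code): mcmc.data is indexed as
# (samples, chains, parameters), so B and W come out with one entry per
# parameter. A lightweight stand-in for the MCMCResults object:
#
#     from types import SimpleNamespace
#     data = np.random.normal(size=(500, 4, 2))  # 500 draws, 4 chains, 2 params
#     mcmc = SimpleNamespace(data=data, n_samples=500, n_chains=4,
#                            levels=['b_x', 'b_z'])
#     Vhat, W = _vhat_w(mcmc)  # each of shape (2,)
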
def gelman_rubin(mcmc):
    """Compute the Gelman-Rubin convergence diagnostic (Rhat) for each
    parameter.

    Args:
        mcmc (MCMCResults): Pre-sliced MCMC samples to compute diagnostics
            for.
    """
    if mcmc.n_chains < 2:
        raise ValueError(
            'Gelman-Rubin diagnostic requires multiple chains '
            'of the same length.')

    # get Vhat and within-chain variance
    Vhat, W = _vhat_w(mcmc)

    # compute and return Gelman-Rubin statistic
    Rhat = np.sqrt(Vhat / W)

    return pd.DataFrame({'gelman_rubin': Rhat}, index=mcmc.levels)

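# Usage sketch continuing the stand-in above (assumed MCMCResults attributes:
# data, n_samples, n_chains, levels): independent, well-mixed chains should
# give Rhat values close to 1.
#
#     gelman_rubin(mcmc)
#     #      gelman_rubin
#     # b_x         ~1.0
#     # b_z         ~1.0
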
def effective_n(mcmc):
    """Compute the effective sample size (ESS) for each parameter.

    Args:
        mcmc (MCMCResults): Pre-sliced MCMC samples to compute diagnostics
            for.
    """
    if mcmc.n_chains < 2:
        raise ValueError(
            'Calculation of effective sample size requires multiple chains '
            'of the same length.')

    def get_neff(x):
        """Compute the effective sample size for a 2D array."""
        trace_value = x.T
        nchain, n_samples = trace_value.shape

        acov = np.asarray([autocov(trace_value[chain])
                           for chain in range(nchain)])

        chain_mean = trace_value.mean(axis=1)
        chain_var = acov[:, 0] * n_samples / (n_samples - 1.)
        acov_t = acov[:, 1] * n_samples / (n_samples - 1.)
        mean_var = np.mean(chain_var)
        var_plus = mean_var * (n_samples - 1.) / n_samples
        var_plus += np.var(chain_mean, ddof=1)

        rho_hat_t = np.zeros(n_samples)
        rho_hat_even = 1.
        rho_hat_t[0] = rho_hat_even
        rho_hat_odd = 1. - (mean_var - np.mean(acov_t)) / var_plus
        rho_hat_t[1] = rho_hat_odd

        # Geyer's initial positive sequence
        max_t = 1
        t = 1
        while t < (n_samples - 2) and (rho_hat_even + rho_hat_odd) >= 0.:
            rho_hat_even = 1. - (mean_var - np.mean(acov[:, t + 1])) / var_plus
            rho_hat_odd = 1. - (mean_var - np.mean(acov[:, t + 2])) / var_plus
            if (rho_hat_even + rho_hat_odd) >= 0:
                rho_hat_t[t + 1] = rho_hat_even
                rho_hat_t[t + 2] = rho_hat_odd
            max_t = t + 2
            t += 2

        # Geyer's initial monotone sequence
        t = 3
        while t <= max_t - 2:
            if (rho_hat_t[t + 1] + rho_hat_t[t + 2]) > \
                    (rho_hat_t[t - 1] + rho_hat_t[t]):
                rho_hat_t[t + 1] = (rho_hat_t[t - 1] + rho_hat_t[t]) / 2.
                rho_hat_t[t + 2] = rho_hat_t[t + 1]
            t += 2

        ess = nchain * n_samples
        ess = ess / (-1. + 2. * np.sum(rho_hat_t))
        return ess

    nvar = mcmc.data.shape[-1]
    n_eff = [get_neff(mcmc.data[:, :, i]) for i in range(nvar)]

    return pd.DataFrame({'effective_n': n_eff}, index=mcmc.levels)

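# Usage sketch continuing the stand-in above: for independent draws, the
# effective sample size should land near the total draw count
# (n_chains * n_samples = 2000 in the stand-in).
#
#     effective_n(mcmc)
#     #      effective_n
#     # b_x        ~2000
#     # b_z        ~2000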