Source code for bambi.priors

# pylint: disable=no-name-in-module
import json
import re
from copy import deepcopy
from os.path import dirname, join

import numpy as np
import pandas as pd
from scipy.special import hyp2f1
from statsmodels.genmod import families as genmod_families
from statsmodels.genmod.generalized_linear_model import GLM


[docs]class Family: """A specification of model family. Parameters ---------- name : str Family name. prior : Prior A Prior instance specifying the model likelihood prior. link : str The name of the link function transforming the linear model prediction to a parameter of the likelihood. parent : str The name of the prior parameter to set to the link-transformed predicted outcome (e.g., mu, p, etc.). """ def __init__(self, name, prior, link, parent): self.name = name self.prior = prior self.link = link self.parent = parent fams = { "gaussian": genmod_families.Gaussian, "bernoulli": genmod_families.Binomial, "poisson": genmod_families.Poisson, "t": None, # not implemented in statsmodels } self.smfamily = fams[name] if name in fams.keys() else None
[docs]class Prior: """Abstract specification of a term prior. Parameters ---------- name : str Name of prior distribution (e.g., Normal, Bernoulli, etc.) kwargs : dict Optional keywords specifying the parameters of the named distribution. """ def __init__(self, name, scale=None, **kwargs): self.name = name self._auto_scale = True self.scale = scale self.args = {} self.update(**kwargs)
[docs] def update(self, **kwargs): """Update the model arguments with additional arguments. Parameters ---------- kwargs : dict Optional keyword arguments to add to prior args. """ # Backends expect numpy arrays, so make sure all numeric values are represented as such. kwargs = {k: (np.array(v) if isinstance(v, (int, float)) else v) for k, v in kwargs.items()} self.args.update(kwargs)
[docs]class PriorFactory: """An object that supports specification and easy retrieval of default priors. Parameters ---------- defaults : str or dict Optional base configuration containing default priors for distribution, families, and term types. If a string, the name of a JSON file containing the config. If a dict, must contain keys for 'dists', 'terms', and 'families'; see the built-in JSON configuration for an example. If None, a built-in set of priors will be used as defaults. dists : dict Optional specification of named distributions to use as priors. Each key gives the name of a newly defined distribution; values are two-element lists, where the first element is the name of the built-in distribution to use ('Normal', 'Cauchy', etc.), and the second element is a dictionary of parameters on that distribution (e.g., {'mu': 0, 'sd': 10}). Priors can be nested to arbitrary depths by replacing any parameter with another prior specification. terms : dict Optional specification of default priors for different model term types. Valid keys are 'intercept', 'fixed', or 'random'. Values are either strings preprended by a #, in which case they are interpreted as pointers to distributions named in the dists dictionary, or key -> value specifications in the same format as elements in the dists dictionary. families : dict Optional specification of default priors for named family objects. Keys are family names, and values are dicts containing mandatory keys for 'dist', 'link', and 'parent'. Examples -------- >>> dists = {'my_dist': ['Normal', {'mu': 10, 'sd': 1000}]} >>> pf = PriorFactory(dists=dists) >>> families = {'normalish': {'dist': ['normal', {sd: '#my_dist'}], >>> link:'identity', parent: 'mu'}} >>> pf = PriorFactory(dists=dists, families=families) """ def __init__(self, defaults=None, dists=None, terms=None, families=None): if defaults is None: defaults = join(dirname(__file__), "config", "priors.json") if isinstance(defaults, str): defaults = json.load(open(defaults, "r")) # Just in case the user plans to use the same defaults elsewhere defaults = deepcopy(defaults) if isinstance(dists, dict): defaults["dists"].update(dists) if isinstance(terms, dict): defaults["terms"].update(terms) if isinstance(families, dict): defaults["families"].update(families) self.dists = defaults["dists"] self.terms = defaults["terms"] self.families = defaults["families"] def _get_prior(self, spec, **kwargs): if isinstance(spec, str): spec = re.sub(r"^\#", "", spec) return self._get_prior(self.dists[spec]) elif isinstance(spec, (list, tuple)): name, args = spec if name.startswith("#"): name = re.sub(r"^\#", "", name) prior = self._get_prior(self.dists[name]) else: prior = Prior(name, **kwargs) args = {k: self._get_prior(v) for (k, v) in args.items()} prior.update(**args) return prior else: return spec # pylint: disable=inconsistent-return-statements
[docs] def get(self, dist=None, term=None, family=None): """Retrieve default prior for a named distribution, term type, or family. Parameters ---------- dist : str Name of desired distribution. Note that the name is the key in the defaults dictionary, not the name of the Distribution object used to construct the prior. term : str The type of term family to retrieve defaults for. Must be one of 'intercept', 'fixed', or 'random'. family : str The name of the Family to retrieve. Must be a value defined internally. In the default config, this is one of 'gaussian', 'bernoulli', 'poisson', or 't'. """ if dist is not None: if dist not in self.dists: raise ValueError(f"{dist} is not a valid distribution name.") return self._get_prior(self.dists[dist]) elif term is not None: if term not in self.terms: raise ValueError(f"{term} is not a valid term type.") return self._get_prior(self.terms[term]) elif family is not None: if family not in self.families: raise ValueError(f"{family} is not a valid family name.") _f = self.families[family] prior = self._get_prior(_f["dist"]) return Family(family, prior, _f["link"], _f["parent"])
class PriorScaler: # Default is 'wide'. The wide prior SD is sqrt(1/3) = .577 on the partial # corr scale, which is the SD of a flat prior over [-1,1]. names = {"narrow": 0.2, "medium": 0.4, "wide": 3 ** -0.5, "superwide": 0.8} def __init__(self, model, taylor): self.model = model self.stats = model.dm_statistics if hasattr(model, "dm_statistics") else None self.dm = pd.DataFrame( { lev: t.data[:, i] for t in model.fixed_terms.values() for i, lev in enumerate(t.levels) } ) self.priors = {} missing = "drop" if self.model.dropna else "none" self.mle = GLM( endog=self.model.y.data, exog=self.dm, family=self.model.family.smfamily(), missing=missing, ).fit() self.taylor = taylor with open(join(dirname(__file__), "config", "derivs.txt"), "r") as file: self.deriv = [next(file).strip("\n") for x in range(taylor + 1)] def _get_slope_stats(self, exog, predictor, sd_corr, full_mod=None, points=4): """ Args: full_mod: statsmodels GLM to replace MLE model. For when 'predictor' is not in the fixed part of the model. points: number of points to use for LL approximation. """ if full_mod is None: full_mod = self.mle # figure out which column of exog to drop for the null model keeps = [ i for i, x in enumerate(list(exog.columns)) if not np.array_equal(predictor, exog[x].values.flatten()) ] i = [x for x in range(exog.shape[1]) if x not in keeps][0] # get log-likelihood values from beta=0 to beta=MLE values = np.linspace(0.0, full_mod.params[i], points) # if there are multiple predictors, use statsmodels to optimize the LL if keeps: null = [ GLM( endog=self.model.y.data, exog=exog, family=self.model.family.smfamily() ).fit_constrained( str(exog.columns[i]) + "=" + str(val), start_params=full_mod.params.values ) for val in values[:-1] ] null = np.append(null, full_mod) log_likelihood = np.array([x.llf for x in null]) # if just a single predictor, use statsmodels to evaluate the LL else: null = [ self.model.family.smfamily().loglike(np.squeeze(self.model.y.data), val * predictor) for val in values[:-1] ] log_likelihood = np.append(null, full_mod.llf) # compute params of quartic approximatino to log-likelihood # c: intercept, d: shift parameter # a: quartic coefficient, b: quadratic coefficient intercept, shift_parameter = log_likelihood[-1], -(full_mod.params[i].item()) X = np.array([(values + shift_parameter) ** 4, (values + shift_parameter) ** 2]).T coef_a, coef_b = np.squeeze( np.linalg.multi_dot( [np.linalg.inv(np.dot(X.T, X)), X.T, (log_likelihood[:, None] - intercept)] ) ) # m, v: mean and variance of beta distribution of correlations # p, q: corresponding shape parameters of beta distribution mean = 0.5 variance = sd_corr ** 2 / 4 p = mean * (mean * (1 - mean) / variance - 1) q = (1 - mean) * (mean * (1 - mean) / variance - 1) # function to return central moments of rescaled beta distribution def moment(k): return (2 * p / (p + q)) ** k * hyp2f1(p, -k, p + q, (p + q) / p) # evaluate the derivatives of beta = f(correlation). # dict 'point' gives points about which to Taylor expand. We want to # expand about the mean (generally 0), but some of the derivatives # do not exist at 0. Evaluating at a point very close to 0 (e.g., .001) # generally gives good results, but the higher order the expansion, the # further from 0 we need to evaluate the derivatives, or they blow up. point = dict(zip(range(1, 14), 2 ** np.linspace(-1, 5, 13) / 100)) vals = dict(a=coef_a, b=coef_b, n=len(self.model.y.data), r=point[self.taylor]) _deriv = [eval(x, globals(), vals) for x in self.deriv] # pylint: disable=eval-used # compute and return the approximate SD def term(i, j): return ( 1 / np.math.factorial(i) * 1 / np.math.factorial(j) * _deriv[i] * _deriv[j] * (moment(i + j) - moment(i) * moment(j)) ) terms = [term(i, j) for i in range(1, self.taylor + 1) for j in range(1, self.taylor + 1)] return np.array(terms).sum() ** 0.5 def _get_intercept_stats(self, add_slopes=True): # start with mean and variance of Y on the link scale mod = GLM( endog=self.model.y.data, exog=np.repeat(1, len(self.model.y.data)), family=self.model.family.smfamily(), missing="drop" if self.model.dropna else "none", ).fit() mu = mod.params # multiply SE by sqrt(N) to turn it into (approx.) SD(Y) on link scale sd = (mod.cov_params()[0] * len(mod.mu)) ** 0.5 # modify mu and sd based on means and SDs of slope priors. if len(self.model.fixed_terms) > 1 and add_slopes: means = np.array([x["mu"] for x in self.priors.values()]) sds = np.array([x["sd"] for x in self.priors.values()]) # add to intercept prior index = list(self.priors.keys()) mu -= np.dot(means, self.stats["mean_x"][index]) sd = (sd ** 2 + np.dot(sds ** 2, self.stats["mean_x"][index] ** 2)) ** 0.5 return mu, sd def _scale_fixed(self, term): # these defaults are only defined for Normal priors if term.prior.name != "Normal": return mu = [] sd = [] sd_corr = term.prior.scale for pred in term.data.T: mu += [0] sd += [self._get_slope_stats(exog=self.dm, predictor=pred, sd_corr=sd_corr)] # save and set prior for i, lev in enumerate(term.levels): self.priors.update({lev: {"mu": mu[i], "sd": sd[i]}}) term.prior.update(mu=np.array(mu), sd=np.array(sd)) def _scale_intercept(self, term): # default priors are only defined for Normal priors if term.prior.name != "Normal": return # get prior mean and SD for fixed intercept mu, sd = self._get_intercept_stats() # save and set prior term.prior.update(mu=mu, sd=sd) def _scale_random(self, term): # these default priors are only defined for HalfNormal priors if term.prior.args["sd"].name != "HalfNormal": return sd_corr = term.prior.scale # recreate the corresponding fixed effect data fix_data = term.data.sum(axis=1) # handle intercepts and cell means if term.constant: _, sd = self._get_intercept_stats() sd *= sd_corr # handle slopes else: exists = [ x for x in self.dm.columns # pylint: disable=not-an-iterable if np.array_equal(fix_data, self.dm[x].values) ] # handle case where there IS a corresponding fixed effect if exists and exists[0] in self.priors.keys(): sd = self.priors[exists[0]]["sd"] # handle case where there IS NOT a corresponding fixed effect else: # the usual case: add the random effect data as a fixed effect # in the design matrix if not exists: fix_dataframe = pd.DataFrame(fix_data) # things break if column names are integers (the default) fix_dataframe.rename( columns={ c: "_" + str(c) for c in fix_dataframe.columns # pylint: disable=not-an-iterable }, inplace=True, ) exog = self.dm.join(fix_dataframe) # this handles the corner case where there technically is the # corresponding fixed effect, but the parameterization differs # between the fixed- and random-effect specification. usually # this means the fixed effects use cell-means coding but the # random effects use k-1 coding else: group = term.name.split("|")[1] exog = self.model.random_terms.values() exog = [v.data.sum(1) for v in exog if v.name.split("|")[-1] == group] index = ["_" + str(i) for i in range(len(exog))] exog = pd.DataFrame(exog, index=index).T # this will replace self.mle (which is missing predictors) missing = "drop" if self.model.dropna else "none" full_mod = GLM( endog=self.model.y.data, exog=exog, family=self.model.family.smfamily(), missing=missing, ).fit() sd = self._get_slope_stats( exog=exog, predictor=fix_data, full_mod=full_mod, sd_corr=sd_corr ) # set the prior SD. term.prior.args["sd"].update(sd=np.squeeze(np.atleast_1d(sd))) def scale(self): # classify all terms fixed_intercepts = [ t for t in self.model.terms.values() if not t.random and t.data.sum(1).var() == 0 ] fixed_slopes = [ t for t in self.model.terms.values() if not t.random and not t.data.sum(1).var() == 0 ] random_terms = [t for t in self.model.terms.values() if t.random] # arrange them in the order in which they should be initialized term_list = fixed_slopes + fixed_intercepts + random_terms term_types = ( ["fixed"] * len(fixed_slopes) + ["intercept"] * len(fixed_intercepts) + ["random"] * len(random_terms) ) # initialize them in order for t, term_type in zip(term_list, term_types): if t.prior.scale is None: # pylint: disable=protected-access if not t.prior._auto_scale or not self.model.auto_scale: continue t.prior.scale = "wide" # Convert scale names to float if isinstance(t.prior.scale, str): t.prior.scale = PriorScaler.names[t.prior.scale] # scale it! getattr(self, f"_scale_{term_type}")(t)