Source code for bambi.priors

# pylint: disable=no-name-in-module
import json
import re
import sys

from copy import deepcopy
from os.path import dirname, join

import numpy as np
import pandas as pd
from scipy.special import hyp2f1
from statsmodels.genmod import families as genmod_families
from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.tools.sm_exceptions import PerfectSeparationError

genmod_links = genmod_families.links


class Family:
    """A specification of model family.

    Parameters
    ----------
    name : str
        Family name.
    prior : Prior
        A ``Prior`` instance specifying the model likelihood prior.
    link : str
        The name of the link function transforming the linear model prediction to a parameter of
        the likelihood.
    parent : str
        The name of the prior parameter to set to the link-transformed predicted outcome
        (e.g., ``'mu'``, ``'p'``, etc.).
    """

    def __init__(self, name, prior, link, parent):
        self.smlink = None
        fams = {
            "bernoulli": genmod_families.Binomial,
            "gamma": genmod_families.Gamma,
            "gaussian": genmod_families.Gaussian,
            "wald": genmod_families.InverseGaussian,
            "negativebinomial": genmod_families.NegativeBinomial,
            "poisson": genmod_families.Poisson,
            "t": None,  # not implemented in statsmodels
        }
        self.name = name
        self.prior = prior
        self.link = link
        self.parent = parent
        self.smfamily = fams.get(name, None)

    def _set_link(self, link):
        """Set new link function.

        It updates both ``self.link`` (a string consumed by the backend) and ``self.smlink``
        (the link instance for the statsmodels family).
        """
        links = {
            "identity": genmod_links.identity(),
            "logit": genmod_links.logit(),
            "probit": genmod_links.probit(),
            "cloglog": genmod_links.cloglog(),
            "inverse": genmod_links.inverse_power(),
            "inverse_squared": genmod_links.inverse_squared(),
            "log": genmod_links.log(),
        }
        self.link = link
        if link in links:
            self.smlink = links[link]
        else:
            raise ValueError("Link name is not supported.")

    def __str__(self):
        if self.smfamily is None:
            str_ = "No family set"
        else:
            priors_str = ",\n  ".join(
                [
                    f"{k}: {np.round_(v, 8)}" if not isinstance(v, Prior) else f"{k}: {v}"
                    for k, v in self.prior.args.items()
                    if k not in ["observed", self.parent]
                ]
            )
            str_ = f"Response distribution: {self.smfamily.__name__}\n"
            str_ += f"Link: {self.link}\n"
            str_ += f"Priors:\n  {priors_str}"
        return str_

    def __repr__(self):
        return self.__str__()
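
# Usage sketch (illustrative, not part of the module): constructing a Family by
# hand. The prior below is made up for the example; in practice default families
# are normally obtained via PriorFactory.get(family=...) further down in this
# module. Prior itself is defined just below.
#
#   prior = Prior("Normal", mu=0, sigma=1)
#   family = Family("gaussian", prior=prior, link="identity", parent="mu")
#   print(family)  # shows the response distribution, link, and non-parent priors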

class Prior:
    """Abstract specification of a term prior.

    Parameters
    ----------
    name : str
        Name of prior distribution (e.g., ``'Normal'``, ``'Bernoulli'``, etc.)
    kwargs : dict
        Optional keywords specifying the parameters of the named distribution.
    """

    def __init__(self, name, scale=None, **kwargs):
        self.name = name
        self._auto_scale = True
        self.scale = scale
        self.args = {}
        self.update(**kwargs)

    def update(self, **kwargs):
        """Update the model arguments with additional arguments.

        Parameters
        ----------
        kwargs : dict
            Optional keyword arguments to add to prior args.
        """
        # Backends expect numpy arrays, so make sure all numeric values are represented as such.
        kwargs_ = {}
        for key, val in kwargs.items():
            if isinstance(val, (int, float)):
                val = np.array(val)
            elif isinstance(val, np.ndarray):
                val = val.squeeze()
            kwargs_[key] = val
        self.args.update(kwargs_)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        else:
            return self.__dict__ == other.__dict__

    def __str__(self):
        args = ", ".join(
            [
                f"{k}: {np.round_(v, 8)}" if not isinstance(v, Prior) else f"{k}: {v}"
                for k, v in self.args.items()
            ]
        )
        return f"{self.name}({args})"

    def __repr__(self):
        return self.__str__()
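
# Usage sketch (illustrative, not part of the module): priors can be nested by
# passing another Prior as a parameter value, and update() coerces plain numbers
# to numpy arrays as the backends expect.
#
#   p = Prior("Normal", mu=0, sigma=Prior("HalfNormal", sigma=1))
#   p.update(mu=1)  # mu is stored as np.array(1)
#   print(p)        # roughly: Normal(mu: 1, sigma: HalfNormal(sigma: 1))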

class PriorFactory:
    """An object that supports specification and easy retrieval of default priors.

    Parameters
    ----------
    defaults : str or dict
        Optional base configuration containing default priors for distributions, families, and
        term types. If a string, the name of a JSON file containing the config. If a dict, must
        contain keys for ``'dists'``, ``'terms'``, and ``'families'``; see the built-in JSON
        configuration for an example. If ``None``, a built-in set of priors will be used as
        defaults.
    dists : dict
        Optional specification of named distributions to use as priors. Each key gives the name
        of a newly defined distribution; values are two-element lists, where the first element
        is the name of the built-in distribution to use (``'Normal'``, ``'Cauchy'``, etc.), and
        the second element is a dictionary of parameters on that distribution
        (e.g., ``{'mu': 0, 'sigma': 10}``). Priors can be nested to arbitrary depths by
        replacing any parameter with another prior specification.
    terms : dict
        Optional specification of default priors for different model term types. Valid keys are
        ``'intercept'``, ``'common'``, or ``'group_specific'``. Values are either strings
        prepended with a ``#``, in which case they are interpreted as pointers to distributions
        named in the ``dists`` dictionary, or key -> value specifications in the same format as
        elements in the ``dists`` dictionary.
    families : dict
        Optional specification of default priors for named family objects. Keys are family
        names, and values are dicts containing mandatory keys for ``'dist'``, ``'link'``, and
        ``'parent'``.

    Examples
    --------
    >>> dists = {'my_dist': ['Normal', {'mu': 10, 'sigma': 1000}]}
    >>> pf = PriorFactory(dists=dists)

    >>> families = {'normalish': {'dist': ['normal', {'sigma': '#my_dist'}],
    ...                           'link': 'identity', 'parent': 'mu'}}
    >>> pf = PriorFactory(dists=dists, families=families)
    """

    def __init__(self, defaults=None, dists=None, terms=None, families=None):
        if defaults is None:
            defaults = join(dirname(__file__), "config", "priors.json")

        if isinstance(defaults, str):
            with open(defaults, "r") as ofile:
                defaults = json.load(ofile)

        # Just in case the user plans to use the same defaults elsewhere
        defaults = deepcopy(defaults)

        if isinstance(dists, dict):
            defaults["dists"].update(dists)

        if isinstance(terms, dict):
            defaults["terms"].update(terms)

        if isinstance(families, dict):
            defaults["families"].update(families)

        self.dists = defaults["dists"]
        self.terms = defaults["terms"]
        self.families = defaults["families"]

    def _get_prior(self, spec, **kwargs):
        if isinstance(spec, str):
            spec = re.sub(r"^\#", "", spec)
            return self._get_prior(self.dists[spec])
        elif isinstance(spec, (list, tuple)):
            name, args = spec
            if name.startswith("#"):
                name = re.sub(r"^\#", "", name)
                prior = self._get_prior(self.dists[name])
            else:
                prior = Prior(name, **kwargs)
            args = {k: self._get_prior(v) for (k, v) in args.items()}
            prior.update(**args)
            return prior
        else:
            return spec

    # pylint: disable=inconsistent-return-statements
    def get(self, dist=None, term=None, family=None):
        """Retrieve default prior for a named distribution, term type, or family.

        Parameters
        ----------
        dist : str
            Name of desired distribution. Note that the name is the key in the defaults
            dictionary, not the name of the ``Distribution`` object used to construct the
            prior.
        term : str
            The type of term family to retrieve defaults for. Must be one of ``'intercept'``,
            ``'common'``, or ``'group_specific'``.
        family : str
            The name of the ``Family`` to retrieve. Must be a value defined internally. In the
            default config, this is one of ``'gaussian'``, ``'bernoulli'``, ``'poisson'``,
            ``'gamma'``, ``'wald'``, or ``'negativebinomial'``.
        """
        if dist is not None:
            if dist not in self.dists:
                raise ValueError(f"{dist} is not a valid distribution name.")
            return self._get_prior(self.dists[dist])
        elif term is not None:
            if term not in self.terms:
                raise ValueError(f"{term} is not a valid term type.")
            return self._get_prior(self.terms[term])
        elif family is not None:
            if family not in self.families:
                raise ValueError(f"{family} is not a valid family name.")
            _f = self.families[family]
            prior = self._get_prior(_f["dist"])
            return Family(family, prior, _f["link"], _f["parent"])
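
# Usage sketch (illustrative, not part of the module): retrieving defaults,
# reusing the 'my_dist' example from the class docstring. The exact parameter
# values returned for terms and families depend on config/priors.json.
#
#   pf = PriorFactory(dists={'my_dist': ['Normal', {'mu': 10, 'sigma': 1000}]})
#   pf.get(dist="my_dist")     # -> a Prior, roughly Normal(mu: 10, sigma: 1000)
#   pf.get(term="intercept")   # -> the default Prior for intercept terms
#   pf.get(family="gaussian")  # -> a Family carrying its likelihood prior and link
#   pf.get(dist="nope")        # -> raises ValueError: not a valid distribution name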

class PriorScaler:
    # Default is 'wide'. The wide prior sigma is sqrt(1/3) = .577 on the partial
    # corr scale, which is the sigma of a flat prior over [-1, 1].
    names = {"narrow": 0.2, "medium": 0.4, "wide": 3 ** -0.5, "superwide": 0.8}

    def __init__(self, model, taylor):
        self.model = model
        self.stats = model.dm_statistics if hasattr(model, "dm_statistics") else None
        self.dm = pd.DataFrame(
            {
                lev: term.data[:, i]
                for term in model.common_terms.values()
                for i, lev in enumerate(term.levels)
            }
        )
        self.priors = {}
        self.mle = None
        self.taylor = taylor
        with open(join(dirname(__file__), "config", "derivs.txt"), "r") as file:
            self.deriv = [next(file).strip("\n") for x in range(taylor + 1)]

    def _get_slope_stats(self, exog, predictor, sigma_corr, full_mod=None, points=4):
        """
        Parameters
        ----------
        full_mod : statsmodels.genmod.generalized_linear_model.GLM
            Statsmodels GLM to replace MLE model. For when ``predictor`` is not in the
            common part of the model.
        points : int
            Number of points to use for LL approximation.
        """
        if full_mod is None:
            full_mod = self.mle

        # figure out which column of exog to drop for the null model
        keeps = [
            i
            for i, x in enumerate(list(exog.columns))
            if not np.array_equal(predictor, exog[x].values.flatten())
        ]
        i = [x for x in range(exog.shape[1]) if x not in keeps][0]

        # get log-likelihood values from beta=0 to beta=MLE
        values = np.linspace(0.0, full_mod.params[i], points)

        # if there are multiple predictors, use statsmodels to optimize the LL
        if keeps:
            null = [
                GLM(
                    endog=self.model.response.data,
                    exog=exog,
                    family=self.model.family.smfamily(self.model.family.smlink),
                ).fit_constrained(str(exog.columns[i]) + "=" + str(val))
                for val in values[:-1]
            ]
            null = np.append(null, full_mod)
            log_likelihood = np.array([x.llf for x in null])
        # if just a single predictor, use statsmodels to evaluate the LL
        else:
            null = [
                self.model.family.smfamily(self.model.family.smlink).loglike(
                    np.squeeze(self.model.response.data), val * predictor
                )
                for val in values[:-1]
            ]
            log_likelihood = np.append(null, full_mod.llf)

        # compute params of quartic approximation to log-likelihood
        # c: intercept, d: shift parameter
        # a: quartic coefficient, b: quadratic coefficient
        intercept, shift_parameter = log_likelihood[-1], -(full_mod.params[i].item())
        X = np.array([(values + shift_parameter) ** 4, (values + shift_parameter) ** 2]).T
        coef_a, coef_b = np.squeeze(
            np.linalg.multi_dot(
                [np.linalg.inv(np.dot(X.T, X)), X.T, (log_likelihood[:, None] - intercept)]
            )
        )

        # m, v: mean and variance of beta distribution of correlations
        # p, q: corresponding shape parameters of beta distribution
        mean = 0.5
        variance = sigma_corr ** 2 / 4
        p = mean * (mean * (1 - mean) / variance - 1)
        q = (1 - mean) * (mean * (1 - mean) / variance - 1)

        # function to return central moments of rescaled beta distribution
        def moment(k):
            return (2 * p / (p + q)) ** k * hyp2f1(p, -k, p + q, (p + q) / p)

        # evaluate the derivatives of beta = f(correlation).
        # dict 'point' gives points about which to Taylor expand. We want to
        # expand about the mean (generally 0), but some of the derivatives
        # do not exist at 0. Evaluating at a point very close to 0 (e.g., .001)
        # generally gives good results, but the higher order the expansion, the
        # further from 0 we need to evaluate the derivatives, or they blow up.
        point = dict(zip(range(1, 14), 2 ** np.linspace(-1, 5, 13) / 100))
        vals = dict(a=coef_a, b=coef_b, n=len(self.model.response.data), r=point[self.taylor])
        _deriv = [eval(x, globals(), vals) for x in self.deriv]  # pylint: disable=eval-used

        # compute and return the approximate sigma
        def term(i, j):
            return (
                1
                / np.math.factorial(i)
                * 1
                / np.math.factorial(j)
                * _deriv[i]
                * _deriv[j]
                * (moment(i + j) - moment(i) * moment(j))
            )

        terms = [term(i, j) for i in range(1, self.taylor + 1) for j in range(1, self.taylor + 1)]
        return np.array(terms).sum() ** 0.5

    def _get_intercept_stats(self, add_slopes=True):
        # start with mean and variance of Y on the link scale
        mod = GLM(
            endog=self.model.response.data,
            exog=np.repeat(1, len(self.model.response.data)),
            family=self.model.family.smfamily(self.model.family.smlink),
            missing="drop" if self.model.dropna else "none",
        ).fit()
        mu = mod.params

        # multiply SE by sqrt(N) to turn it into (approx.) sigma(Y) on link scale
        sigma = (mod.cov_params()[0] * len(mod.mu)) ** 0.5

        # modify mu and sigma based on means and sigmas of slope priors.
        if len(self.model.common_terms) > 1 and add_slopes:
            means = np.array([x["mu"] for x in self.priors.values()])
            sigmas = np.array([x["sigma"] for x in self.priors.values()])
            # add to intercept prior
            index = list(self.priors.keys())
            mu -= np.dot(means, self.stats["mean_x"][index])
            sigma = (sigma ** 2 + np.dot(sigmas ** 2, self.stats["mean_x"][index] ** 2)) ** 0.5

        return mu, sigma

    def _scale_common(self, term):
        # these defaults are only defined for Normal priors
        if term.prior.name != "Normal":
            return

        mu = []
        sigma = []
        sigma_corr = term.prior.scale
        for pred in term.data.T:
            mu += [0]
            sigma += [self._get_slope_stats(exog=self.dm, predictor=pred, sigma_corr=sigma_corr)]

        # save and set prior
        for i, lev in enumerate(term.levels):
            self.priors.update({lev: {"mu": mu[i], "sigma": sigma[i]}})
        term.prior.update(mu=np.array(mu), sigma=np.array(sigma))

    def _scale_intercept(self, term):
        # default priors are only defined for Normal priors
        if term.prior.name != "Normal":
            return

        # get prior mean and sigma for common intercept
        mu, sigma = self._get_intercept_stats()

        # save and set prior
        term.prior.update(mu=mu, sigma=sigma)

    def _scale_group_specific(self, term):
        # these default priors are only defined for HalfNormal priors
        if term.prior.args["sigma"].name != "HalfNormal":
            return

        sigma_corr = term.prior.scale

        # recreate the corresponding common effect data
        fix_data = term.data.sum(axis=1)

        # handle intercepts and cell means
        if term.constant:
            _, sigma = self._get_intercept_stats()
            sigma *= sigma_corr
        # handle slopes
        else:
            exists = [
                x
                for x in self.dm.columns  # pylint: disable=not-an-iterable
                if np.array_equal(fix_data, self.dm[x].values)
            ]
            # handle case where there IS a corresponding common effect
            if exists and exists[0] in self.priors.keys():
                sigma = self.priors[exists[0]]["sigma"]
            # handle case where there IS NOT a corresponding common effect
            else:
                # the usual case: add the group specific effect data as a common effect
                # in the design matrix
                if not exists:
                    fix_dataframe = pd.DataFrame(fix_data)
                    # things break if column names are integers (the default)
                    fix_dataframe.rename(
                        columns={
                            c: "_" + str(c)
                            for c in fix_dataframe.columns  # pylint: disable=not-an-iterable
                        },
                        inplace=True,
                    )
                    exog = self.dm.join(fix_dataframe)
                # this handles the corner case where there technically is the
                # corresponding common effect, but the parameterization differs
                # between the common- and group-specific-effect specification.
                # Usually this means the common effects use cell-means coding
                # but the group specific effects use k-1 coding.
                else:
                    group = term.name.split("|")[1]
                    exog = self.model.group_specific_terms.values()
                    exog = [v.data.sum(1) for v in exog if v.name.split("|")[-1] == group]
                    index = ["_" + str(i) for i in range(len(exog))]
                    exog = pd.DataFrame(exog, index=index).T
                # this will replace self.mle (which is missing predictors)
                missing = "drop" if self.model.dropna else "none"
                full_mod = GLM(
                    endog=self.model.response.data,
                    exog=exog,
                    family=self.model.family.smfamily(self.model.family.smlink),
                    missing=missing,
                ).fit()
                sigma = self._get_slope_stats(
                    exog=exog, predictor=fix_data, full_mod=full_mod, sigma_corr=sigma_corr
                )

        # set the prior sigma.
        term.prior.args["sigma"].update(sigma=np.squeeze(np.atleast_1d(sigma)))

    def scale(self):
        # classify all terms
        common_intercepts = [
            term
            for term in self.model.terms.values()
            if not term.group_specific and term.data.sum(1).var() == 0
        ]
        common_slopes = [
            term
            for term in self.model.terms.values()
            if not term.group_specific and not term.data.sum(1).var() == 0
        ]
        group_specific_terms = [term for term in self.model.terms.values() if term.group_specific]

        # arrange them in the order in which they should be initialized
        term_list = common_slopes + common_intercepts + group_specific_terms
        term_types = (
            ["common"] * len(common_slopes)
            + ["intercept"] * len(common_intercepts)
            + ["group_specific"] * len(group_specific_terms)
        )

        # initialize them in order
        for term, term_type in zip(term_list, term_types):
            if term.prior.scale is None:
                # pylint: disable=protected-access
                if not term.prior._auto_scale or not self.model.auto_scale:
                    continue
                term.prior.scale = "wide"

            # MLE is fitted only once.
            if self.mle is None:
                self._fit_mle()

            # convert scale names to floats
            if isinstance(term.prior.scale, str):
                term.prior.scale = PriorScaler.names[term.prior.scale]

            # scale it!
            getattr(self, f"_scale_{term_type}")(term)

    def _fit_mle(self):
        """Fit the MLE of the common part of the model.

        This used to be called in the class instantiation, but there is no need to fit the GLM
        when there are no automatic priors. So this method is only called when needed.
        """
        missing = "drop" if self.model.dropna else "none"
        try:
            self.mle = GLM(
                endog=self.model.response.data,
                exog=self.dm,
                family=self.model.family.smfamily(self.model.family.smlink),
                missing=missing,
            ).fit()
        except PerfectSeparationError as error:
            msg = "Perfect separation detected, automatic priors are not available. "
            msg += "Please indicate priors manually."
            raise PerfectSeparationError(msg) from error
        except:
            print("Unexpected error:", sys.exc_info()[0])
            raise
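
# Note (illustrative, not part of the module): the double sum over term(i, j) in
# _get_slope_stats is the standard Taylor-series approximation to the variance of
# beta = f(correlation). For a function f of a random variable X with central
# moments m_k = moment(k) and derivatives f^(i) evaluated at the expansion point r,
#
#   Var[f(X)] ~= sum_{i,j >= 1} f^(i)(r) * f^(j)(r) / (i! * j!) * (m_{i+j} - m_i * m_j)
#
# which is what term(i, j) computes: the derivative expressions are read from
# config/derivs.txt and evaluated via eval() with a, b, n, and r bound in vals,
# and the returned sigma is the square root of the summed terms.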