Source code for bambi.models

# pylint: disable=no-name-in-module
# pylint: disable=too-many-lines
import re
import logging
from collections import OrderedDict
from copy import deepcopy

import numpy as np
import pandas as pd
import statsmodels.api as sm
from arviz.plots import plot_posterior
from arviz.data import from_dict
from patsy import dmatrices, dmatrix, EvalFactor
import pymc3 as pm

import bambi.version as version
from .backends import PyMC3BackEnd
from .external.patsy import Custom_NA
from .priors import Prior, PriorFactory, PriorScaler
from .utils import listify, get_bernoulli_data, extract_label

_log = logging.getLogger("bambi")


[docs]class Model: """Specification of model class. Parameters ---------- data : DataFrame or str The dataset to use. Either a pandas DataFrame, or the name of the file containing the data, which will be passed to `pd.read_csv()`. default_priors : dict or str An optional specification of the default priors to use for all model terms. Either a dictionary containing named distributions, families, and terms (see the documentation in priors.PriorFactory for details), or the name of a JSON file containing the same information. auto_scale : bool If True (default), priors are automatically rescaled to the data (to be weakly informative) any time default priors are used. Note that any priors explicitly set by the user will always take precedence over default priors. dropna : bool When True, rows with any missing values in either the predictors or outcome are automatically dropped from the dataset in a listwise manner. taylor : int Order of Taylor expansion to use in approximate variance when constructing the default priors. Should be between 1 and 13. Lower values are less accurate, tending to undershoot the correct prior width, but are faster to compute and more stable. Odd-numbered values tend to work better. Defaults to 5 for Normal models and 1 for non-Normal models. Values higher than the defaults are generally not recommended as they can be unstable. noncentered : bool If True (default), uses a non-centered parameterization for normal hyperpriors on grouped parameters. If False, naive (centered) parameterization is used. 
""" # pylint: disable=too-many-instance-attributes def __init__( self, data=None, default_priors=None, auto_scale=True, dropna=False, taylor=None, noncentered=True, ): if isinstance(data, str): data = pd.read_csv(data, sep=None, engine="python") self.default_priors = PriorFactory(default_priors) obj_cols = data.select_dtypes(["object"]).columns data[obj_cols] = data[obj_cols].apply(lambda x: x.astype("category")) self.data = data # Some group_specific effects stuff later requires us to make guesses about # column groupings into terms based on patsy's naming scheme. if re.search(r"[\[\]]+", "".join(data.columns)): _log.warning( "At least one of the column names in the specified " "dataset contain square brackets ('[' or ']')." "This may cause unexpected behavior if you specify " "models with group specific effects. You are encouraged to " "rename your columns to avoid square brackets." ) self.reset() self.auto_scale = auto_scale self.dropna = dropna self.taylor = taylor self.noncentered = noncentered self._backend_name = None # build() will loop over these, calling _add() and _set_priors() self.added_terms = [] self._added_priors = {} # if dropna=True, completes gets updated by add() to track complete cases self.completes = [] self.clean_data = None # attributes that are set later self.y = None # _add_y() self.family = None # _add_y() self.backend = None # _set_backend() self.dm_statistics = None # build() self._diagnostics = None # build() self.built = False # build()
[docs] def reset(self): """Reset list of terms and y-variable.""" self.terms = OrderedDict() self.y = None self.backend = None self.added_terms = [] self._added_priors = {} self.completes = [] self.clean_data = None
def _set_backend(self, backend): backend = backend.lower() if backend.startswith("pymc"): self.backend = PyMC3BackEnd() else: raise ValueError("At the moment, only the PyMC3 backend is supported.") self._backend_name = backend
[docs] def build(self, backend="pymc"): """Set up the model for sampling/fitting. Performs any steps that require access to all model terms (e.g., scaling priors on each term), then calls the BackEnd's build() method. Parameters ---------- backend : str The name of the backend to use for model fitting. Currently only 'pymc' is supported. """ # retain only the complete cases n_total = len(self.data.index) if self.completes: completes = [set(x) for x in sum(self.completes, [])] completes = set.intersection(*completes) else: completes = range(len(self.data.index)) self.clean_data = self.data.iloc[list(completes), :] # warn the user about any dropped rows # NOTE: When this message is shown the rows have already been removed. if len(completes) < n_total: _log.info( "Automatically removing %d/%d rows from the dataset.", n_total - len(completes), n_total, ) # loop over the added terms and _add() them for term_args in self.added_terms: self._add(**term_args) # set custom priors self._set_priors(**self._added_priors) # prepare all priors for name, term in self.terms.items(): type_ = ( "intercept" if name == "Intercept" else "group_specific" if self.terms[name].group_specific else "common" ) term.prior = self._prepare_prior(term.prior, type_) # check for backend if backend is None: if self._backend_name is None: raise ValueError( "No backend was passed or set in the Model; did you forget to call fit()?" ) backend = self._backend_name # check for outcome if self.y is None: raise ValueError( "No outcome (y) variable is set! Please specify " "an outcome variable using the formula interface " "before build() or fit()." ) # X = common effects design matrix (excluding intercept/constant term) # r2_x = 1 - 1/VIF, i.e., R2 for predicting each x from all other x's. 
# only compute these stats if there are multiple terms in the model terms = [t for t in self.common_terms.values() if t.name != "Intercept"] if len(self.common_terms) > 1: x_matrix = [pd.DataFrame(x.data, columns=x.levels) for x in terms] x_matrix = pd.concat(x_matrix, axis=1) self.dm_statistics = { "r2_x": pd.Series( { x: sm.OLS( endog=x_matrix[x], exog=sm.add_constant(x_matrix.drop(x, axis=1)) if "Intercept" in self.term_names else x_matrix.drop(x, axis=1), ) .fit() .rsquared for x in list(x_matrix.columns) } ), "sigma_x": x_matrix.std(), "mean_x": x_matrix.mean(axis=0), } # save potentially useful info for diagnostics # mat = correlation matrix of X, w/ diagonal replaced by X means mat = x_matrix.corr() for x_col in list(mat.columns): mat.loc[x_col, x_col] = self.dm_statistics["mean_x"][x_col] self._diagnostics = { # the Variance Inflation Factors (VIF), which is possibly # useful for diagnostics "VIF": 1 / (1 - self.dm_statistics["r2_x"]), "corr_mean_X": mat, } # throw informative error if perfect collinearity among common fx if any(self.dm_statistics["r2_x"] > 0.999): raise ValueError( "There is perfect collinearity among the common effects!\n" "Printing some design matrix statistics:\n" + str(self.dm_statistics) + "\n" + str(self._diagnostics) ) # throw informative error message if any categorical predictors have 1 category num_cats = [x.data.size for x in self.common_terms.values()] if any(np.array(num_cats) == 0): raise ValueError("At least one categorical predictor contains only 1 category!") # only set priors if there is at least one term in the model if self.terms: # Get and scale default priors if none are defined yet if self.taylor is not None: taylor = self.taylor else: taylor = 5 if self.family.name == "gaussian" else 1 scaler = PriorScaler(self, taylor=taylor) scaler.scale() # Tell user which event is being modeled if self.family.name == "bernoulli": _log.info( "Modeling the probability that %s==%s", self.y.name, str(self.y.success_event), ) 
self._set_backend(backend) self.backend.build(self) self.built = True
[docs] def fit( self, common=None, group_specific=None, fixed=None, random=None, priors=None, family="gaussian", link=None, run=True, categorical=None, omit_offsets=True, backend="pymc", **kwargs, ): """Fit the model using the specified BackEnd. Parameters ---------- common : str Optional formula specification of common effects. group_specific : list Optional list-based specification of group specific effects. priors : dict Optional specification of priors for one or more terms. A dict where the keys are the names of terms in the model, and the values are either instances of class Prior or ints, floats, or strings that specify the width of the priors on a standardized scale. family : str or Family A specification of the model family (analogous to the family object in R). Either a string, or an instance of class priors.Family. If a string is passed, a family with the corresponding name be defined in the defaults loaded at Model initialization. Valid pre-defined families are 'gaussian', 'bernoulli', 'poisson', and 't'. link : str The model link function to use. Can be either a string (must be one of the options defined in the current backend; typically this will include at least 'identity', 'logit', 'inverse', and 'log'), or a callable that takes a 1D ndarray or theano tensor as the sole argument and returns one with the same shape. run : bool Whether or not to immediately begin fitting the model once any set up of passed arguments is complete. categorical : str or list The names of any variables to treat as categorical. Can be either a single variable name, or a list of names. If categorical is None, the data type of the columns in the DataFrame will be used to infer handling. In cases where numeric columns are to be treated as categoricals (e.g., group specific factors coded as numerical IDs), explicitly passing variable names via this argument is recommended. 
omit_offsets: bool Omits offset terms in the InferenceData object when the model includes group specific effects. Defaults to True. backend : str The name of the BackEnd to use. Currently only 'pymc' backend is supported. """ if fixed is not None: _log.warning("The fixed argument has been deprecated, please use common") common = fixed if random is not None: _log.warning("The random argument has been deprecated, please use group_specific") group_specific = random if common is not None or group_specific is not None: self.add( common=common, group_specific=group_specific, priors=priors, family=family, link=link, categorical=categorical, append=False, ) # Run the BackEnd to fit the model. if backend is None: backend = "pymc" if self._backend_name is None else self._backend_name if run: if not self.built or backend != self._backend_name: self.build(backend) return self.backend.run(omit_offsets=omit_offsets, **kwargs) self._backend_name = backend return None
[docs] def add( self, common=None, group_specific=None, priors=None, family="gaussian", link=None, categorical=None, append=True, ): """Add one or more terms to the model via an R-like formula syntax. Parameters ---------- common : str Optional formula specification of common effects. group_specific : list Optional list-based specification of group specific effects. priors : dict Optional specification of priors for one or more terms. A dict where the keys are the names of terms in the model, and the values are either instances of class Prior or ints, floats, or strings that specify the width of the priors on a standardized scale. family : str, Family A specification of the model family (analogous to the family object in R). Either a string, or an instance of class priors.Family. If a string is passed, a family with the corresponding name must be defined in the defaults loaded at Model initialization. Valid pre-defined families are 'gaussian', 'bernoulli', 'poisson', and 't'. link : str The model link function to use. Can be either a string (must be one of the options defined in the current backend; typically this will include at least 'identity', 'logit', 'inverse', and 'log'), or a callable that takes a 1D ndarray or theano tensor as the sole argument and returns one with the same shape. categorical : str or list The names of any variables to treat as categorical. Can be either a single variable name, or a list of names. If categorical is None, the data type of the columns in the DataFrame will be used to infer handling. In cases where numeric columns are to be treated as categoricals (e.g., group specific factors coded as numerical IDs), explicitly passing variable names via this argument is recommended. append : bool If True, terms are appended to the existing model rather than replacing any existing terms. This allows formula-based specification of the model in stages. 
""" data = self.data # Primitive values (floats, strs) can be overwritten with Prior objects # so we need to make sure to copy first to avoid bad things happening # if user is re-using same prior dict in multiple models. if priors is None: priors = {} else: priors = deepcopy(priors) if not append: self.reset() # Explicitly convert columns to category if desired--though this # can also be done within the formula using C(). if categorical is not None: data = data.copy() cats = listify(categorical) data[cats] = data[cats].apply(lambda x: x.astype("category")) # Custom patsy.missing.NAAction class. Similar to patsy drop/raise # defaults, but changes the raised message and logs any dropped rows NA_handler = Custom_NA(dropna=self.dropna) # screen common terms # it deletes everything between [] and the brackets too. if common is not None: if "~" in common: clean_fix = re.sub(r"\[.+\]", "", common) dmatrices(clean_fix, data=data, NA_action=NA_handler) else: dmatrix(common, data=data, NA_action=NA_handler) # screen group specific terms if group_specific is not None: for term in listify(group_specific): for side in term.split("|"): dmatrix(side, data=data, NA_action=NA_handler) # update the running list of complete cases if NA_handler.completes: self.completes.append(NA_handler.completes) # save arguments to pass to _add() args = dict( zip( ["common", "group_specific", "priors", "family", "link", "categorical"], [common, group_specific, priors, family, link, categorical], ) ) self.added_terms.append(args) self.built = False
def _add( self, common=None, group_specific=None, priors=None, family="gaussian", link=None, categorical=None, ): """Internal version of add(), with the same arguments. Runs during Model.build() """ # use cleaned data with NAs removed (if user requested) data = self.clean_data # alter this pandas flag to avoid false positive SettingWithCopyWarnings data._is_copy = False # pylint: disable=protected-access # Explicitly convert columns to category if desired--though this # can also be done within the formula using C(). if categorical is not None: data = data.copy() cats = listify(categorical) data[cats] = data[cats].apply(lambda x: x.astype("category")) if common is not None: self._add_common(common, data, family, link, priors) if group_specific is not None: self._add_group_specific(listify(group_specific), data, priors) # pylint: disable=keyword-arg-before-vararg def _add_y(self, vector, prior=None, family="gaussian", link=None, event=None): """Add a dependent (or outcome) variable to the model. Parameters ---------- variable : str The name of the dataset column containing the y values. prior : Prior, int, float, str Optional specification of prior. Can be an instance of class Prior, a numeric value, or a string describing the width. In the numeric case, the distribution specified in the defaults will be used, and the passed value will be used to scale the appropriate variance parameter. For strings (e.g., 'wide', 'narrow', 'medium', or 'superwide'), predefined values will be used. family : str or Family A specification of the model family (analogous to the family object in R). Either a string, or an instance of class priors.Family. If a string is passed, a family with the corresponding name must be defined in the defaults loaded at Model initialization. Valid pre-defined families are 'gaussian', 'bernoulli', 'poisson', and 't'. link : str The model link function to use. 
Can be either a string (must be one of the options defined in the current backend; typically this will include at least 'identity', 'logit', 'inverse', and 'log'), or a callable that takes a 1D ndarray or theano tensor as the sole argument and returns one with the same shape. """ if isinstance(family, str): family = self.default_priors.get(family=family) self.family = family # Override family's link if another is explicitly passed if link is not None: self.family.link = link if prior is None: prior = self.family.prior variable = vector.design_info.term_names[0] if self.family.name == "gaussian": prior.update(sigma=Prior("HalfStudentT", nu=4, sigma=self.clean_data[variable].std())) # Success event when family = 'bernoulli' success_event = None categorical = False if event is not None: if self.family.name != "bernoulli": raise ValueError("Index notation only available for 'bernoulli' family") # pass in new Y data that has 1 if y=event and 0 otherwise success_event = event.group(1) categorical = True data = vector[:, vector.design_info.column_names.index(success_event)] # recall group(3) contains 'event' from 'y[event]' notation data = pd.DataFrame({event.group(3): data}) else: data = self.clean_data[variable] if self.family.name == "bernoulli": categorical = True data, success_event = get_bernoulli_data(data) self.y = ResponseTerm(variable, data, categorical, prior, success_event=success_event) self.built = False def _add_common(self, common, data, family, link, priors): # Create design matrices and add response if "~" in common: # check to see if formula is using the 'y[event] ~ x' syntax. 
# If so, chop it into groups: # 1 = 'y[event]', 2 = 'y', 3 = 'event', 4 = 'x' # If this syntax is not being used, event = None event = re.match(r"^((\S+)\[(\S+)\])\s*~(.*)$", common) if event is not None: common = "{}~{}".format(event.group(2), event.group(4)) y_vector, x_matrix = dmatrices(common, data=data, NA_action="raise") self._add_y(y_vector, family=family, link=link, event=event) else: x_matrix = dmatrix(common, data=data, NA_action="raise") # Add predictors self._add_common_predictors(x_matrix, priors) def _add_group_specific(self, group_specific, data, priors): for group_specific_effect in group_specific: group_specific_effect = group_specific_effect.strip() # Split specification into intercept, predictor, and grouper patt = r"^([01]+)*[\s\+]*([^\|]+)*\|(.*)" intcpt, pred, grpr = re.search(patt, group_specific_effect).groups() label = "{}|{}".format(pred, grpr) if pred else grpr prior = priors.pop(label, priors.get("group_specific", None)) # Treat all grouping variables as categoricals, regardless of # their dtype and what the user may have specified in the # 'categorical' argument. 
var_names = re.findall(r"(\w+)", grpr) for var_name in var_names: if var_name in data.columns: data.loc[:, var_name] = data.loc[:, var_name].astype("category") self.clean_data.loc[:, var_name] = data.loc[:, var_name] # Default to including group specific intercepts intcpt = 1 if intcpt is None else int(intcpt) grpr_df = dmatrix(f"0+{grpr}", data, return_type="dataframe", NA_action="raise") # If there's no predictor, we must be adding group specific intercepts if not pred and grpr not in self.terms: name = "1|" + grpr pred = np.ones((len(grpr_df), 1)) term = GroupSpecificTerm( name, grpr_df, pred, grpr_df.values, categorical=True, prior=prior ) self.terms[name] = term else: pred_df = dmatrix( f"{intcpt}+{pred}", data, return_type="dataframe", NA_action="raise" ) # determine value of the 'constant' attribute const = np.atleast_2d(pred_df.T).T.sum(1).var() == 0 factor_infos = pred_df.design_info.factor_infos for col, i in pred_df.design_info.column_name_indexes.items(): pred_data = pred_df.iloc[:, i] lev_data = grpr_df.multiply(pred_data, axis=0) # Also rename intercepts and skip if already added. # This can happen if user specifies something like # group_specific=['1|school', 'student|school']. 
if col == "Intercept": if grpr in self.terms: continue label = f"1|{grpr}" else: label = col + "|" + grpr # Delete everything between brackets and the brackets col = re.sub(r"\[.*?\]\ *", "", col) if EvalFactor(col) in factor_infos: categorical = factor_infos[EvalFactor(col)].type == "categorical" else: categorical = False prior = priors.pop(label, priors.get("group_specific", None)) pred_data = pred_data.to_numpy() pred_data = pred_data[:, None] # Must be 2D later term = GroupSpecificTerm( label, lev_data, pred_data, grpr_df.values, categorical=categorical, constant=const if const else None, prior=prior, ) self.terms[label] = term def _add_common_predictors(self, x_matrix, priors): design_info = x_matrix.design_info for term in design_info.terms: _slice = design_info.term_slices[term] _name = term.name() cols = design_info.column_names[_slice] data = pd.DataFrame(np.asfortranarray(x_matrix[:, _slice]), columns=cols) # General for main or interaction effects. # Any interaction with one categorical predictor, is considered categorical. categorical = "categorical" in [ design_info.factor_infos[fct].type for fct in term.factors ] prior = priors.pop(_name, priors.get("common", None)) # If there is more than one factor, we have an interaction if len(term.factors) > 1: term = InteractionTerm(_name, data, categorical=categorical, prior=prior) else: term = Term(_name, data, categorical=categorical, prior=prior) self.terms[_name] = term def _match_derived_terms(self, name): """Return all (group_specific) terms whose named are derived from the specified string. For example, 'condition|subject' should match the terms with names '1|subject', 'condition[T.1]|subject', and so on. Only works for strings with grouping operator ('|'). 
""" if "|" not in name: return None patt = r"^([01]+)*[\s\+]*([^\|]+)*\|(.*)" intcpt, pred, grpr = re.search(patt, name).groups() intcpt = f"1|{grpr}" if not pred: return [self.terms[intcpt]] if intcpt in self.terms else None source = f"{pred}|{grpr}" found = [ t for (n, t) in self.terms.items() if n == intcpt or re.sub(r"(\[.*?\])", "", n) == source ] # If only the intercept matches, return None, because we want to err # on the side of caution and not consider '1|subject' to be a match for # 'condition|subject' if no slopes are found (e.g., the intercept could # have been set by some other specification like 'gender|subject'). return found if found and (len(found) > 1 or found[0].name != intcpt) else None
[docs] def set_priors(self, priors=None, common=None, group_specific=None, match_derived_names=True): """Set priors for one or more existing terms. Parameters ---------- priors : dict Dict of priors to update. Keys are names of terms to update; values are the new priors (either a Prior instance, or an int or float that scales the default priors). Note that a tuple can be passed as the key, in which case the same prior will be applied to all terms named in the tuple. common : Prior, int, float or str A prior specification to apply to all common terms included in the model. group_specific : Prior, int, float or str A prior specification to apply to all group specific terms included in the model. match_derived_names : bool If True, the specified prior(s) will be applied not only to terms that match the keyword exactly, but to the levels of group specific effects that were derived from the original specification with the passed name. For example, `priors={'condition|subject':0.5}` would apply the prior to the terms with names '1|subject', 'condition[T.1]|subject', and so on. If False, an exact match is required for the prior to be applied. """ # save arguments to pass to _set_priors() at build time kwargs = dict( zip( ["priors", "common", "group_specific", "match_derived_names"], [priors, common, group_specific, match_derived_names], ) ) self._added_priors.update(kwargs) self.built = False
def _set_priors(self, priors=None, common=None, group_specific=None, match_derived_names=True): """Internal version of set_priors(), with same arguments. Runs during Model.build(). """ targets = {} if common is not None: targets.update({name: common for name in self.common_terms.keys()}) if group_specific is not None: targets.update({name: group_specific for name in self.group_specific_terms.keys()}) if priors is not None: for k, prior in priors.items(): for name in listify(k): term_names = list(self.terms.keys()) msg = f"No terms in model match {name}." if name not in term_names: terms = self._match_derived_terms(name) if not match_derived_names or terms is None: raise ValueError(msg) for term in terms: targets[term.name] = prior else: targets[name] = prior for name, prior in targets.items(): self.terms[name].prior = prior def _prepare_prior(self, prior, _type): """Helper function to correctly set default priors, auto_scaling, etc. Parameters ---------- prior : Prior object, or float, or None. _type : string accepted values are: 'intercept, 'common', or 'group_specific'. """ if prior is None and not self.auto_scale: prior = self.default_priors.get(term=_type + "_flat") if isinstance(prior, Prior): prior._auto_scale = False # pylint: disable=protected-access else: _scale = prior prior = self.default_priors.get(term=_type) prior.scale = _scale if prior.scale is not None: prior._auto_scale = False # pylint: disable=protected-access return prior def plot(self, draws=5000, var_names=None): _log.warning("plot will be deprecated, please use plot_priors") return self.plot_priors(draws, var_names)
[docs] def plot_priors( self, draws=5000, var_names=None, random_seed=None, figsize=None, textsize=None, hdi_prob=None, round_to=2, point_estimate="mean", kind="kde", bins=None, omit_offsets=True, omit_group_specific=True, ax=None, ): """ Samples from the prior distribution and plot its marginals. Parameters ---------- draws : int Number of draws to sample from the prior predictive distribution. Defaults to 5000. var_names : str or list A list of names of variables for which to compute the posterior predictive distribution. Defaults to both observed and unobserved RVs. random_seed : int Seed for the random number generator. figsize: tuple Figure size. If None it will be defined automatically. textsize: float Text size scaling factor for labels, titles and lines. If None it will be autoscaled based on figsize. hdi_prob: float, optional Plots highest density interval for chosen percentage of density. Use 'hide' to hide the highest density interval. Defaults to 0.94. round_to: int, optional Controls formatting of floats. Defaults to 2 or the integer part, whichever is bigger. point_estimate: Optional[str] Plot point estimate per variable. Values should be 'mean', 'median', 'mode' or None. Defaults to 'auto' i.e. it falls back to default set in rcParams. kind: str Type of plot to display (kde or hist) For discrete variables this argument is ignored and a histogram is always used. bins: integer or sequence or 'auto', optional Controls the number of bins, accepts the same keywords `matplotlib.hist()` does. Only works if `kind == hist`. If None (default) it will use `auto` for continuous variables and `range(xmin, xmax + 1)` for discrete variables. omit_offsets: bool Whether to omit offset terms in the plot. Defaults to True. omit_group_specific: bool Whether to omit group specific effects in the plot. Defaults to True. ax: numpy array-like of matplotlib axes or bokeh figures, optional A 2D array of locations into which to plot the densities. 
If not supplied, ArviZ will create its own array of plot areas (and return it). **kwargs Passed as-is to plt.hist() or plt.plot() function depending on the value of `kind`. Returns ------- axes: matplotlib axes or bokeh figures """ if not self.built: raise ValueError("Cannot plot priors until model is built!") unobserved_rvs_names = [] flat_rvs = [] for unobserved in self.backend.model.unobserved_RVs: if "Flat" in unobserved.__str__(): flat_rvs.append(unobserved.name) else: unobserved_rvs_names.append(unobserved.name) if var_names is None: var_names = pm.util.get_default_varnames( unobserved_rvs_names, include_transformed=False ) else: flat_rvs = [fv for fv in flat_rvs if fv in var_names] var_names = [vn for vn in var_names if vn not in flat_rvs] if flat_rvs: _log.info( "Variables %s have flat priors, and hence they are not plotted", ", ".join(flat_rvs) ) if omit_offsets: omitted = [f"{rt}_offset" for rt in self.group_specific_terms] var_names = [vn for vn in var_names if vn not in omitted] if omit_group_specific: omitted = list(self.group_specific_terms) var_names = [vn for vn in var_names if vn not in omitted] axes = None if var_names: pps = self.prior_predictive(draws=draws, var_names=var_names, random_seed=random_seed) axes = plot_posterior( pps, group="prior", figsize=figsize, textsize=textsize, hdi_prob=hdi_prob, round_to=round_to, point_estimate=point_estimate, kind=kind, bins=bins, ax=ax, ) return axes
[docs] def prior_predictive(self, draws=500, var_names=None, omit_offsets=True, random_seed=None): """ Generate samples from the prior predictive distribution. Parameters ---------- draws : int Number of draws to sample from the prior predictive distribution. Defaults to 500. var_names : str or list A list of names of variables for which to compute the posterior predictive distribution. Defaults to both observed and unobserved RVs. random_seed : int Seed for the random number generator. Returns ------- InferenceData InferenceData object with the groups prior, prior_predictive and ovserved_data. """ if var_names is None: variables = self.backend.model.unobserved_RVs + self.backend.model.observed_RVs variables_names = [v.name for v in variables] var_names = pm.util.get_default_varnames(variables_names, include_transformed=False) if omit_offsets: offset_vars = [f"{rt}_offset" for rt in self.group_specific_terms] var_names = [vn for vn in var_names if vn not in offset_vars] pps = pm.sample_prior_predictive( samples=draws, var_names=var_names, model=self.backend.model, random_seed=random_seed ) y_name = self.y.name if y_name in pps: prior_predictive = {y_name: np.moveaxis(pps.pop(y_name), 2, 0)} observed_data = {y_name: self.y.data.squeeze()} else: prior_predictive = {} observed_data = {} prior = {k: v[np.newaxis] for k, v in pps.items()} idata = from_dict( prior_predictive=prior_predictive, prior=prior, observed_data=observed_data, coords=self.backend.model.coords, # new line attrs={ "inference_library": self.backend.name, "inference_library_version": self.backend.name, "modeling_interface": "bambi", "modeling_interface_version": version.__version__, }, ) return idata
[docs] def posterior_predictive( self, idata, draws=500, var_names=None, inplace=True, random_seed=None ): """ Generate samples from the posterior predictive distribution. Parameters ---------- idata : InfereceData InfereceData with samples from the posterior distribution. draws : int Number of draws to sample from the prior predictive distribution. Defaults to 500. var_names : str or list A list of names of variables for which to compute the posterior predictive distribution. Defaults to both observed and unobserved RVs. inplace : bool If ``True`` it will add a posterior_predictive group to idata, otherwise it will return a copy of idata with the added group. If true and idata already have a posterior_predictive group it will be overwritted random_seed : int Seed for the random number generator. Returns ------- None or InferenceData When ``inplace=True`` add posterior_predictive group inplace to idata and return ``None`. Otherwise a copy of idata with a posterior_predictive group. """ if var_names is None: variables = self.backend.model.observed_RVs variables_names = [v.name for v in variables] var_names = pm.util.get_default_varnames(variables_names, include_transformed=False) pps = pm.sample_posterior_predictive( trace=idata, samples=draws, var_names=var_names, model=self.backend.model, random_seed=random_seed, ) if not inplace: idata = deepcopy(idata) if "posterior_predictive" in idata: del idata.posterior_predictive idata.add_groups( {"posterior_predictive": {k: v.squeeze()[np.newaxis] for k, v in pps.items()}} ) getattr(idata, "posterior_predictive").attrs["modeling_interface"] = "bambi" getattr(idata, "posterior_predictive").attrs[ "modeling_interface_version" ] = version.__version__ if inplace: return None else: return idata
def _get_pymc_coords(self):
    """Map ``<term name>_dim_0`` to the cleaned levels of the model terms.

    Categorical common terms are added first, then every group specific term, so a
    group specific entry wins if both produce the same key.
    """
    coords = {}
    for term_name, term in self.common_terms.items():
        if term.categorical:
            coords[term_name + "_dim_0"] = term.cleaned_levels
    # Include all group specific terms
    for term_name, term in self.group_specific_terms.items():
        coords[term_name + "_dim_0"] = term.cleaned_levels
    return coords

@property
def term_names(self):
    """Return names of all terms in order of addition to model."""
    return list(self.terms)

@property
def common_terms(self):
    """Return dict of all and only common effects in model."""
    selected = {}
    for term_name, term in self.terms.items():
        if term.group_specific:
            continue
        selected[term_name] = term
    return selected

@property
def group_specific_terms(self):
    """Return dict of all and only group specific effects in model."""
    selected = {}
    for term_name, term in self.terms.items():
        if term.group_specific:
            selected[term_name] = term
    return selected
class BaseTerm:
    """Base class for all model terms.

    Parameters
    ----------
    name : str
        Name of the term.
    categorical : bool
        Whether the source variable is interpreted as nominal/categorical.
    prior : Prior
        A specification of the prior(s) to use.
    """

    # Subclasses representing group specific terms override this flag.
    group_specific = False

    def __init__(self, name, categorical, prior):
        self.name, self.categorical, self.prior = name, categorical, prior
class ResponseTerm(BaseTerm):
    """Representation of a single response model term.

    Parameters
    ----------
    name : str
        Name of the term.
    data : (DataFrame, Series, ndarray)
        The term values.
    categorical : bool
        If True, the source variable is interpreted as nominal/categorical. If False, the
        source variable is treated as continuous.
    prior : Prior
        A specification of the prior(s) to use. An instance of class priors.Prior.
    success_event : str or None
        Indicates the success level when the term is a categorical variable. It is stored
        as ``str(success_event)``, reduced to the text inside square brackets when present.
    """

    def __init__(self, name, data, categorical=False, prior=None, success_event=None):
        super().__init__(name, categorical, prior)

        values = data.to_frame() if isinstance(data, pd.Series) else data
        if isinstance(values, pd.DataFrame):
            # ``levels`` is only defined when column labels are available.
            self.levels = list(values.columns)
            values = values.values

        self.data = values
        # The term is constant when every row of the data adds up to the same value.
        row_totals = np.atleast_2d(values.T).T.sum(1)
        self.constant = row_totals.var() == 0
        self.success_event = str(success_event)
        self.clean_event()

    def clean_event(self):
        """Keep only the bracketed part of ``success_event``, if there is one."""
        bracketed = re.search(r"\[([\S+]+)\]", self.success_event)
        if bracketed is None:
            return
        self.success_event = bracketed.group(1)
class Term(BaseTerm):
    """Representation of a single (common) model term.

    Parameters
    ----------
    name : str
        Name of the term.
    data : (DataFrame, Series, ndarray)
        The term values.
    categorical : bool
        If True, the source variable is interpreted as nominal/categorical. If False, the
        source variable is treated as continuous.
    prior : Prior
        A specification of the prior(s) to use. An instance of class priors.Prior.
    constant : bool
        Indicates whether the term levels collectively act as a constant, in which case the
        term is treated as an intercept for prior distribution purposes. When None, it is
        worked out from the data.
    """

    def __init__(self, name, data, categorical=False, prior=None, constant=None):
        super().__init__(name, categorical, prior)

        if isinstance(data, pd.Series):
            data = data.to_frame()

        if isinstance(data, pd.DataFrame):
            levels, values = list(data.columns), data.values
        else:
            # Group specific effects pass through here
            values = np.atleast_2d(data)
            levels = list(range(values.shape[1]))

        self.levels = levels
        self.data = values

        # Identify and flag intercept and cell-means terms (i.e., full-rank dummy codes),
        # which receive special priors: their rows all add up to the same value.
        if constant is None:
            constant = np.atleast_2d(values.T).T.sum(1).var() == 0
        self.constant = constant

        self.clean_levels()

    def clean_levels(self):
        """Store the label extracted from each level in ``cleaned_levels``."""
        cleaned = []
        for level in self.levels:
            cleaned.append(extract_label(level, "common"))
        self.cleaned_levels = cleaned
class InteractionTerm(Term):
    """Representation of a single (common) interaction model term.

    Parameters
    ----------
    name : str
        Name of the term.
    data : (DataFrame, Series, ndarray)
        The term values.
    categorical : bool
        If True, the source variable is interpreted as nominal/categorical. If False, the
        source variable is treated as continuous.
    prior : Prior
        A specification of the prior(s) to use. An instance of class priors.Prior.
    """

    def __init__(self, name, data, categorical=False, prior=None):
        super().__init__(name, data, categorical, prior)

    def clean_levels(self):
        """Store the levels with the "T." prefix removed from each bracketed label."""
        # Delete "T." right after an opening square bracket, e.g. "a[T.x]:b[T.y]" becomes
        # "a[x]:b[y]". Anchoring on the bracket keeps the dot literal and leaves any other
        # "T." (outside brackets or inside a level name) untouched, whatever the length of
        # the level name.
        self.cleaned_levels = [re.sub(r"(?<=\[)T\.", "", level) for level in self.levels]
class GroupSpecificTerm(Term):
    """Representation of a single (group specific) model term.

    Parameters
    ----------
    name : str
        Name of the term.
    data : (DataFrame, Series, ndarray)
        The term values.
    predictor : (DataFrame, Series, ndarray)
        Data of the predictor variable in the group specific term.
    grouper : (DataFrame, Series, ndarray)
        Data of the grouping variable in the group specific term.
    categorical : bool
        If True, the source variable is interpreted as nominal/categorical. If False, the
        source variable is treated as continuous.
    prior : Prior
        A specification of the prior(s) to use. An instance of class priors.Prior.
    constant : bool
        Indicates whether the term levels collectively act as a constant, in which case the
        term is treated as an intercept for prior distribution purposes.
    """

    group_specific = True

    def __init__(
        self, name, data, predictor, grouper, categorical=False, prior=None, constant=None
    ):
        super().__init__(name, data, categorical, prior, constant)
        self.grouper = grouper
        self.predictor = predictor
        self.group_index = self.invert_dummies(grouper)

    def clean_levels(self):
        """Store the label extracted from each level in ``cleaned_levels``."""
        cleaned = []
        for level in self.levels:
            cleaned.append(extract_label(level, "group_specific"))
        self.cleaned_levels = cleaned

    def invert_dummies(self, dummies):
        """
        For the sake of computational efficiency (i.e., to avoid lots of large matrix
        multiplications in the backends), invert the dummy-coding process and represent
        full-rank dummies as a vector of indices into the coefficients.
        """
        n_rows, n_columns = len(dummies), dummies.shape[1]
        # Rows start at index 0, so the first column never needs to be inspected.
        index = np.zeros(n_rows, dtype=int)
        for column in range(1, n_columns):
            is_member = dummies[:, column] == 1
            index[is_member] = column
        return index