Source code for multiple_inference.bayes.base

"""Base classes for Bayesian analysis.
"""
from __future__ import annotations

import warnings
from typing import Any, Sequence

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.stats import multivariate_normal, norm, rv_continuous, wasserstein_distance

from ..base import ColumnType, ModelBase, Numeric1DArray, ResultsBase, ColumnsType
from ..stats import joint_distribution
from ..utils import weighted_quantile


[docs] class BayesResults(ResultsBase): """Results of Bayesian analysis. Inherits from :class:`multiple_inference.base.ResultsBase`. Args: *args (Any): Passed to :class:`multiple_inference.base.ResultsBase`. n_samples (int): Number of samples used for approximations (ranking, likelihood and Wasserstein distance). Defaults to 10000. **kwargs (Any): Passed to :class:`multiple_inference.base.ResultsBase`. Attributes: distributions (List[scipy.stats.norm]): Marginal posterior distributions. multivariate_distribution (scipy.stats.multivariate_normal): Joint posterior distribution. rank_df (pd.DataFrame): (n, n) dataframe of probabilities that column i has rank j. """ _default_title = "Bayesian estimates" def __init__(self, *args: Any, n_samples: int = 10000, **kwargs: Any): super().__init__(*args, **kwargs) # get the marginal (posterior) distributions, parameters, and pvalues self.marginal_distributions, params, pvalues = [], [], [] for i in range(self.model.n_params): dist = self.model.get_marginal_distribution(i) self.marginal_distributions.append(dist) params.append(dist.mean()) pvalues.append(dist.cdf(0)) self.params = np.array(params).squeeze() self.pvalues = np.array(pvalues).squeeze() self._n_samples = n_samples self._sample_weight = np.full(n_samples, 1 / n_samples) @property def _posterior_rvs(self): if hasattr(self, "_cached_posterior_rvs"): return self._cached_posterior_rvs # estimate the parameter rankings by drawing from the posterior try: self.joint_distribution = self.model.get_joint_distribution() self._cached_posterior_rvs = self.joint_distribution.rvs( size=self._n_samples ) except NotImplementedError: warnings.warn( "Model does not provide a joint posterior distribution." " I'll assume the marginal posterior distributions are independent." " Rank estimates and likelihood and Wasserstein approximations may be" " unreliable." ) self._cached_posterior_rvs = joint_distribution( self.marginal_distributions ).rvs(size=self._n_samples) return self._cached_posterior_rvs @property def _reconstructed_rvs(self): if hasattr(self, "_cached_reconstructed_rvs"): return self._cached_reconstructed_rvs self._cached_reconstructed_rvs = np.apply_along_axis( lambda mean: multivariate_normal.rvs(mean, self.model.cov), 1, self._posterior_rvs, ) return self._cached_reconstructed_rvs @property def rank_df(self): if hasattr(self, "_cached_rank_df"): return self._cached_rank_df argsort = np.argsort(-self._posterior_rvs, axis=1) rank_matrix = np.array( [ ((argsort == k).T * self._sample_weight).sum(axis=1) for k in range(self.model.n_params) ] ).T self._cached_rank_df = pd.DataFrame( rank_matrix, columns=self.model.exog_names, index=np.arange(1, self.model.n_params + 1), ) self._cached_rank_df.index.name = "Rank" return self._cached_rank_df
[docs] def compute_best_params( self, n_best_params: int = 1, alpha: float = 0.05, superset: bool = True, criterion: str = "fwer", ) -> pd.Series: """Compute the set of best (largest) parameters. Args: n_best_params (int, optional): Number of best parameters. Defaults to 1. alpha (float, optional): Significance level. If the criterion is "fwer", alpha is the family-wise error rate. If the criterion is "fdr", alpha is the false discovery rate. Defaults to 0.05. superset (bool, optional): Indicates that the returned set should be a superset of the truly best parameters. If False, the returned set should be a subset of the truly best parameters. Defaults to True. criterion (str, optional): "fwer" to control the family-wise error rate, "fdr" to control the false discovery rate. Defaults to "fwer". Raises: ValueError: ``criterion`` must be either "fwer" or "fdr". Returns: pd.Series: Indicates which parameters are in the selected set. """ if criterion not in ("fwer", "fdr"): raise ValueError(f"`criterion` must be 'fwer' or 'fdr', got {criterion}.") selected_mask = ( self._compute_best_params_fwer(n_best_params, alpha, superset) if criterion == "fwer" else self._compute_best_params_fdr(n_best_params, alpha, superset) ) return pd.Series(selected_mask, index=self.exog_names)
def _compute_best_params_fdr( self, n_best_params: int, alpha: float, superset: bool ) -> np.ndarray: """Compute the set of best parameters controlling the false discovery rate. See :meth:`BayesResults.compute_best_params` for arguments. """ if superset: fp_proba = self.rank_df[:n_best_params].sum(axis=0) else: fp_proba = self.rank_df[n_best_params:].sum(axis=0) fp_rate, selected = 0, [] argsort = fp_proba.argsort() for i in range(self.model.n_params): fp_rate = (len(selected) * fp_rate + fp_proba[argsort[i]]) / ( len(selected) + 1 ) if fp_rate <= alpha: selected.append(argsort[i]) else: break selected_mask = np.full(self.model.n_params, False) selected_mask[selected] = True return ~selected_mask if superset else selected_mask def _compute_best_params_fwer( self, n_best_params: int, alpha: float, superset: bool ) -> np.ndarray: """Compute the set of best parameters controlling the family-wise error rate. See :meth:`BayesResults.compute_best_params` for arguments. """ if superset: n_best_params = self.model.n_params - n_best_params ranks = self._posterior_rvs.argsort().argsort() + 1 else: ranks = (-self._posterior_rvs).argsort().argsort() + 1 # (n_samples, n_params) array where mask[i, k] indicates that k was not one of # the best parameters in draw i mask = ranks > n_best_params selected = [] unselected = list(range(self.model.n_params)) n_uncovered_rvs = 0 while n_uncovered_rvs / self._posterior_rvs.shape[0] <= alpha: # (n_params,) array of the number of rows that wouldn't be covered if parameter k # were selected uncovered = mask[:, unselected].sum(axis=0) index = np.argmin(uncovered) n_uncovered_rvs += uncovered[index] k = unselected[index] if n_uncovered_rvs / self._posterior_rvs.shape[0] <= alpha: selected.append(k) unselected.remove(k) # remove rows no longer covered after selecting parameter k mask = mask[~mask[:, k]] selected_mask = np.full(self.model.n_params, False) selected_mask[selected] = True return ~selected_mask if superset else selected_mask
[docs] def rank_conf_int( self, alpha: float = 0.05, columns: ColumnsType = None, simultaneous: bool = True, ) -> np.ndarray: """Compute rank confidence intervals. Args: alpha (float, optional): Significance level. Defaults to 0.05. columns (ColumnsType, optional): Selected columns. Defaults to None. simultaneous (bool, optional): Indicates that rank confidence intervals should have correct simultaneous coverage. If False, the rank confidence intervals will have correct marginal coverage. Defaults to True. Returns: np.ndarray: (# params, 2) array of rank confidence intervals. """ indices = self.model.get_indices(columns, self.exog_names) return ( self._simultaneous_rank_conf_int(alpha, indices) if simultaneous else self._marginal_rank_conf_int(alpha, indices) )
def _simultaneous_rank_conf_int( self, alpha: float, indices: np.ndarray ) -> np.ndarray: """Compute rank confidence intervals for simultaneous coverage. See :meth:`BayesResults.rank_conf_int` for arguments. """ # (n_samples, n_params) array where rank_rvs[i, k] is parameter k's rank in the # i-th draw rank_rvs = (-self._posterior_rvs).argsort().argsort() + 1 # (n_params, 2) array where conf_int[k] is the [lower bound, upper bound] of the # rank confidence interval conf_int = np.array(self.model.n_params * [[1, self.model.n_params]]) # number of posterior draws not simultaneously covered by the rank confidence intervals n_uncovered_rvs = 0 while n_uncovered_rvs / self._posterior_rvs.shape[0] <= alpha: # (2, n_samples, n_params) array where uncovered[b, i, k] indicates that # parameter k's rank in the i-th draw is equal to bound b uncovered = np.array( [rank_rvs == conf_int[:, 0], rank_rvs == conf_int[:, 1]] ) uncovered_sum = uncovered.sum(axis=1) # [b, k] indicating that changing bound b of parameter k will lead to the # minimal coverage reduction index = np.unravel_index(uncovered_sum.argmin(), uncovered_sum.shape) n_uncovered_rvs += uncovered_sum[index] if n_uncovered_rvs / self._posterior_rvs.shape[0] <= alpha: # remove rows no longer covered by the rank confidence intervals rank_rvs = rank_rvs[~uncovered[index[0], :, index[1]]] if index[0] == 0: # increment the lower bound conf_int[index[1], index[0]] += 1 else: # decrement the upper bound conf_int[index[1], index[0]] -= 1 return conf_int[indices] def _marginal_rank_conf_int(self, alpha: float, indices: np.ndarray) -> np.ndarray: """Compute rank confidence intervals for marginal coverage. See :meth:`BayesResults.rank_conf_int` for arguments. """ def compute_rank_ci(index): ci_upper = ci_lower = rank = param_rankings[index] - 1 probabilities = self.rank_df.iloc[:, index].values coverage = probabilities[ci_upper] while coverage < 1 - alpha and ci_upper < self.model.n_params - 1: # entend upper bound of the confidence interval until coverage is achieved ci_upper += 1 coverage += probabilities[ci_upper] while coverage < 1 - alpha and ci_lower > 0: # if the upper bound is at the maximum and coverage still isn't achieved, # entend the lower bound ci_lower -= 1 coverage += probabilities[ci_lower] best_ci_lower, best_ci_upper = ci_lower, ci_upper while ci_upper >= rank and ci_lower >= 0 and coverage > 1 - alpha: # shift the confidence interval upward while maintaining coverage coverage -= probabilities[ci_upper] ci_upper -= 1 while coverage < 1 - alpha and ci_lower > 0: ci_lower -= 1 coverage += probabilities[ci_lower] if ( ci_upper - ci_lower < best_ci_upper - best_ci_lower and coverage > 1 - alpha ): # select the shortest confidence interval best_ci_upper, best_ci_lower = ci_upper, ci_lower return [best_ci_lower, best_ci_upper] param_rankings = (-self.params).argsort().argsort() + 1 return np.array([compute_rank_ci(i) for i in indices]) + 1
[docs] def rank_point_plot( self, yname: str = None, xname: Sequence[str] = None, title: str = None, columns: ColumnsType = None, ax=None, **kwargs: Any, ): """Create a point plot. Args: yname (str, optional): Name of the endogenous variable. Defaults to None. xname (Sequence[str], optional): (# params,) sequence of parameter names. Defaults to None. title (str, optional): Plot title. Defaults to None. columns (ColumnsType, optional): Selected columns. Defaults to None. ax (AxesSubplot, optional): Axis to write on. **kwargs (Any): Passed to :meth:`ResultsBase.conf_int`. Returns: AxesSubplot: Plot. """ indices = self.model.get_indices(columns, self.exog_names) params = (-self.params).argsort().argsort()[indices] + 1 conf_int = self.rank_conf_int(columns=columns, **kwargs) yticks = np.arange(len(indices), 0, -1) if ax is None: _, ax = plt.subplots() ax.errorbar( x=params, # type: ignore, pylint: disable=no-member y=yticks, xerr=[params - conf_int[:, 0], conf_int[:, 1] - params], # type: ignore, pylint: disable=no-member fmt="o", ) ax.set_title(title or self.title) ax.set_xlabel(yname or self.model.endog_names) ax.set_yticks(yticks) ax.set_yticklabels(self.exog_names[indices] if xname is None else xname) return ax
[docs] def expected_wasserstein_distance( self, mean: Numeric1DArray = None, cov: np.ndarray = None, **kwargs: Any ) -> float: """Compute the Wasserstein distance metric. This method estimates the Wasserstein distance between the observed distribution (a joint normal characterized by ``mean`` and ``cov``) and the distribution you would expect to observe according to this model. Args: mean (Numeric1DArray, optional): (# params,) array of sample conventionally estimated means. If None, use the model's estimated means. Defaults to None. cov (np.ndarray, optional): (# params, # params) covaraince matrix for conventionally estimated means. If None, use the model's estimated covariance matrix. Defaults to None. **kwargs (Any): Keyword arguments for ``scipy.stats.wasserstein_distance``. Returns: float: Expected Wasserstein distance. """ if mean is None and cov is None: mean = self.params reconstructed_rvs = self._reconstructed_rvs else: if cov is None: cov = self.model.cov reconstructed_rvs = np.apply_along_axis( lambda mean: multivariate_normal.rvs(mean, cov), 1, self._posterior_rvs ) distances = np.apply_along_axis( lambda rv: wasserstein_distance(rv, mean, **kwargs), 1, reconstructed_rvs ) return (self._sample_weight * distances).sum()
[docs] def likelihood(self, mean: Numeric1DArray = None, cov: np.ndarray = None) -> float: """ Args: mean (Numeric1DArray, optional): (# params,) array of sample conventionally estimated means. If None, use the model's estimated means. Defaults to None. cov (np.ndarray, optional): (# params, # params) covaraince matrix for conventionally estimated means. If None, use the model's estimated covariance matrix. Defaults to None. Returns: float: Likelihood. """ if mean is None: mean = self.model.mean if cov is None: cov = self.model.cov return ( self._sample_weight * multivariate_normal.pdf(self._posterior_rvs, mean, cov) ).sum()
[docs] def line_plot( self, column: ColumnType = None, alpha: float = 0.05, title: str = None, yname: str = None, ax=None, ): """Create a line plot of the prior, conventional, and posterior estimates. Args: column (ColumnType, optional): Selected parameter. Defaults to None. alpha (float, optional): Sets the plot width. 0 is as wide as possible, 1 is as narrow as possible. Defaults to .05. title (str, optional): Plot title. Defaults to None. yname (str, optional): Name of the dependent variable. Defaults to None. ax (AxesSubplot, optional): Axis to write on. Returns: AxesSubplot: Plot. """ index = self.model.get_index(column) prior = self.model.get_marginal_prior(index) posterior = self.marginal_distributions[index] conventional = norm( self.model.mean[index], np.sqrt(self.model.cov[index, index]) ) xlim = np.array( [ dist.ppf([alpha / 2, 1 - alpha / 2]) for dist in (prior, conventional, posterior) ] ).T x = np.linspace(xlim[0].min(), xlim[1].max()) palette = sns.color_palette() if ax is None: _, ax = plt.subplots() sns.lineplot(x=x, y=prior.pdf(x), label="prior", ax=ax) ax.axvline(prior.mean(), linestyle="--", color=palette[0]) sns.lineplot(x=x, y=conventional.pdf(x), label="conventional") ax.axvline(conventional.mean(), linestyle="--", color=palette[1]) sns.lineplot(x=x, y=posterior.pdf(x), label="posterior") ax.axvline(posterior.mean(), linestyle="--", color=palette[2]) ax.set_title(title or self.model.exog_names[index]) ax.set_xlabel(yname or self.model.endog_names) return ax
[docs] def rank_matrix_plot(self, title: str = None, **kwargs: Any): """Plot a heatmap of the rank matrix. Args: title (str, optional): Plot title. Defaults to None. **kwargs (Any): Passed to ``sns.heatmap``. Returns: AxesSubplot: Heatmap. """ ax = sns.heatmap(self.rank_df, center=1 / self.model.n_params, **kwargs) ax.set_title(title or f"{self.title} rank matrix") return ax
[docs] def reconstruction_point_plot( self, yname: str = None, xname: Sequence[str] = None, title: str = None, alpha: float = 0.05, ax=None, ): """Create point plot of the reconstructed sample means. Plots the distribution of sample means you would expect to see if this model were correct. Args: yname (str, optional): Name of the endogenous variable. Defaults to None. xname (Sequence[str], optional): Names of x-ticks. Defaults to None. title (str, optional): Plot title. Defaults to None. alpha: (float, optional): Plot the 1-alpha CI. Defaults to 0.05. ax: (AxesSubplot, optional): Axis to write on. Returns: plt.axes._subplots.AxesSubplot: Plot. """ reconstructed_means = -np.sort(-self._reconstructed_rvs) params = np.average(reconstructed_means, axis=0, weights=self._sample_weight) conf_int = np.apply_along_axis( weighted_quantile, 0, reconstructed_means, quantiles=[alpha / 2, 1 - alpha / 2], sample_weight=self._sample_weight, ).T xname = xname or np.arange(self.model.n_params) yticks = np.arange(len(xname), 0, -1) if ax is None: _, ax = plt.subplots() ax.errorbar( x=params, y=yticks, xerr=[params - conf_int[:, 0], conf_int[:, 1] - params], fmt="o", ) ax.set_title(title or f"{self.title} reconstruction plot") ax.set_xlabel(yname or self.model.endog_names) ax.set_ylabel("rank") ax.set_yticks(yticks) ax.set_yticklabels(xname) ax.errorbar(x=-np.sort(-self.model.mean), y=yticks, fmt="x") return ax
def _make_summary_header(self, alpha: float) -> list[str]: return ["coef", "pvalue (1-sided)", f"[{alpha/2}", f"{1-alpha/2}]"]
[docs] class BayesBase(ModelBase): """Mixin for Bayesian models. Subclasses :class:`multiple_inference.base.ModelBase`. """ _results_cls = BayesResults
[docs] def get_marginal_prior(self, column: ColumnType) -> rv_continuous: """Get the marginal prior distribution of ``column``. Args: column (ColumnType): Name or index of the parameter of interest. Returns: rv_continuous: Prior distribution """ return self._get_marginal_prior(self.get_index(column))
def _get_marginal_prior(self, index: int) -> rv_continuous: """Private version of :meth:`self.get_marginal_prior`.""" raise NotImplementedError()
[docs] def get_marginal_distribution(self, column: ColumnType) -> rv_continuous: """Get the marginal posterior distribution of ``column``. Args: column (ColumnType): Name or index of the parameter of interest. Returns: rv_continuous: Posterior distribution. """ return self._get_marginal_distribution(self.get_index(column))
def _get_marginal_distribution(self, index: int) -> rv_continuous: """Private version of :meth:`self.get_marginal_distribution`.""" raise NotImplementedError()
[docs] def get_joint_prior(self, columns: ColumnsType = None): """Get the joint prior distribution. Args: columns (ColumnsType, optional): Selected columns. Defaults to None. Returns: rv_like: Joint distribution. """ return self._get_joint_prior(self.get_indices(columns))
def _get_joint_prior(self, indices: np.ndarray): """Private version of :meth:`self.get_joint_prior`.""" return joint_distribution([self.get_marginal_prior(i) for i in indices])
[docs] def get_joint_distribution(self, columns: ColumnsType = None): """Get the joint posterior distribution. Args: columns (ColumnsType, optional): Selected columns. Defaults to None. Returns: rv_like: Joint distribution. """ return self._get_joint_distribution(self.get_indices(columns))
def _get_joint_distribution(self, indices: np.ndarray): """Private version of :meth:`self.get_joint_distribution`.""" raise NotImplementedError()