Source code for multiple_inference.bayes.nonparametric

"""Nonparametric empirical Bayes.

References:

    .. code-block::

        @article{cai2021nonparametric,
            title={Nonparametric empirical bayes estimation and testing for sparse and heteroscedastic signals},
            author={Cai, Junhui and Han, Xu and Ritov, Ya'acov and Zhao, Linda},
            journal={arXiv preprint arXiv:2106.08881},
            year={2021}
        }

Notes:

    This implementation is based on Cai et al.'s nonparametric Dirac delta prior. Future
    work should also implement their mixture model with a Laplace prior.
"""
from __future__ import annotations

from typing import Any, Union

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.stats import norm, rv_discrete

from .base import BayesBase, BayesResults, ColumnType

# x-values of the Dirac-Delta prior extend some number of standard deviations from the
# min and max values of the conventionally-estimated means
STDS_FROM_MIN_MAX = 2
# minimum number of parameters of the Dirac-Delta prior when ``num_fit`` is "auto"
MIN_PARAMS = 5
# base parameter used for inferring the number of parameters when ``num_fit`` is "auto"
LOG_BASE = 1.3
# epsilon to add to the posterior rvs to break ties (because the posteriors are discrete)
EPS = 1e-8


[docs] class NonparametricResults(BayesResults): @property def _posterior_rvs(self): if hasattr(self, "_cached_posterior_rvs"): return self._cached_posterior_rvs self._cached_posterior_rvs = super()._posterior_rvs self._cached_posterior_rvs + EPS * np.random.rand( *self._cached_posterior_rvs.shape ) return self._cached_posterior_rvs
[docs] def line_plot( self, column: ColumnType = None, alpha: float = 0.05, title: str = None, yname: str = None, ax=None, ): """Create a line plot of the prior, conventional, and posterior estimates. Args: column (ColumnType, optional): Selected parameter. Defaults to None. alpha (float, optional): Sets the plot width. 0 is as wide as possible, 1 is as narrow as possible. Defaults to .05. title (str, optional): Plot title. Defaults to None. yname (str, optional): Name of the dependent variable. Defaults to None. ax (AxesSubplot, optional): Axis to write on. Returns: AxesSubplot: Plot. """ index = self.model.get_index(column) prior = self.model.get_marginal_prior(index) posterior = self.marginal_distributions[index] conventional = norm( self.model.mean[index], np.sqrt(self.model.cov[index, index]) ) xlim = np.array( [ dist.ppf([alpha / 2, 1 - alpha / 2]) for dist in (prior, conventional, posterior) ] ).T mask = (xlim[0].min() <= prior.xk) & (prior.xk <= xlim[1].max()) palette = sns.color_palette() if ax is None: _, ax = plt.subplots() sns.lineplot(x=prior.xk[mask], y=prior.pk[mask], label="prior", ax=ax) ax.axvline(prior.mean(), linestyle="--", color=palette[0]) conventional_pmf = conventional.pdf(prior.xk[mask]) conventional_pmf /= conventional_pmf.sum() sns.lineplot(x=prior.xk[mask], y=conventional_pmf, label="conventional") ax.axvline(conventional.mean(), linestyle="--", color=palette[1]) sns.lineplot( x=posterior.xk[mask], y=posterior.pk[mask], label="posterior", ax=ax ) ax.axvline(posterior.mean(), linestyle="--", color=palette[2]) ax.set_title(title or self.model.exog_names[index]) ax.set_xlabel(yname or self.model.endog_names) return ax
[docs] class Nonparametric(BayesBase): """Bayesian model with a nonparametric Dirac delta prior. Args: num_fit (Union[str, int], optional): Number of PMF parameters to fit in the Dirac-Delta prior. Defaults to "auto". num_interp (int, optional): Number of PMF parameters to interpolate for the Dirac-Delta prior. Defaults to 1_000. lr (float, optional): Learning rate for the Adam optimizer. Defaults to 1e-3. betas (tuple[float, float], optional): Beta parameters for Adam. Defaults to (0.9, 0.99). eps (float, optional): Epsilon parameter for Adam. Defaults to 1e-8. train_iter (int, optional): Number of iterations of gradient descent. Defaults to 1_000. Examples: .. testcode:: import numpy as np from multiple_inference.bayes import Nonparametric np.random.seed(0) model = Nonparametric(np.arange(10), np.identity(10)) results = model.fit() print(results.summary()) .. testoutput:: :options: -ELLIPSIS, +NORMALIZE_WHITESPACE Bayesian estimates ======================================= coef pvalue (1-sided) [0.025 0.975] --------------------------------------- x0 0.535 0.284 -1.068 2.295 x1 1.340 0.073 -0.433 3.136 x2 2.191 0.009 0.367 4.039 x3 3.073 0.001 1.229 4.982 x4 4.013 0.000 2.111 5.946 x5 4.987 0.000 3.054 6.889 x6 5.927 0.000 4.018 7.771 x7 6.809 0.000 4.961 8.633 x8 7.660 0.000 5.864 9.433 x9 8.465 0.000 6.705 10.068 =============== Dep. Variable y --------------- """ _results_cls = NonparametricResults def __init__( self, *args: Any, num_fit: Union[str, int] = "auto", num_interp: int = 1_000, lr: float = 1e-3, betas: tuple[float, float] = (0.9, 0.99), eps: float = 1e-8, train_iter: int = 1_000, **kwargs: Any, ): super().__init__(*args, **kwargs) std = self.mean.std() if num_fit == "auto": # this is a very simple heuristic that seems to work well for many datasets # ideally, ``num_fit`` should be tuned std_ratio = std / np.sqrt(self.cov.diagonal()).mean() num_fit = max( MIN_PARAMS, int( np.log(self.n_params) / LOG_BASE / (max(0, np.log10(std_ratio)) + 1) ), ) # x-values of the Dirac-Delta prior x = np.linspace( self.mean.min() - STDS_FROM_MIN_MAX * std, self.mean.max() + STDS_FROM_MIN_MAX * std, num=num_fit, ) # compute (n_means, num) array f(m | \mu) scale = np.sqrt(self.cov.diagonal()) # note that norm.logpdf(m, loc=mu) = norm.logpdf(mu, loc=m) because the normal # distribution is symmetric log_pdf_mean_given_mu = np.array( [ norm.logpdf(x, loc=loc_i, scale=scale_i) for loc_i, scale_i in zip(self.mean, scale) ] ) pdf_mean_given_mu = np.exp(log_pdf_mean_given_mu - log_pdf_mean_given_mu.max()) # optimize the log(pmf) values of the Dirac-Delta prior to maximize f(m) using Adam # optimizer pmf = pdf_mean_given_mu.mean(axis=0) # initialize log_pmf to the optimal values if the means were estimated without noise log_pmf = np.log(pmf / pmf.sum()) momentum, velocity = 0, 0 for _ in range(train_iter): pmf = np.exp(log_pmf) prod = (pmf * pdf_mean_given_mu).sum(axis=1, keepdims=True) grad = pmf / (1 - pmf) * (1 - (pdf_mean_given_mu / prod).mean(axis=0)) momentum = betas[0] * momentum + (1 - betas[0]) * grad velocity = betas[1] * velocity + (1 - betas[1]) * grad**2 momentum_hat = momentum / (1 - betas[0]) velocity_hat = velocity / (1 - betas[1]) log_pmf -= lr * momentum_hat / (np.sqrt(velocity_hat) + eps) pmf = np.exp(log_pmf) log_pmf = np.log(pmf / pmf.sum()) # interpolate between the fitted values to get a smoother prior self._values = np.linspace(x[0], x[-1], num=num_interp) pmf = np.interp(self._values, x, pmf) self._prior = rv_discrete(values=(self._values, pmf / pmf.sum())) # cache values for the computing the posterior self._logpdf_mean_given_mu = np.array( [ norm.logpdf(self._prior.xk, loc=loc_i, scale=scale_i) for loc_i, scale_i in zip(self.mean, scale) ] ) def _get_marginal_prior(self, index: int) -> rv_discrete: return self._prior def _get_marginal_distribution(self, index: int) -> rv_discrete: log_pmf = self._logpdf_mean_given_mu[index] + np.log(self._prior.pk) pmf = np.exp(log_pmf - log_pmf.max()) return rv_discrete(values=(self._prior.xk, pmf / pmf.sum()))