"""Nonparametric empirical Bayes.
References:
.. code-block::
@article{cai2021nonparametric,
title={Nonparametric empirical bayes estimation and testing for sparse and heteroscedastic signals},
author={Cai, Junhui and Han, Xu and Ritov, Ya'acov and Zhao, Linda},
journal={arXiv preprint arXiv:2106.08881},
year={2021}
}
Notes:
This implementation is based on Cai et al.'s nonparametric Dirac delta prior. Future
work should also implement their mixture model with a Laplace prior.
"""
from __future__ import annotations
from typing import Any, Union
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.stats import norm, rv_discrete
from .base import BayesBase, BayesResults, ColumnType
# x-values of the Dirac-Delta prior extend some number of standard deviations from the
# min and max values of the conventionally-estimated means
STDS_FROM_MIN_MAX = 2
# minimum number of parameters of the Dirac-Delta prior when ``num_fit`` is "auto"
MIN_PARAMS = 5
# base parameter used for inferring the number of parameters when ``num_fit`` is "auto"
LOG_BASE = 1.3
# epsilon to add to the posterior rvs to break ties (because the posteriors are discrete)
EPS = 1e-8
[docs]
class NonparametricResults(BayesResults):
@property
def _posterior_rvs(self):
if hasattr(self, "_cached_posterior_rvs"):
return self._cached_posterior_rvs
self._cached_posterior_rvs = super()._posterior_rvs
self._cached_posterior_rvs + EPS * np.random.rand(
*self._cached_posterior_rvs.shape
)
return self._cached_posterior_rvs
[docs]
def line_plot(
self,
column: ColumnType = None,
alpha: float = 0.05,
title: str = None,
yname: str = None,
ax=None,
):
"""Create a line plot of the prior, conventional, and posterior estimates.
Args:
column (ColumnType, optional): Selected parameter. Defaults to None.
alpha (float, optional): Sets the plot width. 0 is as wide as possible, 1 is
as narrow as possible. Defaults to .05.
title (str, optional): Plot title. Defaults to None.
yname (str, optional): Name of the dependent variable. Defaults to None.
ax (AxesSubplot, optional): Axis to write on.
Returns:
AxesSubplot: Plot.
"""
index = self.model.get_index(column)
prior = self.model.get_marginal_prior(index)
posterior = self.marginal_distributions[index]
conventional = norm(
self.model.mean[index], np.sqrt(self.model.cov[index, index])
)
xlim = np.array(
[
dist.ppf([alpha / 2, 1 - alpha / 2])
for dist in (prior, conventional, posterior)
]
).T
mask = (xlim[0].min() <= prior.xk) & (prior.xk <= xlim[1].max())
palette = sns.color_palette()
if ax is None:
_, ax = plt.subplots()
sns.lineplot(x=prior.xk[mask], y=prior.pk[mask], label="prior", ax=ax)
ax.axvline(prior.mean(), linestyle="--", color=palette[0])
conventional_pmf = conventional.pdf(prior.xk[mask])
conventional_pmf /= conventional_pmf.sum()
sns.lineplot(x=prior.xk[mask], y=conventional_pmf, label="conventional")
ax.axvline(conventional.mean(), linestyle="--", color=palette[1])
sns.lineplot(
x=posterior.xk[mask], y=posterior.pk[mask], label="posterior", ax=ax
)
ax.axvline(posterior.mean(), linestyle="--", color=palette[2])
ax.set_title(title or self.model.exog_names[index])
ax.set_xlabel(yname or self.model.endog_names)
return ax
[docs]
class Nonparametric(BayesBase):
"""Bayesian model with a nonparametric Dirac delta prior.
Args:
num_fit (Union[str, int], optional): Number of PMF parameters to fit in the Dirac-Delta prior. Defaults to "auto".
num_interp (int, optional): Number of PMF parameters to interpolate for the Dirac-Delta prior. Defaults to 1_000.
lr (float, optional): Learning rate for the Adam optimizer. Defaults to 1e-3.
betas (tuple[float, float], optional): Beta parameters for Adam. Defaults to (0.9, 0.99).
eps (float, optional): Epsilon parameter for Adam. Defaults to 1e-8.
train_iter (int, optional): Number of iterations of gradient descent. Defaults to 1_000.
Examples:
.. testcode::
import numpy as np
from multiple_inference.bayes import Nonparametric
np.random.seed(0)
model = Nonparametric(np.arange(10), np.identity(10))
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Bayesian estimates
=======================================
coef pvalue (1-sided) [0.025 0.975]
---------------------------------------
x0 0.535 0.284 -1.068 2.295
x1 1.340 0.073 -0.433 3.136
x2 2.191 0.009 0.367 4.039
x3 3.073 0.001 1.229 4.982
x4 4.013 0.000 2.111 5.946
x5 4.987 0.000 3.054 6.889
x6 5.927 0.000 4.018 7.771
x7 6.809 0.000 4.961 8.633
x8 7.660 0.000 5.864 9.433
x9 8.465 0.000 6.705 10.068
===============
Dep. Variable y
---------------
"""
_results_cls = NonparametricResults
def __init__(
self,
*args: Any,
num_fit: Union[str, int] = "auto",
num_interp: int = 1_000,
lr: float = 1e-3,
betas: tuple[float, float] = (0.9, 0.99),
eps: float = 1e-8,
train_iter: int = 1_000,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
std = self.mean.std()
if num_fit == "auto":
# this is a very simple heuristic that seems to work well for many datasets
# ideally, ``num_fit`` should be tuned
std_ratio = std / np.sqrt(self.cov.diagonal()).mean()
num_fit = max(
MIN_PARAMS,
int(
np.log(self.n_params) / LOG_BASE / (max(0, np.log10(std_ratio)) + 1)
),
)
# x-values of the Dirac-Delta prior
x = np.linspace(
self.mean.min() - STDS_FROM_MIN_MAX * std,
self.mean.max() + STDS_FROM_MIN_MAX * std,
num=num_fit,
)
# compute (n_means, num) array f(m | \mu)
scale = np.sqrt(self.cov.diagonal())
# note that norm.logpdf(m, loc=mu) = norm.logpdf(mu, loc=m) because the normal
# distribution is symmetric
log_pdf_mean_given_mu = np.array(
[
norm.logpdf(x, loc=loc_i, scale=scale_i)
for loc_i, scale_i in zip(self.mean, scale)
]
)
pdf_mean_given_mu = np.exp(log_pdf_mean_given_mu - log_pdf_mean_given_mu.max())
# optimize the log(pmf) values of the Dirac-Delta prior to maximize f(m) using Adam
# optimizer
pmf = pdf_mean_given_mu.mean(axis=0)
# initialize log_pmf to the optimal values if the means were estimated without noise
log_pmf = np.log(pmf / pmf.sum())
momentum, velocity = 0, 0
for _ in range(train_iter):
pmf = np.exp(log_pmf)
prod = (pmf * pdf_mean_given_mu).sum(axis=1, keepdims=True)
grad = pmf / (1 - pmf) * (1 - (pdf_mean_given_mu / prod).mean(axis=0))
momentum = betas[0] * momentum + (1 - betas[0]) * grad
velocity = betas[1] * velocity + (1 - betas[1]) * grad**2
momentum_hat = momentum / (1 - betas[0])
velocity_hat = velocity / (1 - betas[1])
log_pmf -= lr * momentum_hat / (np.sqrt(velocity_hat) + eps)
pmf = np.exp(log_pmf)
log_pmf = np.log(pmf / pmf.sum())
# interpolate between the fitted values to get a smoother prior
self._values = np.linspace(x[0], x[-1], num=num_interp)
pmf = np.interp(self._values, x, pmf)
self._prior = rv_discrete(values=(self._values, pmf / pmf.sum()))
# cache values for the computing the posterior
self._logpdf_mean_given_mu = np.array(
[
norm.logpdf(self._prior.xk, loc=loc_i, scale=scale_i)
for loc_i, scale_i in zip(self.mean, scale)
]
)
def _get_marginal_prior(self, index: int) -> rv_discrete:
return self._prior
def _get_marginal_distribution(self, index: int) -> rv_discrete:
log_pmf = self._logpdf_mean_given_mu[index] + np.log(self._prior.pk)
pmf = np.exp(log_pmf - log_pmf.max())
return rv_discrete(values=(self._prior.xk, pmf / pmf.sum()))