"""Statistical distributions.
"""
from __future__ import annotations
import warnings
from typing import Any, List, Sequence, Tuple, Union
import numpy as np
from scipy.misc import derivative
from scipy.optimize import NonlinearConstraint, fsolve, minimize
from scipy.stats import norm, rv_continuous, truncnorm as truncnorm_base
from .base import Numeric1DArray
[docs]
class joint_distribution:
"""Join distribution based on independent marginal distributions.
Args:
marginal_distributions (Sequence[rv_continuous]): Marginal distributions.
"""
def __init__(self, marginal_distributions: Sequence[rv_continuous]):
self._marginal_distributions = list(marginal_distributions)
[docs]
def logpdf(self, x: np.ndarray) -> np.ndarray:
"""Log of the probability density function evaluated at ``x``.
Args:
x (np.ndarray): (n, # marginals) matrix of values at which to evaluate the
density function.
Returns:
np.ndarray: (n,) array of log density.
"""
x = np.array(x).reshape(-1, len(self._marginal_distributions))
return np.sum(
[dist.logpdf(x_i) for dist, x_i in zip(self._marginal_distributions, x.T)],
axis=0,
)
[docs]
def pdf(self, x: np.ndarray) -> np.ndarray:
"""Probability density function evaluated at ``x``.
Args:
x (np.ndarray): (n, # marginals) matrix of values at which to evaluate the
density function.
Returns:
np.ndarray: (n,) array of densities.
"""
return np.exp(self.logpdf(x))
[docs]
def rvs(self, size: int = 1) -> np.ndarray:
"""Sample random values.
Args:
size (int, optional): Number of samples to draw. Defaults to 1.
Returns:
np.ndarray: (size, # marginals) matrix of samples.
"""
return np.vstack(
[dist.rvs(size=size) for dist in self._marginal_distributions]
).T
[docs]
class mixture(rv_continuous):
"""Mixture distribution.
Args:
distributions (list[rv_continuous]): List of n distributions to mix over.
weights (Numeric1DArray, optional): (n,) array of mixture weights. Defaults to None.
Attributes:
distributions (list[rv_continuous]): Distributions to mix over.
weights (np.ndarray): Mixture weights.
"""
def __init__(
self,
distributions: list[rv_continuous],
weights: Numeric1DArray = None,
**kwargs: Any,
):
super().__init__(**kwargs)
self.distributions = distributions
self.weights = (
np.ones(len(distributions)) if weights is None else np.atleast_1d(weights)
)
self.weights /= self.weights.sum()
def _pdf(self, x):
return (
self.weights * np.array([dist.pdf(x) for dist in self.distributions]).T
).sum(axis=1)
def _cdf(self, x):
return (
self.weights * np.array([dist.cdf(x) for dist in self.distributions]).T
).sum(axis=1)
[docs]
def mean(self):
return (
self.weights * np.array([dist.mean() for dist in self.distributions])
).sum()
[docs]
def var(self):
return (
self.weights * np.array([dist.var() for dist in self.distributions])
).sum()
[docs]
def std(self):
return np.sqrt(self.var())
[docs]
class quantile_unbiased(rv_continuous): # pylint: disable=invalid-name
"""Conditional quantile-unbiased distribution.
Inherits from `scipy.stats.rv_continuous <https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.rv_continuous.html>`_
and handles standard public methods (``pdf``, ``cdf``, etc.).
Args:
y (float): Value at which the truncated CDF is evaluated
projection_interval (Union[float, Tuple[float, float]], optional): Lower and
upper bounds of the projection confidence interval. Defaults to
(-np.inf, np.inf).
bounds (Tuple[float, float], optional): Lower and upper bounds of the support
of the distribution. Defaults to (-np.inf, np.inf).
dx (float): Used to numerically approximate the PDF.
**truncnorm_kwargs (Any): Keyword arguments for :class:`truncnorm`.
Attributes:
y (float): Value at which the truncated CDF is evaluated.
bounds (Tuple[float, float]): Lower and upper bound of the support of the
distribution.
dx (float): Used to numerically approximate the PDF.
truncnorm_kwargs (dict): Keyword arguments for :class:`truncnorm`.
Examples:
Compute a median-unbiased estimate of a normally distributed variable given
that its observed value is 1 and falls between 0 and 3.
.. doctest::
>>> from multiple_inference.stats import quantile_unbiased
>>> dist = quantile_unbiased(1, truncation_set=[(0, 3)])
>>> dist.ppf(.5)
0.7108033900602351
"""
def __init__(
self,
y: float,
projection_interval: Union[float, Tuple[float, float]] = (-np.inf, np.inf),
bounds: Tuple[float, float] = (-np.inf, np.inf),
dx: float = None,
**truncnorm_kwargs: Any,
):
super().__init__()
self.y = y # pylint: disable=invalid-name
if np.isscalar(projection_interval):
projection_interval = abs(projection_interval) # type: ignore
projection_interval = (-projection_interval, projection_interval)
self.projection_interval = tuple(projection_interval)
self.bounds = bounds
self.truncnorm_kwargs = truncnorm_kwargs
self.dx = dx
self._cdf_min: float = (
0 # type: ignore
if bounds[0] == -np.inf
else 1 - self._truncated_cdf(np.array([bounds[0]]))
)
if self._cdf_min >= 1:
warnings.warn(
"Untruncated CDF of lower bound == 1: try decreasing the lower bound",
RuntimeWarning,
)
self._cdf_max: float = (
1 # type: ignore
if bounds[1] == np.inf
else 1 - self._truncated_cdf(np.array([bounds[1]]))
)
if self._cdf_max <= 0:
warnings.warn(
"Untruncated CDF of upper bound == 0: try increasing the upper bound",
RuntimeWarning,
)
@property
def dx(self): # pylint: disable=missing-docstring
# create a default dx value if this property has not yet been set
if self._dx is None:
self.dx = np.diff(self.ppf([0.95, 0.05])) / 50
return self._dx
@dx.setter
def dx(self, value):
self._dx = value
@property
def bounds(self): # pylint: disable=missing-docstring
# Potentially restrict the bounds of this distribution to ensure the truncation
# set and # projection interval overlap
truncation_set = self.truncnorm_kwargs.get("truncation_set")
if truncation_set is None or self.projection_interval == (-np.inf, np.inf):
return self._bounds
a, b = zip(*truncation_set) # pylint: disable=invalid-name
return (
max(self._bounds[0], np.min(a) - self.projection_interval[1]),
min(self._bounds[1], np.max(b) - self.projection_interval[0]),
)
@bounds.setter
def bounds(self, bounds):
self._bounds = bounds
def _truncated_cdf( # pylint: disable=invalid-name
self, x: np.ndarray
) -> np.ndarray:
"""Compute the truncated CDF evaluated at ``self.y`` with shift ``x``."""
def truncated_cdf(x_i):
# get the intersection of the truncation set and the projection confidence
# interval centered on x_i
intersection = []
for interval in truncation_set:
clipped_interval = (
float(max(interval[0], x_i + self.projection_interval[0])),
float(min(interval[1], x_i + self.projection_interval[1])),
)
if clipped_interval[0] < clipped_interval[1]:
interval = (
normalize(clipped_interval[0], x_i),
normalize(clipped_interval[1], x_i),
)
intersection.append(interval)
return truncnorm(intersection, loc=x_i, **truncnorm_kwargs).cdf(self.y)
def normalize(value, loc):
return (value - loc) / scale
truncation_set = self.truncnorm_kwargs.get("truncation_set")
scale = self.truncnorm_kwargs.get("scale", 1)
truncnorm_kwargs = {
key: value
for key, value in self.truncnorm_kwargs.items()
if key != "truncation_set"
}
rval = np.array([truncated_cdf(x_i) for x_i in x])
return rval[0] if len(rval) == 1 else rval
def _cdf(self, x: np.ndarray) -> np.ndarray: # pylint: disable=arguments-differ
"""Cumulative distribution function.
Args:
x (np.ndarray): (n,) array of values at which to evaluate the CDF.
Returns:
np.ndarray: (n,) array of evaluations.
"""
if self._cdf_min >= 1 or self._cdf_max <= 0:
def handle_cdf_out_of_bounds(x_i):
return self.bounds[0] <= x_i if self.y < x_i else self.bounds[1] < x_i
warnings.warn(
"Untruncated CDF of lower bound >= 1 or CDF of upper bound <= 0"
)
return np.array([handle_cdf_out_of_bounds(x_i) for x_i in x]).astype(float)
cdf = (1 - self._truncated_cdf(x) - self._cdf_min) / (
self._cdf_max - self._cdf_min
)
return ((self.bounds[0] < x) & (x < self.bounds[1])) * cdf + (
self.bounds[1] <= x
).astype(float)
def _pdf(self, x: np.ndarray) -> np.ndarray: # pylint: disable=arguments-differ
"""Probability density function.
Args:
x (np.ndarray): (n,) array of values at which to evaluate the PDF.
Returns:
np.ndarray: (n,) array of evaluations.
"""
pdf = derivative(self._cdf, x, dx=self.dx, order=5)
return ((self.bounds[0] < x) & (x < self.bounds[1])) * pdf
def _ppf(self, q: np.ndarray) -> np.ndarray: # pylint: disable=arguments-differ
"""See self.ppf."""
def func(mu, q_i):
return self._truncated_cdf(mu) - (1 - q_i)
if self._cdf_min >= 1 or self._cdf_max <= 0:
warnings.warn(
"Untruncated CDF of lower bound >= 1 or CDF of upper bound <= 0"
)
value = self.bounds[0] if self.bounds[0] > self.y else self.bounds[1]
return np.full(q.shape, value)
q_t = np.atleast_1d(q) * (self._cdf_max - self._cdf_min) + self._cdf_min
return np.array([fsolve(func, [self.y], args=(q_i,))[0] for q_i in q_t])
[docs]
def ppf( # pylint: disable=arguments-differ
self, q: Union[float, Numeric1DArray]
) -> np.ndarray:
"""Percent point function.
Args:
q (np.ndarray): (n,) array of quantiles at which to evaluate the PPF.
Returns:
np.ndarray: (n,) array of evaluations.
"""
return np.clip(super().ppf(q), *self.bounds)
[docs]
class truncnorm(rv_continuous): # pylint: disable=invalid-name
"""Truncated normal distribution.
Inherits from `scipy.stats.rv_continuous <https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.rv_continuous.html>`_
and handles standard public methods (``pdf``, ``cdf``, etc.).
This uses the `exponential tilting <https://ieeexplore.ieee.org/document/7408180>`_
approximation method.
Args:
truncation_set (List[Tuple[float, float]], optional): List of truncation
intervals, e.g., ``[(-1, 0), (1, 2)]`` truncates the distribution to
[-1, 0] union [1, 2]. Defaults to None.
loc (float, optional): Location. Defaults to 0.
scale (float, optional): Scale parameter. Defaults to 1.
n_samples (int, optional): Number of samples to draw for approximation.
Defaults to 10000.
seed (int, optional): Random seed. Defaults to 0.
Attributes:
loc (float): Location parameter.
scale (float): Scale parameter.
lower_bound (np.array): (# intervals,) array of lower bounds of the truncation
intervals.
upper_bound (np.array): (# intervals,) array of upper bounds of the truncation
intervals.
interval_masses (np.array): (# intervals,) array of the amount of mass in each
truncation interval.
n_samples (int): Number of samples to draw for approximation. Defaults to
10000.
Examples:
Let's evaluate the CDF of a standard normal truncated to the interval (-1, 0) at -0.5.
.. testcode::
from multiple_inference.stats import truncnorm
print(truncnorm([(-1, 0)]).cdf(-.5))
.. testoutput::
0.4390935748119969
Let's evaluate the CDF of a standard normal truncated to the union of (-1, 0) and (1, 2) at -0.5.
.. testcode::
from multiple_inference.stats import truncnorm
print(truncnorm([(-1, 0), (1, 2)]).cdf(-.5))
.. testoutput::
0.3140541146849627
Note:
The truncation set is defined over the domain of the standard normal. To
convert the truncation set for a specific mean and standard deviation, use:
.. code-block:: python
>>> truncation_set = [(myclip_a - my_mean) / my_std, (myclip_b - my_mean) / my_std)]
"""
def __init__(
self,
truncation_set: List[Tuple[float, float]] = None,
loc: float = 0,
scale: float = 1,
n_samples: int = 10000,
seed: int = 0,
):
self.seed = seed
self.loc = loc
self.scale = scale
self.n_samples = n_samples
if truncation_set is None:
truncation_set = [(-np.inf, np.inf)]
self.lower_bound, self.upper_bound = self._get_truncation_bounds(truncation_set)
self.interval_masses = np.array(
[
self._compute_mass_in_interval_avg(a, b)
for a, b in zip(self.lower_bound, self.upper_bound)
]
)
super().__init__()
def _pdf(self, x): # pylint: disable=arguments-differ
x = self._normalize(x)
# n x 1 indicator that x is in an interval
in_interval = np.array(
[np.any((self.lower_bound <= x_i) & (x_i <= self.upper_bound)) for x_i in x]
)
return in_interval * norm.pdf(x) / (self.scale * self.interval_masses.sum())
def _logpdf(self, x): # pylint: disable=arguments-differ
x = self._normalize(x)
# n x 1 indicator that x is in an interval
in_interval = np.array(
[np.any((self.lower_bound <= x_i) & (x_i <= self.upper_bound)) for x_i in x]
)
logpdf = (
norm.logpdf(x) - np.log(self.scale) - np.log(self.interval_masses.sum())
)
logpdf[~in_interval] = -np.inf
return logpdf
def _cdf(self, x): # pylint: disable=arguments-differ
x = self._normalize(x)
if self.lower_bound.size == self.upper_bound.size == 0:
# i.e., truncation set is empty
return (x > 0).astype(float)
denominator = self.interval_masses.sum()
if denominator == 0:
# converts cdf to 0 or 1 depending on bounds and x
a = self.lower_bound.min()
b = self.upper_bound.max()
convert_0_1 = lambda x_i: a < x_i if x_i > 0 else b < x_i
return np.array([convert_0_1(x_i) for x_i in x]).astype(float)
return np.clip(self._compute_cdf_numerator(x) / denominator, 0, 1)
def _logcdf(self, x): # pylint: disable=arguments-differ
x = self._normalize(x)
denominator = self.interval_masses.sum()
return np.log(self._compute_cdf_numerator(x)) - np.log(denominator)
def _compute_mass_in_interval_avg(self, a, b):
# compute the amount of mass in the interval [a, b] by averaging approximate
# mass in [a, b] and [-b, -a]
# the amount of mass in these intervals is the same because the normal is symmetric
# this can improve performance
arr = [
self._compute_mass_in_interval(a, b, int(self.n_samples / 2)),
self._compute_mass_in_interval(-b, -a, int(self.n_samples / 2)),
]
return np.mean(arr, where=~np.isnan(arr))
def _compute_mass_in_interval(self, a, b, n_samples=None):
# compute the amount of mass in the interval [a, b] using minimax exponential tilt
def compute_psi(params):
x, mu = params
return -x * mu + (0.5 * mu**2 + np.log(norm.cdf(b, mu) - norm.cdf(a, mu)))
def d_psi_d_mu(params):
# derivative of psi with respect to mu
x, mu = params
return (
-x
+ mu
+ (norm.pdf(b, mu) - norm.pdf(a, mu))
/ ((norm.cdf(b, mu) - norm.cdf(a, mu)))
)
def optimize_params(x0):
# maximize psi subject to x being contained in the interval and the
# derivative of psi wrt mu =0 0
derivative_constraint = NonlinearConstraint(d_psi_d_mu, 0, 0)
return minimize(
lambda x: -compute_psi(x),
x0=x0,
bounds=[(a, b), (-np.inf, np.inf)],
constraints=[derivative_constraint],
)
def compute_tilting_param():
# compute the optimal tilting parameter
try:
# initial guess for x, denoted as Psi in the paper
# in the univariate case, the initial guess for mu is 0
x_init = (norm.pdf(a) - norm.pdf(b)) / ((norm.cdf(b) - norm.cdf(a)))
except ZeroDivisionError:
x_init = a if a < 0 else b
if a < x_init < b:
# optimal x is in the truncation set, so optimal mu is 0
return 0
# optimal mu must be found by non-linear optimization
res = optimize_params([x_init, 0])
if res.success:
return res.x[1]
next_guess, final_guess = ([a, a], [b, b]) if a > 0 else ([b, b], [a, a])
res = optimize_params(next_guess)
if res.success:
return res.x[1]
res = optimize_params(final_guess)
if res.success:
return res.x[1]
warnings.warn(
"Optimizer failed to find truncated normal tilt parameter",
RuntimeWarning,
)
return x_init
if b < a:
return 0
mu = compute_tilting_param()
x = truncnorm_base.rvs(
a - mu, b - mu, mu, size=n_samples or self.n_samples, random_state=self.seed
)
return np.exp(compute_psi((x, mu))).mean()
def _compute_cdf_numerator(self, x):
# n x p indicates x is above the upper bound of the interval
index = np.array([self.upper_bound <= x_i for x_i in x])
# n x 1 CDF for the intervals where x is above the upper bound
cdf = index @ self.interval_masses
# tuple of (x index, interval index) such that x[x index] is in interval[interval_index]
indices = np.where(
[(self.lower_bound < x_i) & (x_i < self.upper_bound) for x_i in x]
)
# add mass from intervals containing x
cdf[indices[0]] += np.array(
[
self._compute_mass_in_interval_avg(
self.lower_bound[interval_index], x[x_index]
)
for x_index, interval_index in zip(*indices)
]
)
return cdf
def _get_truncation_bounds(self, truncation_set):
if not truncation_set:
return np.array([]), np.array([])
for interval in truncation_set:
if interval[1] < interval[0]:
raise ValueError(f"Invalid interval {interval}")
truncation_set.sort(key=lambda x: x[0])
a, b = list(zip(*truncation_set))
# ensure b is strictly increasing
b = [b[0]] + [max(b_i, b_j) for b_i, b_j in zip(b[1:], b[:-1])]
new_a, new_b = [a[0]], []
for a_i, b_i in zip(a[1:], b[:-1]):
if a_i > b_i:
new_b.append(b_i)
new_a.append(a_i)
new_b.append(b[-1])
return np.array(new_a), np.array(new_b)
def _normalize(self, arr):
return (arr - self.loc) / self.scale