"""Simultaneous confidence sets and multiple hypothesis testing.
References:
.. code-block::
@article{storey2003statistical,
title={Statistical significance for genomewide studies},
author={Storey, John D and Tibshirani, Robert},
journal={Proceedings of the National Academy of Sciences},
volume={100},
number={16},
pages={9440--9445},
year={2003},
publisher={National Acad Sciences}
}
@article{romano2005stepwise,
title={Stepwise multiple testing as formalized data snooping},
author={Romano, Joseph P and Wolf, Michael},
journal={Econometrica},
volume={73},
number={4},
pages={1237--1282},
year={2005},
publisher={Wiley Online Library}
}
@techreport{mogstad2020inference,
title={Inference for ranks with applications to mobility across neighborhoods and academic achievement across countries},
author={Mogstad, Magne and Romano, Joseph P and Shaikh, Azeem and Wilhelm, Daniel},
year={2020},
institution={National Bureau of Economic Research}
}
"""
from __future__ import annotations
import warnings
from itertools import combinations
from typing import Any, Sequence, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.optimize import minimize
from scipy.stats import multivariate_normal, norm
from multiple_inference.base import ColumnsType, ModelBase, ResultsBase
def _test_hypotheses(
z_stat_rvs: np.ndarray,
z_stats: np.ndarray,
max_rvs: np.ndarray,
alpha: float,
fast: Union[bool, str],
) -> np.ndarray:
"""Perform stepwise hypothesis testing.
Args:
rvs (np.ndarray): (# params, # samples) array of standardized samples from the
conventional distribution.
z_stats (np.ndarray): ($ params,) array of z statistics.
max_rvs (np.ndarray): (# samples,) array of maximum random values.
alpha (float): Significance level.
fast (Union[bool, str]): If True, do not use the stepwise procedure.
Raises:
ValueError: ``fast`` must be True, False, or "auto".
Returns:
np.ndarray: (# params,) boolean array indicating that hypothesis k was rejected.
"""
if fast not in ("auto", True, False):
raise ValueError(f"`fast` must be True, False, or 'auto', got {fast}.")
if not fast or (fast == "auto" and z_stat_rvs.shape[1] < 10000):
# stepwise rejection will take a long time with more than 10000 parameters
rejected, newly_rejected = np.full(z_stat_rvs.shape[1], False), None
while newly_rejected is None or (newly_rejected.any() and not rejected.all()):
critical_value = np.quantile(
z_stat_rvs[:, ~rejected].max(axis=1), 1 - alpha
)
newly_rejected = (z_stats > critical_value) & ~rejected
rejected = rejected | newly_rejected
else:
rejected = z_stats > np.quantile(max_rvs, 1 - alpha)
return rejected
[docs]
class ConfidenceSetResults(ResultsBase):
"""Results for simultaneous confidence sets.
Subclasses :class:`multiple_inference.base.ResultsBase`.
Args:
n_samples (int, optional): Number of samples to draw when approximating the
confidence set. Defaults to 10000.
"""
_default_title = "Confidence set results"
def __init__(self, *args: Any, n_samples: int = 10000, **kwargs: Any):
super().__init__(*args, **kwargs)
self.params = self.model.mean.copy()
# draw random values for confidence set approximation
self._std_diagonal = np.sqrt(self.model.cov.diagonal())
self._z_stats = self.params / self._std_diagonal
self._z_stat_rvs = (
multivariate_normal.rvs(
np.zeros(self.model.n_params),
self.model.cov,
size=n_samples,
random_state=self.model.random_state,
)
/ self._std_diagonal
)
self._max_z_stats = (
self._z_stat_rvs.copy()
if self.model.n_params == 1
else abs(self._z_stat_rvs).max(axis=1)
)
self._set_pvalues()
self._set_qvalues()
def _set_pvalues(self):
if self.model.n_params == 1:
self.pvalues = 2 * np.atleast_1d(
norm.cdf(-abs(self.model.mean[0]), 0, np.sqrt(self.model.cov[0, 0]))
)
else:
self.pvalues = (
np.expand_dims(abs(self._z_stats), -1) < self._max_z_stats
).mean(axis=1)
def _set_qvalues(self):
if self.model.n_params == 1:
self.qvalues = self.pvalues.copy()
else:
# get conventional pvalues
pvalues = norm.cdf(self.params / self._std_diagonal)
# convert to 2-sided pvalues
pvalues = 2 * np.min([pvalues, 1 - pvalues], axis=0)
argsort = (-pvalues).argsort()
pvalues = pvalues[argsort]
# define x over (0, .9 * max pvalue)
# extending x too close to the max pvalue introduces noise
x = np.linspace(0, 0.9 * pvalues[0])
pi0 = (pvalues > np.expand_dims(x, 1)).mean(axis=1) / (1 - x)
# fit an exponential smoothing spline to estimate the true null rate
spline = lambda x, params: params[0] + params[1] * np.exp(
-params[1] * x / params[2]
)
params = minimize(
lambda params: ((spline(x, params) - pi0) ** 2).sum(), [0, 1, 1]
).x
true_null_rate = np.clip(spline(pvalues[0], params), 0, 1)
# compute qvalues
qvalues = [1]
for i, pvalue in enumerate(pvalues):
qvalues.append(
min(
true_null_rate * pvalue * len(pvalues) / (len(pvalues) - i),
qvalues[-1],
)
)
self.qvalues = np.array(qvalues[1:])[argsort.argsort()]
def _conf_int(self, alpha: float, indices: np.ndarray) -> np.ndarray:
if self.model.n_params == 1:
return np.atleast_2d(
norm.ppf(
[alpha / 2, 1 - alpha / 2],
self.model.mean[0],
np.sqrt(self.model.cov[0, 0]),
)
)
params = self.params[indices]
arr = np.quantile(self._max_z_stats, 1 - alpha) * self._std_diagonal[indices]
return np.array([params - arr, params + arr]).T
[docs]
def test_hypotheses(
self,
alpha: float = 0.05,
columns: ColumnsType = None,
two_tailed: bool = True,
fast: Union[bool, str] = "auto",
) -> Union[pd.DataFrame, pd.Series]:
"""Test the null hypothesis that the parameter is equal to 0.
Args:
alpha (float, optional): Significance level. Defaults to 0.05.
columns (ColumnsType, optional): Selected columns. Defaults to None.
two_tailed (bool, optional): Run two-tailed hypothesis tests. Set to False
to run one-tailed hypothesis tests. Defaults to True.
fast (Union[bool, str], optional): Avoid the stepdown procedure. Defaults to
"auto".
Returns:
Union[pd.DataFrame, pd.Series]: Results dataframe (if two-tailed) or series
(if one-tailed).
"""
# use pvalues to reject hypotheses to control the familywise error rate
if two_tailed:
rejected = _test_hypotheses(
np.hstack([self._z_stat_rvs, -self._z_stat_rvs]),
np.concatenate([self._z_stats, -self._z_stats]),
self._max_z_stats,
alpha,
fast,
)
else:
rejected = _test_hypotheses(
self._z_stat_rvs,
self._z_stats,
self._max_z_stats,
alpha,
fast,
)
indices = self.model.get_indices(columns, self.exog_names)
if two_tailed:
return pd.DataFrame(
rejected.reshape(2, -1).T[indices],
columns=["param>0", "param<0"],
index=self.exog_names[indices],
)
return pd.Series(
rejected[indices], name="param>0", index=self.exog_names[indices]
)
def _make_summary_header(self, alpha: float) -> list[str]:
return [
"coef (conventional)",
"pvalue",
"qvalue",
f"{1-alpha} CI lower",
f"{1-alpha} CI upper",
]
def _make_summary_data(
self, alpha: float, indices: np.ndarray, **kwargs: Any
) -> np.ndarray:
if not hasattr(self, "params"):
raise AttributeError("Results object does not have `params` attribute.")
return np.hstack(
(
np.array([self.params, self.pvalues, self.qvalues]).T[indices],
self.conf_int(alpha, indices, **kwargs),
)
)
[docs]
class ConfidenceSet(ModelBase):
"""Model for simultaneous confidence sets.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import ConfidenceSet
x = np.arange(-1, 2)
cov = np.identity(3) / 10
model = ConfidenceSet(x, cov)
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Confidence set results
================================================================
coef (conventional) pvalue qvalue 0.95 CI lower 0.95 CI upper
----------------------------------------------------------------
x0 -1.000 0.004 0.002 -1.755 -0.245
x1 0.000 1.000 1.000 -0.755 0.755
x2 1.000 0.004 0.002 0.245 1.755
===============
Dep. Variable y
---------------
.. testcode::
print(results.test_hypotheses())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
param>0 param<0
x0 False True
x1 False False
x2 True False
"""
_results_cls = ConfidenceSetResults
[docs]
class AverageComparison(ConfidenceSet):
"""Compare each parameter to the average value across all parameters.
Subclasses :class:`ConfidenceSet`.
Args:
*args (Any): Passed to :class:`ConfidenceSet`.
**kwargs (Any): Passed to :class:`ConfidenceSet`.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import AverageComparison
x = np.arange(-1, 2)
cov = np.identity(3) / 10
model = AverageComparison(x, cov)
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Confidence set results
================================================================
coef (conventional) pvalue qvalue 0.95 CI lower 0.95 CI upper
----------------------------------------------------------------
x0 -1.000 0.000 0.000 -1.604 -0.396
x1 0.000 1.000 1.000 -0.604 0.604
x2 1.000 0.000 0.000 0.396 1.604
===============
Dep. Variable y
---------------
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
identity = np.identity(self.n_params)
cov_inv = np.linalg.inv(self.cov)
projection = (
self.X @ np.linalg.inv(self.X.T @ cov_inv @ self.X) @ self.X.T @ cov_inv
)
self.mean = (identity - projection) @ self.mean
cov = (identity - projection) @ self.cov @ (identity - projection).T
if np.isnan(cov).all():
warnings.warn(
f"Error encountered in recomputing covariance matrix. "
"Using original covariance matrix instead.",
RuntimeWarning,
)
else:
self.cov = cov
[docs]
class BaselineComparison(ConfidenceSet):
"""Compare parameters to a baseline parameter.
Subclasses :class:`ConfidenceSet`.
Args:
baseline (Union[int, str]): Index or name of the baseline parameter.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import BaselineComparison
x = np.arange(-1, 2)
cov = np.identity(3) / 10
model = BaselineComparison(x, cov, exog_names=["x0", "x1", "x2"], baseline="x0")
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Confidence set results
================================================================
coef (conventional) pvalue qvalue 0.95 CI lower 0.95 CI upper
----------------------------------------------------------------
x1 1.000 0.045 0.012 0.022 1.978
x2 2.000 0.000 0.000 1.022 2.978
===============
Dep. Variable y
---------------
"""
def __init__(self, *args, baseline: Union[int, str], **kwargs):
super().__init__(*args, **kwargs)
index = int(
baseline
if isinstance(baseline, (float, int))
else list(self.exog_names).index(baseline)
)
self.n_params -= 1
self.mean = np.delete(self.mean, index) - self.mean[index]
cov = (
self.cov[index, index]
+ self.cov
- np.expand_dims(self.cov[index], 1)
- np.expand_dims(self.cov[index], 0)
)
self.cov = np.delete(np.delete(cov, index, axis=0), index, axis=1)
if self._exog_names is not None:
self.exog_names = np.delete(self.exog_names, index)
[docs]
class PairwiseComparisonResults(ConfidenceSetResults):
"""Results of pairwise comparisons.
Subclasses :class:`ConfidenceSetResults`.
Args:
n_samples (int, optional): Number of samples to draw to obtain critical
values. Defaults to 10000.
groups (np.ndarray, optional): (# params,) array of parameter groups.
Defaults to None.
Raises:
ValueError: Length of ``groups`` must match the number of parameters.
"""
_default_title = "Pairwise comparisons"
def __init__(
self,
*args: Any,
n_samples: int = 10000,
groups: np.ndarray = None,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
self._groups = (
np.zeros(self.model.n_params) if groups is None else np.array(groups)
)
if len(self._groups) != self.model.n_params:
raise ValueError(
"Length of groups must equal the number of parameters. Got "
f"{self.model.n_params} parameters and groups of length "
f" {len(self._groups)}."
)
self._rvs = multivariate_normal.rvs(
self.model.mean,
self.model.cov,
size=n_samples,
random_state=self.model.random_state,
)
self._exog_names, self._params, self._std_diagonal = [], [], []
self._max_z_stats = np.full(n_samples, -np.inf)
pairwise_groups = []
for group in np.unique(self._groups):
mask = self._groups == group
pairwise_groups += int(0.5 * mask.sum() * (mask.sum() - 1)) * [group]
group_exog_names, group_params, group_variance, group_rvs = (
self.exog_names[mask],
self.params[mask],
self.model.cov[mask][:, mask],
self._rvs[:, mask],
)
group_variance_diag = group_variance.diagonal()
for i in range(mask.sum()):
self._exog_names.append(
[
f"{name} - {group_exog_names[i]}"
for name in group_exog_names[i + 1 :]
]
)
self._params.append(group_params[i + 1 :] - group_params[i])
# variance of the differences between x_i and x_j
self._std_diagonal.append(
np.sqrt(
group_variance_diag[i]
+ group_variance_diag[i + 1 :]
- 2 * group_variance[i, i + 1 :]
)
)
# random sample of the differences between x_i and x_j
delta_rvs = group_rvs[:, i + 1 :] - np.expand_dims(group_rvs[:, i], -1)
self._max_z_stats = np.hstack(
[
np.expand_dims(self._max_z_stats, -1),
abs(delta_rvs - self._params[-1]) / self._std_diagonal[-1],
]
).max(axis=1)
self.exog_names = np.concatenate(self._exog_names)
self.params = np.concatenate(self._params)
self._std_diagonal = np.concatenate(self._std_diagonal)
self._z_stats = self.params / self._std_diagonal
self._pairwise_groups = np.array(pairwise_groups)
self._set_pvalues()
self._set_qvalues()
[docs]
def test_hypotheses(
self,
alpha: float = 0.05,
columns: ColumnsType = None,
criterion: str = "fwer",
groups: Sequence = None,
fast: Union[bool, str] = "auto",
) -> Union[pd.DataFrame, dict[Any, pd.DataFrame]]:
"""Test pairwise hypotheses.
Args:
alpha (float, optional): Significance level. Defaults to .05.
columns (ColumnsType, optional): Selected columns. In wide format, these are
the original column names (e.g., "x0"). In long format, these are the
names of the differences (e.g., "x1 - x0"). Defaults to None.
criterion (str, optional): "fwer" to control for the family-wise error rate
(using pvalues), "fdr" to control for the false discovery rate (using
qvalues). Defaults to "fwer".
groups (Sequence, optional). Selected groups of parameters. Defaults to None.
fast (Union[bool, str], optional): Indicates to use a fast version of the
algorithm. Defaults to "auto".
Raises:
ValueError: ``criterion`` must be one of "fwer" or "fdr".
Returns:
Union[pd.DataFrame, dict[Any, pd.DataFrame]]: Results dataframe if only one
group is used, mapping of group name to results dataframe if multiple
groups are used.
Notes:
When controlling for the familywise error rate, the null hypotheses are
$$mu_k \leq \mu_j$$. When controlling for the false discovery rate, the null
hypotheses are $$\mu_k = \mu_j$$.
"""
if criterion not in ("fwer", "fdr"):
raise ValueError(f"criterion should be in 'fwer', 'fdr', got {criterion}.")
if criterion == "fdr":
rejected = self.qvalues < alpha
rejected = np.vstack([rejected, rejected]).T
else:
if fast not in ("auto", True, False):
raise ValueError(f"`fast` must be True, False, or 'auto', got {fast}.")
if not fast or (fast == "auto" and self.model.n_params < 100):
# stepwise rejection will take a long time with more than 100 pairwise
# comparisons
rejected = self._stepwise_rejection(alpha)
else:
critical_value = np.quantile(self._max_z_stats, 1 - alpha)
rejected = np.vstack(
[self._z_stats > critical_value, self._z_stats < -critical_value]
).T
# reshape rejected array into a square matrix
return_value = {}
for group in groups or np.unique(self._groups):
mask = self._groups == group
pairwise_mask = self._pairwise_groups == group
n_params = mask.sum()
tri = np.full((n_params, n_params), False)
indices = np.triu_indices(n_params, 1)
tri[indices] = rejected[pairwise_mask, 0]
tri[(indices[1], indices[0])] = rejected[pairwise_mask, 1]
exog_names = self.model.exog_names[mask]
indices = self.model.get_indices(columns, exog_names)
column_names = exog_names[indices]
return_value[group] = pd.DataFrame(
tri[indices][:, indices], index=column_names, columns=column_names
)
return (
list(return_value.values())[0] if len(return_value) == 1 else return_value
)
def _stepwise_rejection(self, alpha: float) -> np.ndarray:
# indicates directional hypothesis rejection
rejected_pos = np.full(len(self.params), False)
rejected_neg = np.full(len(self.params), False)
newly_rejected = None
while newly_rejected is None or newly_rejected.any():
max_z_stats = np.full(len(self._rvs), -np.inf)
for group in np.unique(self._groups):
# get the parameters, standard errors, and random samples for this group
mask = self._groups == group
pairwise_mask = self._pairwise_groups == group
rvs = self._rvs[:, mask]
params = self.params[pairwise_mask]
std_diag = self._std_diagonal[pairwise_mask]
group_rejected_pos = rejected_pos[pairwise_mask]
group_rejected_neg = rejected_neg[pairwise_mask]
start, stop = 0, mask.sum() - 1
for i in range(mask.sum()):
# update max z stats in linearly sized chunks
# this reduces memory complexity from quadratic to linear
# which is important when testing many parameters
delta_rvs = rvs[:, i + 1 :] - np.expand_dims(rvs[:, i], -1)
max_z_stats = np.hstack(
[
np.expand_dims(max_z_stats, 1),
((delta_rvs - params[start:stop]) / std_diag[start:stop])[
:, ~group_rejected_pos[start:stop]
],
]
).max(axis=1)
max_z_stats = np.hstack(
[
np.expand_dims(max_z_stats, -1),
-((delta_rvs - params[start:stop]) / std_diag[start:stop])[
:, ~group_rejected_neg[start:stop]
],
]
).max(axis=1)
start, stop = stop, stop + (stop - start - 1)
# compute the critical value and reject hypotheses
critical_value = np.quantile(max_z_stats, 1 - alpha)
rejected = np.concatenate([rejected_pos, rejected_neg])
rejected_pos = self._z_stats > critical_value
rejected_neg = self._z_stats < -critical_value
newly_rejected = np.concatenate([rejected_pos, rejected_neg]) & ~rejected
return np.vstack([rejected_pos, rejected_neg]).T
[docs]
def hypothesis_heatmap(
self,
*args: Any,
title: str = None,
axes=None,
triangular: bool = False,
**kwargs: Any,
):
"""Create a heatmap of pairwise hypothesis tests.
Args:
title (str, optional): Title.
axes (Union[AxesSubplot, Sequence[AxesSubplot]], optional): Axes to write
on. Defaults to None.
triangular (bool, optional): Display the results in a triangular (as opposed
to square) output. Usually, you should set this to True if and only if
your columns are sorted. Defaults to False.
Returns:
np.ndarray: Array of axes.
"""
def write_heatmap(group, matrix, ax):
if triangular:
mask = np.zeros_like(matrix)
mask[np.triu_indices_from(mask)] = True
else:
mask = None
sns.heatmap(
matrix,
cbar=False,
ax=ax,
yticklabels=matrix.index,
xticklabels=matrix.columns,
mask=mask,
square=True,
cmap=sns.color_palette()[3:1:-1],
center=0.5,
)
ax.set_title(
(title or self.title) + ("" if group is None else f" group = {group}")
)
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)
results = self.test_hypotheses(*args, **kwargs)
if isinstance(results, dict):
if axes is None:
fig, axes = plt.subplots(len(results))
fig.tight_layout()
for (group, matrix), ax in zip(results.items(), axes):
write_heatmap(group, matrix, ax)
else:
if axes is None:
_, axes = plt.subplots()
write_heatmap(None, results, axes)
return axes
def _make_summary_header(self, alpha: float) -> list[str]:
return [
"delta (conventional)",
"pvalue",
"qvalue",
f"{1-alpha} CI lower",
f"{1-alpha} CI upper",
]
[docs]
class PairwiseComparison(ConfidenceSet):
"""Compute pairwise comparisons.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import PairwiseComparison
x = np.arange(-1, 2)
cov = np.identity(3) / 10
model = PairwiseComparison(x, cov)
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Pairwise comparisons
======================================================================
delta (conventional) pvalue qvalue 0.95 CI lower 0.95 CI upper
----------------------------------------------------------------------
x1 - x0 1.000 0.061 0.017 -0.031 2.031
x2 - x0 2.000 0.000 0.000 0.969 3.031
x2 - x1 1.000 0.061 0.017 -0.031 2.031
===============
Dep. Variable y
---------------
.. testcode::
print(results.test_hypotheses())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
x0 x1 x2
x0 False False True
x1 False False False
x2 False False False
This means that parameter x2 is significantly greater than x0.
"""
_results_cls = PairwiseComparisonResults
[docs]
class MarginalRankingResults(ResultsBase):
"""Marginal ranking results."""
_default_title = "Marginal ranking"
def __init__(
self, model: MarginalRanking, *args: Any, n_samples: int = 10000, **kwargs: Any
):
super().__init__(model, *args, **kwargs)
self.params = (-self.model.mean).argsort().argsort() + 1
self._rvs = multivariate_normal.rvs(
self.model.mean,
self.model.cov,
size=n_samples,
random_state=self.model.random_state,
)
def _conf_int(
self, alpha: float, indices: np.ndarray, fast: Union[bool, str] = "auto"
) -> np.array:
def get_rank_ci(index):
# get the marginal rank confidence interval for `index`
# get random samples for the difference between this index and other parameters
rvs = np.delete(
self._rvs - np.expand_dims(self._rvs[:, index], 1), index, axis=1
)
params = np.delete(self.model.mean - self.model.mean[index], index)
variance = variance_diag[index] + variance_diag - 2 * self.model.cov[index]
std_diagonal = np.sqrt(np.delete(variance, index))
z_stats = params / std_diagonal
# standardize the differences
rvs = (rvs - params) / std_diagonal
rejected = _test_hypotheses(
np.hstack([rvs, -rvs]),
np.concatenate([z_stats, -z_stats]),
abs(rvs).max(axis=1),
alpha,
fast,
)
n_params = int(len(rejected) / 2)
return [rejected[:n_params].sum(), n_params - rejected[n_params:].sum()]
variance_diag = self.model.cov.diagonal()
return np.array([get_rank_ci(i) for i in indices]) + 1
def _make_summary_header(self, alpha: float) -> list[str]:
return [
"rank (conventional)",
"pvalue",
f"{1-alpha} CI lower",
f"{1-alpha} CI upper",
]
[docs]
class MarginalRanking(ConfidenceSet):
"""Estimate rankings with marginal confidence intervals.
Subclasses :class:`ConfidenceSet`.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import MarginalRanking
x = np.arange(-1, 2)
cov = np.diag([1, 2, 3]) / 10
model = MarginalRanking(x, cov)
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Marginal ranking
=========================================================
rank (conventional) pvalue 0.95 CI lower 0.95 CI upper
---------------------------------------------------------
x0 3.000 nan 2.000 3.000
x1 2.000 nan 1.000 3.000
x2 1.000 nan 1.000 2.000
===============
Dep. Variable y
---------------
"""
_results_cls = MarginalRankingResults
[docs]
class SimultaneousRankingResults(ResultsBase):
"""Simultaneous ranking results."""
_default_title = "Simultaneous ranking"
def __init__(
self,
model: SimultaneousRanking,
*args: Any,
n_samples: int = 10000,
**kwargs: Any,
):
super().__init__(model, *args, **kwargs)
self.params = (-model.mean).argsort().argsort() + 1
pairwise_model = PairwiseComparison(model.mean, model.cov)
self._pairwise_comparison = pairwise_model.fit(n_samples=n_samples)
# compute test statistics for finding the top tau parameters
# self._test_stats is a (# params, # params) matrix where
# `self._test_stats[tau, k]`` is the test statistic for the null hypothesis that
# the parameter k is not in the top tau parameters
indices = np.triu_indices(model.n_params, 1)
self._test_stats = np.full((model.n_params, model.n_params), 0.0)
self._test_stats[indices] = -self._pairwise_comparison._z_stats
self._test_stats[(indices[1], indices[0])] = self._pairwise_comparison._z_stats
self._test_stats = np.sort(self._test_stats, 0)[::-1]
def _conf_int(self, alpha: float, indices: np.ndarray, **kwargs: Any) -> np.ndarray:
hypothesis_matrix = self._pairwise_comparison.test_hypotheses(
alpha, **kwargs
).values
return (
np.array(
[
hypothesis_matrix.sum(axis=1),
self.model.n_params - hypothesis_matrix.sum(axis=0) - 1,
]
).T[indices]
+ 1
)
[docs]
def compute_best_params(
self,
n_best_params: int = 1,
alpha: float = 0.05,
superset: bool = True,
fast: Union[bool, str] = "auto",
) -> pd.Series:
"""Compute the set of best (largest) parameters.
Find the set of parameters such that the truly best ``n_best_params`` parameters
are in this set with probability ``1-alpha``. Or, find the set of parameters
such that these parameters are in the truly best ``n_best_params`` parameters
with probability ``1-alpha``.
Args:
n_best_params (int, optional): Number of best parameters. Defaults to 1.
alpha (float, optional): Significance level. Defaults to 0.05.
superset (bool, optional): Indicates that the returned set is a superset of
the truly best n parameters. If False, the returned set is a subset of
the truly best n parameters. Defaults to True.
fast (Union[bool, str], optional). Indicates to use a fast version of the
selection algorithm. Defaults to "auto".
Returns:
pd.Series: Indicates which parameters are in the selected set.
"""
if not isinstance(fast, bool) and fast != "auto":
raise ValueError(f"`fast` must be bool or 'auto', got {fast}.")
if not fast or (fast == "auto" and self.model.n_params < 100):
# perform stepwise rejection algorithm
# this will take a long time when testing more than 100 parameters
if superset:
test_stats = self._test_stats[n_best_params - 1]
else:
test_stats = -self._test_stats[n_best_params]
n_best_params = self.model.n_params - n_best_params
rejected = self._stepwise_rejection(test_stats, n_best_params, alpha)
else:
# perform rejections based on confidence intervals
conf_int = self.conf_int(alpha)
rejected = (
conf_int[:, 0] > n_best_params
if superset
else conf_int[:, 1] <= n_best_params
)
return pd.Series(
~rejected if superset else rejected, index=self.model.exog_names
)
def _stepwise_rejection(
self, test_stats: np.ndarray, n_best_params: int, alpha: float
) -> np.ndarray:
cov_diag = self.model.cov.diagonal()
rejected, newly_rejected = np.full(self.model.n_params, False), None
while newly_rejected is None or (newly_rejected.any() and not rejected.all()):
# perform stepwise rejection of parameters that cannot be the best
max_critical_value = -np.inf
for indices_k in combinations(
np.arange(self.model.n_params), n_best_params - 1
):
# compute maximum critical value over all possible subsets of rejected
# hypotheses (indices_k)
max_z_stats = np.full(len(self._pairwise_comparison._rvs), -np.inf)
subset = np.full(self.model.n_params, True)
if len(indices_k) > 0:
subset[list(indices_k)] = False
for index_j in np.where(~rejected)[0]:
# update max z statistics using the difference between one parameter
# (index_j) # and the rest each iteration
# this reduces memory complexity
subset_j = subset.copy()
subset_j[index_j] = False
delta_rvs = (
np.expand_dims(self._pairwise_comparison._rvs[:, index_j], 1)
- self._pairwise_comparison._rvs[:, subset_j]
)
params = self.model.mean[index_j] - self.model.mean[subset_j]
std = np.sqrt(
cov_diag[index_j]
+ cov_diag[subset_j]
- 2 * self.model.cov[index_j, subset_j]
)
max_z_stats = np.hstack(
[np.expand_dims(max_z_stats, 1), (delta_rvs - params) / std]
).max(axis=1)
if max_critical_value < (
critical_value := np.quantile(max_z_stats, 1 - alpha)
):
max_critical_value = critical_value
newly_rejected = (test_stats > max_critical_value) & ~rejected
rejected = newly_rejected | rejected
return rejected
def _make_summary_header(self, alpha: float) -> list[str]:
return [
"rank (conventional)",
"pvalue",
f"{1-alpha} CI lower",
f"{1-alpha} CI upper",
]
[docs]
class SimultaneousRanking(ConfidenceSet):
"""Estimate rankings with simultaneous confidence intervals.
Subclasses :class:`ConfidenceSet`.
Examples:
.. testcode::
import numpy as np
from multiple_inference.confidence_set import SimultaneousRanking
x = np.arange(3)
cov = np.identity(3) / 10
model = SimultaneousRanking(x, cov)
results = model.fit()
print(results.summary())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
Simultaneous ranking
=========================================================
rank (conventional) pvalue 0.95 CI lower 0.95 CI upper
---------------------------------------------------------
x0 3.000 nan 2.000 3.000
x1 2.000 nan 1.000 3.000
x2 1.000 nan 1.000 2.000
===============
Dep. Variable y
---------------
.. testcode::
print(results.compute_best_params())
.. testoutput::
:options: -ELLIPSIS, +NORMALIZE_WHITESPACE
x0 False
x1 False
x2 True
dtype: bool
This we can be 95% confident that the best (largest) parameter is x2.
"""
_results_cls = SimultaneousRankingResults