"""Base classes.
"""
from __future__ import annotations
import pickle
from typing import Any, Optional, Sequence, Type, TypeVar, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from statsmodels.base.model import LikelihoodModelResults
from statsmodels.iolib.summary import Summary
from statsmodels.iolib.table import SimpleTable
# TODO: remove type ignore statements for starred arguments when this issue is fixed
# https://github.com/python/mypy/issues/6799
ColumnType = Union[str, int]
ColumnsType = Union[Sequence[int], Sequence[str], Sequence[bool]]
ModelType = TypeVar("ModelType", bound="ModelBase")
Numeric1DArray = Sequence[float]
ResultsType = TypeVar("ResultsType", bound="ResultsBase")
[docs]
class ResultsBase:
"""Base for results classes.
Args:
model (ModelBase): Model on which the results are based.
title (str, optional): Results title. Defaults to "Estimation results".
"""
_default_title = "Estimation results"
def __init__(
self,
model: ModelType,
title: str = None,
):
self.model = model
self.exog_names = model.exog_names.copy()
if not hasattr(self, "pvalues"):
self.pvalues = np.full(len(model.mean), np.nan)
self.title = title
self._conf_int_cached = {}
@property
def title(self) -> str:
return self._title or self._default_title
@title.setter
def title(self, title: str) -> None:
self._title = title
[docs]
def conf_int(
self, alpha: float = 0.05, columns: ColumnsType = None, **kwargs: Any
) -> np.ndarray:
"""Compute the 1-alpha confidence interval.
Args:
alpha (float, optional): The CI will cover the truth with probability
1-alpha. Defaults to 0.05.
columns (ColumnsType, optional): Selected columns. Defaults to None.
Returns:
np.ndarray: (# params, 2) array of confidence intervals.
"""
return self._conf_int(
alpha, self.model.get_indices(columns, self.exog_names), **kwargs
)
def _conf_int(self, alpha: float, indices: np.array) -> np.ndarray:
if not hasattr(self, "marginal_distributions"):
raise AttributeError(
"Results object does not have `marginal_distributions` attribute."
)
return np.array(
[
self.marginal_distributions[i].ppf([alpha / 2, 1 - alpha / 2])
for i in indices
]
)
[docs]
def point_plot(
self,
yname: str = None,
xname: Sequence[str] = None,
title: str = None,
columns: ColumnsType = None,
ax=None,
**kwargs: Any,
):
"""Create a point plot.
Args:
yname (str, optional): Name of the endogenous variable. Defaults to None.
xname (Sequence[str], optional): (# params,) sequence of parameter names.
Defaults to None.
title (str, optional): Plot title. Defaults to None.
columns (ColumnsType, optional): Selected columns. Defaults to None.
ax (AxesSubplot, optional): Axis to write on.
**kwargs (Any): Passed to :meth:`ResultsBase.conf_int`.
Returns:
AxesSubplot: Plot.
"""
if not hasattr(self, "params"):
raise AttributeError("Results object does not have `params` attribute.")
indices = self.model.get_indices(columns, self.exog_names)
params = self.params[indices]
conf_int = self.conf_int(columns=columns, **kwargs)
yticks = np.arange(len(indices), 0, -1)
if ax is None:
_, ax = plt.subplots()
ax.errorbar(
x=params, # type: ignore, pylint: disable=no-member
y=yticks,
xerr=[params - conf_int[:, 0], conf_int[:, 1] - params], # type: ignore, pylint: disable=no-member
fmt="o",
)
ax.set_title(title or self.title)
ax.set_xlabel(yname or self.model.endog_names)
ax.set_yticks(yticks)
ax.set_yticklabels(self.exog_names[indices] if xname is None else xname)
return ax
[docs]
def save(self: ResultsType, filename: str) -> None:
"""Pickle results.
Args:
filename (str): File name.
"""
with open(filename, "wb") as results_file:
pickle.dump(self, results_file)
[docs]
def summary(
self,
yname: str = None,
xname: Sequence[str] = None,
title: str = None,
alpha: float = 0.05,
columns: ColumnsType = None,
**kwargs: Any,
) -> Summary:
"""Create a summary table.
Args:
yname (str, optional): Name of the endogenous variable. Defaults to None.
xname (Sequence[str], optional): Names of the exogenous variables. Defaults
to None.
title (str, optional): Table title. Defaults to None.
alpha (float, optional): Display 1-alpha confidence interval. Defaults to
0.05.
columns (ColumnsType, optional): Selected columns. Defaults to None.
**kwargs (Any): Passed to :meth:`ResultsBase.conf_int`.
Returns:
Summary: Summary table.
"""
indices = self.model.get_indices(columns, self.exog_names)
params_header = self._make_summary_header(alpha)
params_data = self._make_summary_data(alpha, indices, **kwargs)
params_data_str = [[f"{val:.3f}" for val in row] for row in params_data]
if xname is None:
xname = self.exog_names[indices]
smry = Summary()
smry.tables = [
SimpleTable(
params_data_str,
params_header,
list(xname),
title=title or self.title,
),
SimpleTable(
[[yname or self.model.endog_names]],
stubs=["Dep. Variable"],
),
]
return smry
def _make_summary_header(self, alpha: float) -> list[str]:
# make the header for the summary table
# when subclassing ResultsBase, you may wish to overwrite this method
return ["coef", "pvalue", f"[{alpha/2}", f"{1-alpha/2}]"]
def _make_summary_data(
self, alpha: float, indices: np.ndarray, **kwargs: Any
) -> np.ndarray:
# make the data for the summary table
# returns (# params, # columns) array
# columns should correspond to the output of _make_summary_header
if not hasattr(self, "params"):
raise AttributeError("Results object does not have `params` attribute.")
return np.hstack(
(np.array([self.params, self.pvalues]).T[indices], self.conf_int(alpha, indices, **kwargs)) # type: ignore, pylint: disable=no-member
)
[docs]
class ModelBase:
"""Base for model classes.
Args:
mean (Numeric1DArray): (# params,) array of conventionally estimated means.
cov (np.ndarray): (# params, # params) covariance matrix.
X (np.ndarray, optional): (# params, # features) feature matrix. Defaults to
None.
endog_names (str, optional): Name of endogenous variable. Defaults to None.
exog_names (Sequence[str], optional): Names of the exogenous variables. Defaults
to None.
columns (ColumnsType, optional): Columns to use. This can be a sequence of
indices (int), parameter names (str), or a Boolean mask. Defaults to None.
sort (bool, optional): Sort the parameters by the conventionally estimated
mean. Defaults to False.
seed (int, optional): Random seed. Defaults to 0.
Attributes:
n_params (int): Number of estimated parameters.
mean (np.ndarray): (# params,) array of conventionally estimated means.
cov (np.ndarray): (# params, # params) covariance matrix.
X (np.ndarray): (# params, # features) feature matrix.
endog_names (str): Name of the endogenous variable.
exog_names (np.ndarray): Name of exogenous variables.
seed (int): Random seed.
"""
_results_cls = ResultsBase
def __init__(
self,
mean: Numeric1DArray,
cov: np.ndarray,
X: np.ndarray = None,
endog_names: str = None,
exog_names: Sequence[str] = None,
columns: ColumnsType = None,
sort: bool = False,
random_state: int = 0,
):
self.mean = mean
self.n_params = len(self.mean) if columns is None else len(columns)
self.cov = cov * np.identity(self.n_params) if np.isscalar(cov) else cov
self.X = np.ones((len(self.mean), 1)) if X is None else X
self.endog_names = endog_names
if exog_names is None and isinstance(mean, pd.Series):
self.exog_names = np.array(mean.index)
else:
self.exog_names = exog_names
self.random_state = random_state
# select columns
indices = self.get_indices(columns)
self.mean = self.mean[indices]
self.cov = self.cov[indices][:, indices]
self.X = self.X[indices]
if exog_names is not None or isinstance(mean, pd.Series):
self.exog_names = self.exog_names[indices]
# sort columns
if sort:
argsort = (-self.mean).argsort()
self.mean = self.mean[argsort]
self.cov = self.cov[argsort][:, argsort]
self.X = self.X[argsort]
if exog_names is not None or isinstance(mean, pd.Series):
self.exog_names = self.exog_names[argsort]
@property
def mean(self) -> np.ndarray: # pylint: disable=missing-function-docstring
return self._mean
@mean.setter
def mean(
self, mean: Numeric1DArray
) -> None: # pylint: disable=missing-function-docstring
self._mean = np.atleast_1d(mean)
@property
def cov(self) -> np.ndarray:
return self._cov
@cov.setter
def cov(self, cov: np.ndarray) -> None:
self._cov = np.atleast_2d(cov)
@property
def endog_names(self) -> str: # pylint: disable=missing-function-docstring
return "y" if self._endog_names is None else self._endog_names
@endog_names.setter
def endog_names(
self, endog_names: str
) -> None: # pylint: disable=missing-function-docstring
self._endog_names = endog_names
@property
def exog_names(self) -> np.ndarray: # pylint: disable=missing-function-docstring
if self._exog_names is not None:
return self._exog_names
zfill = int(np.log10(max(1, len(self.mean) - 1))) + 1
return np.array([f"x{str(i).zfill(zfill)}" for i in range(len(self.mean))])
@exog_names.setter
def exog_names(
self, exog_names: Optional[Sequence[str]]
) -> None: # pylint: disable=missing-function-docstring
self._exog_names = None if exog_names is None else np.atleast_1d(exog_names)
[docs]
@classmethod
def from_results(
cls: Type[ModelType],
results: LikelihoodModelResults,
**kwargs: Any,
) -> ModelType:
"""Initialize an estimator from conventional regression results.
Args:
results (LikelihoodModelResults): Conventional estimation results.
**kwargs (Any): Passed to the model class constructor.
Returns:
Model: Estimator.
Examples:
.. testcode::
import numpy as np
import pandas as pd
import statsmodels.api as sm
from multiple_inference.base import ModelBase
X = np.repeat(np.identity(3), 100, axis=0)
beta = np.arange(3)
y = X @ beta + np.random.normal(size=300)
ols_results = sm.OLS(y, X).fit()
model = ModelBase.from_results(ols_results)
print(model.mean)
print(model.cov)
.. testoutput::
[0.05980802 1.08201297 1.94076774]
[[0.01007633 0. 0. ]
[0. 0.01007633 0. ]
[0. 0. 0.01007633]]
"""
cov = results.cov_params()
if isinstance(cov, pd.DataFrame):
cov = cov.values
return cls(
results.params,
cov,
endog_names=kwargs.pop("endog_names", results.model.endog_names),
exog_names=kwargs.pop("exog_names", results.model.exog_names),
**kwargs,
)
[docs]
@classmethod
def from_csv(
cls: Type[ModelType],
filename: str,
**kwargs: Any,
) -> ModelType:
"""Instantiate an estimator from csv file.
Args:
filename (str): Name of the csv file.
**kwargs (Any): Passed to the model class constructor.
Returns:
Model: Estimator.
"""
df = pd.read_csv(filename)
mean, cov = df.values[:, 0], df.values[:, 1:] # pylint: disable=no-member
endog_names, exog_names = (
df.columns[0], # pylint: disable=no-member
df.columns[1:], # pylint: disable=no-member
)
return cls(
mean,
cov,
endog_names=kwargs.pop("endog_names", endog_names),
exog_names=kwargs.pop("exog_names", exog_names),
**kwargs,
)
[docs]
def to_csv(self, filename: str) -> None:
"""Write data to a csv.
Args:
filename (str): Name of the file to write to.
"""
pd.DataFrame(
np.hstack((self.mean.reshape(-1, 1), self.cov)),
columns=[self.endog_names] + list(self.exog_names),
).to_csv(filename, index=False)
[docs]
def fit(self, *args: Any, **kwargs: Any) -> ResultsType:
"""Fit the model.
Args:
*args (Any): Passed to the results class constructor.
**kwargs (Any): Passed to the results class constructor.
Returns:
ResultsType: Results.
"""
return self._results_cls(self, *args, **kwargs)
[docs]
def get_index(self, column: ColumnType, names: Sequence[str] = None) -> int:
"""Get the index of a selected column.
Args:
column (ColumnType): Index or name of selected column.
names (Sequence[str], optional): (# params,) sequence of names to select
from.
Returns:
int: Index.
"""
try:
return int(column)
except:
pass
return list(self.exog_names if names is None else names).index(column)
[docs]
def get_indices(
self, columns: ColumnsType = None, names: Sequence[str] = None
) -> np.ndarray:
"""Get indices of the selected columns.
Args:
columns (ColumnsType, optional): Sequence of columns to select. The
sequence can be a (# selected params,) sequence of column names (str) or
indices (int), or a (# params,) boolean mask. Defaults to None.
names (Sequence[str], optional): (# params,) sequence of names to select
from.
Returns:
np.ndarray: (# selected params,) array of indices.
"""
if names is None:
names = self.exog_names
if columns is None:
return np.arange(len(names)).astype(int)
cols = np.atleast_1d(columns)
if cols.dtype == np.dtype("float"):
cols = cols.astype(int)
if cols.dtype in (np.dtype(int), np.dtype("int64")):
# cols is a sequence of indices
return cols
if cols.dtype == np.dtype(bool):
# cols is a boolean mask
return np.where(cols)[0]
# cols are the parameter names (exogenous variables)
sorter = np.argsort(names)
return sorter[np.searchsorted(names, cols, sorter=sorter)]