Source code for ax.utils.stats.statstools

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from __future__ import annotations

from logging import Logger

import numpy as np
import numpy.typing as npt
import pandas as pd
from ax.utils.common.logger import get_logger
from ax.utils.stats.math_utils import relativize

logger: Logger = get_logger(__name__)
num_mixed = npt.NDArray | list[float]


[docs] def inverse_variance_weight( means: npt.NDArray, variances: npt.NDArray, conflicting_noiseless: str = "warn", ) -> tuple[float, float]: """Perform inverse variance weighting. Args: means: The means of the observations. variances: The variances of the observations. conflicting_noiseless: How to handle the case of multiple observations with zero variance but different means. Options are "warn" (default), "ignore" or "raise". """ if conflicting_noiseless not in {"warn", "ignore", "raise"}: raise ValueError( f"Unsupported option `{conflicting_noiseless}` for conflicting_noiseless." ) if len(means) != len(variances): raise ValueError("Means and variances must be of the same length.") # new_mean = \sum_i 1/var_i mean_i / \sum_i (1/var_i), unless any var = 0, # in which case we report the mean of all values with var = 0. idx_zero = variances == 0 if idx_zero.any(): means_z = means[idx_zero] if np.var(means_z) > 0: message = "Multiple observations zero variance but different means." if conflicting_noiseless == "warn": logger.warning(message) elif conflicting_noiseless == "raise": raise ValueError(message) return np.mean(means_z), 0 inv_vars = np.divide(1.0, variances) sum_inv_vars = inv_vars.sum() new_mean = np.inner(inv_vars, means) / sum_inv_vars new_var = np.divide(1.0, sum_inv_vars) return new_mean, new_var
[docs] def total_variance( means: npt.NDArray, variances: npt.NDArray, sample_sizes: npt.NDArray, ) -> float: """Compute total variance.""" variances = variances * sample_sizes weighted_variance_of_means = np.average( (means - means.mean()) ** 2, weights=sample_sizes ) weighted_mean_of_variance = np.average(variances, weights=sample_sizes) return (weighted_variance_of_means + weighted_mean_of_variance) / sample_sizes.sum()
[docs] def positive_part_james_stein( means: num_mixed, sems: num_mixed, ) -> tuple[npt.NDArray, npt.NDArray]: """Estimation method for Positive-part James-Stein estimator. This method takes a vector of K means (`y_i`) and standard errors (`sigma_i`) and calculates the positive-part James Stein estimator. Resulting estimates are the shrunk means and standard errors. The positive part James-Stein estimator shrinks each constituent average to the grand average: y_i - phi_i * y_i + phi_i * ybar The variable phi_i determines the amount of shrinkage. For `phi_i = 1`, `mu_hat` is equal to `ybar` (the mean of all `y_i`), while for `phi_i = 0`, `mu_hat` is equal to `y_i`. It can be shown that restricting `phi_i <= 1` dominates the unrestricted estimator, so this method restricts `phi_i` in this manner. The amount of shrinkage, `phi_i`, is determined by: (K - 3) * sigma2_i / s2 That is, less shrinkage is applied when individual means are estimated with greater precision, and more shrinkage is applied when individual means are very tightly clustered together. We also restrict `phi_i` to never be larger than 1. The variance of the mean estimator is: (1 - phi_i) * sigma2_i + phi * sigma2_i / K + 2 * phi_i ** 2 * (y_i - ybar)^2 / (K - 3) The first term is the variance component from `y_i`, the second term is the contribution from the mean of all `y_i`, and the third term is the contribution from the uncertainty in the sum of squared deviations of `y_i` from the mean of all `y_i`. For more information, see https://ax.dev/docs/models.html#empirical-bayes-and-thompson-sampling. Args: means: Means of each arm sems: Standard errors of each arm Returns: mu_hat_i: Empirical Bayes estimate of each arm's mean sem_i: Empirical Bayes estimate of each arm's sem """ if np.min(sems) < 0: raise ValueError("sems cannot be negative.") y_i = np.array(means) K = y_i.shape[0] if K < 4: raise ValueError( "Less than 4 measurements passed to positive_part_james_stein. " + "Returning raw estimates." ) sigma2_i = np.power(sems, 2) ybar = np.mean(y_i) s2 = np.var(y_i - ybar, ddof=3) # sample variance normalized by K-3 phi_i = np.ones_like(sigma2_i) if s2 == 0 else np.minimum(1, sigma2_i / s2) mu_hat_i = y_i + phi_i * np.subtract(ybar, y_i) sigma_hat_i = np.sqrt( np.subtract(1.0, phi_i) * sigma2_i + phi_i * sigma2_i / K + np.multiply(2, phi_i**2) * (y_i - ybar) ** 2 / (K - 3) ) return mu_hat_i, sigma_hat_i
[docs] def agresti_coull_sem( n_numer: pd.Series | npt.NDArray | int, n_denom: pd.Series | npt.NDArray | int, prior_successes: int = 2, prior_failures: int = 2, ) -> npt.NDArray | float: """Compute the Agresti-Coull style standard error for a binomial proportion. Reference: *Agresti, Alan, and Brent A. Coull. Approximate Is Better than 'Exact' for Interval Estimation of Binomial Proportions." The American Statistician, vol. 52, no. 2, 1998, pp. 119-126. JSTOR, www.jstor.org/stable/2685469.* """ n_numer = np.array(n_numer) n_denom = np.array(n_denom) p_for_sem = (n_numer + prior_successes) / ( n_denom + prior_successes + prior_failures ) sem = np.sqrt(p_for_sem * (1 - p_for_sem) / n_denom) return sem
[docs] def marginal_effects( df: pd.DataFrame, covariates: list[str] | None = None ) -> pd.DataFrame: """ This method calculates the relative (in %) change in the outcome achieved by using any individual factor level versus randomizing across all factor levels. It does this by estimating a baseline under the experiment by marginalizing over all factors/levels. For each factor level, then, it conditions on that level for the individual factor and then marginalizes over all levels for all other factors. Args: df: Dataframe containing columns named mean and sem. All other columns are assumed to be factors for which to calculate marginal effects. covariates: List of columns to be used as covariates. If None, then use all columns in df that are not named "mean" or "sem". Returns: A dataframe containing columns "Name", "Level", "Beta" and "SE" corresponding to the factor, level, effect and standard error. Results are relativized as percentage changes. """ covariates = covariates or [col for col in df.columns if col not in ["mean", "sem"]] formatted_vals = [] overall_mean, overall_sem = inverse_variance_weight( df["mean"], np.power(df["sem"], 2), ) for cov in covariates: if len(df[cov].unique()) <= 1: continue df_gb = df.groupby(cov) for name, group_df in df_gb: group_mean, group_var = inverse_variance_weight( group_df["mean"], np.power(group_df["sem"], 2) ) effect, effect_sem = relativize( group_mean, np.sqrt(group_var), overall_mean, overall_sem, cov_means=0.0, as_percent=True, ) formatted_vals.append( {"Name": cov, "Level": name, "Beta": effect, "SE": effect_sem} ) return pd.DataFrame(formatted_vals, columns=["Name", "Level", "Beta", "SE"])