"""
Utilities for plotting effects learned by Contextualized models.
"""
from typing import *
import numpy as np
import matplotlib.pyplot as plt
from contextualized.easy.wrappers import SKLearnWrapper
def simple_plot(
x_vals: List[Union[float, int]],
y_vals: List[Union[float, int]],
**kwargs,
) -> None:
"""
Simple plotting of y vs x with kwargs passed to matplotlib helpers.
Args:
x_vals: x values to plot
y_vals: y values to plot
**kwargs: kwargs passed to matplotlib helpers (fill_alpha, fill_color, y_lowers, y_uppers, x_label, y_label, x_ticks, x_ticklabels, y_ticks, y_ticklabels)
Returns:
None
"""
plt.figure(figsize=kwargs.get("figsize", (8, 8)))
if "y_lowers" in kwargs and "y_uppers" in kwargs:
plt.fill_between(
x_vals,
np.squeeze(kwargs["y_lowers"]),
np.squeeze(kwargs["y_uppers"]),
alpha=kwargs.get("fill_alpha", 0.2),
color=kwargs.get("fill_color", "blue"),
)
plt.plot(x_vals, y_vals)
plt.xlabel(kwargs.get("x_label", "X"))
plt.ylabel(kwargs.get("y_label", "Y"))
if (
kwargs.get("x_ticks", None) is not None
and kwargs.get("x_ticklabels", None) is not None
):
plt.xticks(kwargs["x_ticks"], kwargs["x_ticklabels"])
if (
kwargs.get("y_ticks", None) is not None
and kwargs.get("y_ticklabels", None) is not None
):
plt.yticks(kwargs["y_ticks"], kwargs["y_ticklabels"])
plt.show()
def plot_effect(x_vals, y_means, y_lowers=None, y_uppers=None, **kwargs):
"""Plots a single effect."""
min_val = np.min(y_means)
y_means -= min_val
if y_lowers is not None and y_uppers is not None:
y_lowers -= min_val
y_uppers -= min_val
if kwargs.get("should_exponentiate", False):
y_means = np.exp(y_means)
if y_lowers is not None and y_uppers is not None:
y_lowers = np.exp(y_lowers)
y_uppers = np.exp(y_uppers)
try:
if "x_encoder" in kwargs and kwargs["x_encoder"] is not None:
x_classes = kwargs["x_encoder"].classes_
# Line up class values with centered values.
x_ticks = np.array(list(range(len(x_classes))))
if (
kwargs.get("x_means", None) is not None
and kwargs.get("x_stds", None) is not None
):
x_ticks = (x_ticks - kwargs["x_means"]) / kwargs["x_stds"]
else:
x_ticks = None
x_classes = None
except:
x_classes = None
x_ticks = None
if np.max(y_means) > kwargs.get("min_effect_size", 0.0):
simple_plot(
x_vals,
y_means,
x_label=kwargs.get("xlabel", "X"),
y_label=kwargs.get("ylabel", "Odds Ratio of Outcome"),
y_lowers=y_lowers,
y_uppers=y_uppers,
x_ticks=x_ticks,
x_ticklabels=x_classes,
)
def get_homogeneous_context_effects(
model: SKLearnWrapper, C: np.ndarray, **kwargs
) -> Tuple[np.ndarray, np.ndarray]:
"""
Get the homogeneous (context-invariant) effects of context.
Args:
model (SKLearnWrapper): a fitted ``contextualized.easy`` model
C: the context values to use to estimate the effects
verbose (bool, optional): print progess. Default True.
individual_preds (bool, optional): whether to use plot each bootstrap. Default True.
C_vis (np.ndarray, optional): Context bins used to visualize context (n_vis, n_contexts). Default None to construct anew.
n_vis (int, optional): Number of bins to use to visualize context. Default 1000.
Returns:
Tuple[np.ndarray, np.ndarray]:
c_vis: the context values that were used to estimate the effects
effects: array of effects, one for each context. Each homogeneous effect is a matrix of shape:
(n_bootstraps, n_context_vals, n_outcomes).
"""
if kwargs.get("verbose", True):
print("Estimating Homogeneous Contextual Effects.")
c_vis = maybe_make_c_vis(C, **kwargs)
effects = []
for j in range(C.shape[1]):
c_j = np.zeros_like(c_vis)
c_j[:, j] = c_vis[:, j]
try:
(_, mus) = model.predict_params(
c_j, individual_preds=kwargs.get("individual_preds", True)
)
except ValueError:
(_, mus) = model.predict_params(c_j)
effects.append(mus)
return c_vis, np.array(effects)
def get_homogeneous_predictor_effects(model, C, **kwargs):
"""
Get the homogeneous (context-invariant) effects of predictors.
:param model:
:param C:
returns:
c_vis: the context values that were used to estimate the effects
effects: np array of effects, one for each predictor. Each homogeneous effect is a matrix of shape:
(n_bootstraps, n_outcomes).
"""
if kwargs.get("verbose", True):
print("Estimating Homogeneous Predictor Effects.")
c_vis = maybe_make_c_vis(C, **kwargs)
c_idx = 0
try:
(betas, _) = model.predict_params(
c_vis, individual_preds=kwargs.get("individual_preds", True)
)
# bootstraps x C_vis x outcomes x predictors
if len(betas.shape) == 4:
c_idx = 1
except ValueError:
(betas, _) = model.predict_params(c_vis)
betas = np.mean(
betas, axis=c_idx
) # homogeneous predictor effect is context-invariant
return c_vis, np.transpose(betas, (2, 0, 1))
def get_heterogeneous_predictor_effects(model, C, **kwargs):
"""
Get the heterogeneous (context-variant) effects of predictors.
:param model:
:param C:
returns:
c_vis: the context values that were used to estimate the effects
effects: np array of effects, one for each context x predictor pair.
Each heterogeneous effect is a matrix of shape:
(n_predictors, n_bootstraps, n_context_vals, n_outcomes).
"""
if kwargs.get("verbose", True):
print("Estimating Heterogeneous Predictor Effects.")
c_vis = maybe_make_c_vis(C, **kwargs)
effects = []
for j in range(C.shape[1]):
c_j = np.zeros_like(c_vis)
c_j[:, j] = c_vis[:, j]
c_idx = 0
try:
(betas, _) = model.predict_params(
c_j, individual_preds=kwargs.get("individual_preds", True)
)
# bootstraps x C_vis x outcomes x predictors
if len(betas.shape) == 4:
c_idx = 1
except ValueError:
(betas, _) = model.predict_params(c_j)
# Heterogeneous Effects are mean-centered wrt C
effect = np.transpose(
np.transpose(betas, (0, 2, 3, 1))
- np.tile(
np.expand_dims(np.mean(betas, axis=c_idx), -1),
(1, 1, 1, betas.shape[1]),
),
(0, 3, 1, 2),
)
effects.append(effect)
effects = np.array(effects)
if len(effects.shape) == 5:
effects = np.transpose(
effects, (0, 4, 1, 2, 3)
) # (n_contexts, n_predictors, n_bootstraps, n_context_vals, n_outcomes)
else:
effects = np.transpose(
effects, (0, 3, 1, 2)
) # (n_contexts, n_predictors, n_context_vals, n_outcomes)
return c_vis, effects
def plot_boolean_vars(names, y_mean, y_err, **kwargs):
"""
Plots Boolean variables.
"""
plt.figure(figsize=kwargs.get("figsize", (12, 8)))
sorted_i = np.argsort(y_mean)
if kwargs.get("classification", True):
y_mean = np.exp(y_mean)
y_err = np.exp(y_err)
for counter, i in enumerate(sorted_i):
plt.bar(
counter,
y_mean[i],
width=0.5,
color=kwargs.get("fill_color", "blue"),
edgecolor=kwargs.get("edge_color", "black"),
yerr=y_err,
)
plt.xticks(
range(len(names)),
np.array(names)[sorted_i],
rotation=60,
fontsize=kwargs.get("boolean_x_ticksize", 18),
ha="right",
)
plt.ylabel(
kwargs.get("ylabel", "Odds Ratio of Outcome"),
fontsize=kwargs.get("ylabel_fontsize", 32),
)
plt.yticks(fontsize=kwargs.get("ytick_fontsize", 18))
if kwargs.get("bool_figname", None) is not None:
plt.savefig(kwargs.get("bool_figname"), dpi=300, bbox_inches="tight")
else:
plt.show()
[docs]def plot_homogeneous_context_effects(
model: SKLearnWrapper,
C: np.ndarray,
**kwargs,
) -> None:
"""
Plot the direct effect of context on outcomes, disregarding other features.
This context effect is homogeneous in that it is a static function of context (context-invariant).
Args:
model (SKLearnWrapper): a fitted ``contextualized.easy`` model
C: the context values to use to estimate the effects
verbose (bool, optional): print progess. Default True.
individual_preds (bool, optional): whether to use plot each bootstrap. Default True.
C_vis (np.ndarray, optional): Context bins used to visualize context (n_vis, n_contexts). Default None to construct anew.
n_vis (int, optional): Number of bins to use to visualize context. Default 1000.
lower_pct (int, optional): Lower percentile for bootstraps. Default 2.5.
upper_pct (int, optional): Upper percentile for bootstraps. Default 97.5.
classification (bool, optional): Whether to exponentiate the effects. Default True.
C_encoders (List[sklearn.preprocessing.LabelEncoder], optional): encoders for each context. Default None.
C_means (np.ndarray, optional): means for each context. Default None.
C_stds (np.ndarray, optional): standard deviations for each context. Default None.
xlabel_prefix (str, optional): prefix for x label. Default "".
figname (str, optional): name of figure to save. Default None.
Returns:
None
"""
c_vis, effects = get_homogeneous_context_effects(model, C, **kwargs)
# effects.shape is (n_context, n_bootstraps, n_context_vals, n_outcomes)
for outcome in range(effects.shape[-1]):
for j in range(effects.shape[0]):
try:
mus = effects[j, :, :, outcome]
means = np.mean(mus, axis=0)
lowers = np.percentile(mus, kwargs.get("lower_pct", 2.5), axis=0)
uppers = np.percentile(mus, kwargs.get("upper_pct", 97.5), axis=0)
except ValueError:
mus = effects[j, :, outcome]
means = mus # no bootstraps were provided.
lowers, uppers = None, None
if "C_encoders" in kwargs:
encoder = kwargs["C_encoders"][j]
else:
encoder = None
if "C_means" in kwargs:
c_means = kwargs["C_means"][j]
else:
c_means = None
if "C_stds" in kwargs:
c_stds = kwargs["C_stds"][j]
else:
c_stds = None
plot_effect(
c_vis[:, j],
means,
lowers,
uppers,
should_exponentiate=kwargs.get("classification", True),
x_encoder=encoder,
x_means=c_means,
x_stds=c_stds,
xlabel=C.columns.tolist()[j],
**kwargs,
)
[docs]def plot_homogeneous_predictor_effects(
model: SKLearnWrapper,
C: np.ndarray,
X: np.ndarray,
**kwargs,
) -> None:
"""
Plot the effect of predictors on outcomes that do not change with context (homogeneous).
Args:
model (SKLearnWrapper): a fitted ``contextualized.easy`` model
C: the context values to use to estimate the effects
X: the predictor values to use to estimate the effects
max_classes_for_discrete (int, optional): maximum number of classes to treat as discrete. Default 10.
min_effect_size (float, optional): minimum effect size to plot. Default 0.003.
ylabel (str, optional): y label for plot. Default "Influence of ".
xlabel_prefix (str, optional): prefix for x label. Default "".
X_names (List[str], optional): names of predictors. Default None.
X_encoders (List[sklearn.preprocessing.LabelEncoder], optional): encoders for each predictor. Default None.
X_means (np.ndarray, optional): means for each predictor. Default None.
X_stds (np.ndarray, optional): standard deviations for each predictor. Default None.
verbose (bool, optional): print progess. Default True.
lower_pct (int, optional): Lower percentile for bootstraps. Default 2.5.
upper_pct (int, optional): Upper percentile for bootstraps. Default 97.5.
classification (bool, optional): Whether to exponentiate the effects. Default True.
figname (str, optional): name of figure to save. Default None.
Returns:
None
"""
c_vis = np.zeros_like(C.values)
x_vis = make_grid_mat(X.values, 1000)
(betas, _) = model.predict_params(
c_vis, individual_preds=True
) # bootstraps x C_vis x outcomes x predictors
homogeneous_betas = np.mean(betas, axis=1) # bootstraps x outcomes x predictors
for outcome in range(homogeneous_betas.shape[1]):
betas = homogeneous_betas[:, outcome, :] # bootstraps x predictors
my_avg_betas = np.mean(betas, axis=0)
lowers = np.percentile(betas, kwargs.get("lower_pct", 2.5), axis=0)
uppers = np.percentile(betas, kwargs.get("upper_pct", 97.5), axis=0)
max_impacts = []
# Calculate the max impact of each effect.
for k in range(my_avg_betas.shape[0]):
effect_range = my_avg_betas[k] * np.ptp(x_vis[:, k])
max_impacts.append(effect_range)
effects_by_desc_impact = np.argsort(max_impacts)[::-1]
boolean_vars = [j for j in range(X.shape[-1]) if len(set(X.values[:, j])) == 2]
if len(boolean_vars) > 0:
plot_boolean_vars(
[X.columns[j] for j in boolean_vars],
[max_impacts[j] for j in boolean_vars],
[np.max(uppers[j]) - max_impacts[j] for j in boolean_vars],
**kwargs,
)
for j in effects_by_desc_impact:
if j in boolean_vars:
continue
means = my_avg_betas[j] * x_vis[:, j]
my_lowers = lowers[j] * x_vis[:, j]
my_uppers = uppers[j] * x_vis[:, j]
if "X_encoders" in kwargs:
encoder = kwargs["X_encoders"][j]
else:
encoder = None
if "X_means" in kwargs:
x_means = kwargs["X_means"][j]
else:
x_means = None
if "X_stds" in kwargs:
x_stds = kwargs["X_stds"][j]
else:
x_stds = None
plot_effect(
x_vis[:, j],
means,
my_lowers,
my_uppers,
should_exponentiate=kwargs.get("classification", True),
x_encoder=encoder,
x_means=x_means,
x_stds=x_stds,
xlabel=f"{kwargs.get('xlabel_prefix', '')} {X.columns[j]}",
**kwargs,
)
[docs]def plot_heterogeneous_predictor_effects(model, C, X, **kwargs):
"""
Plot how the effect of predictors on outcomes changes with context (heterogeneous).
Args:
model (SKLearnWrapper): a fitted ``contextualized.easy`` model
C: the context values to use to estimate the effects
X: the predictor values to use to estimate the effects
max_classes_for_discrete (int, optional): maximum number of classes to treat as discrete. Default 10.
min_effect_size (float, optional): minimum effect size to plot. Default 0.003.
y_prefix (str, optional): y prefix for plot. Default "Influence of ".
X_names (List[str], optional): names of predictors. Default None.
verbose (bool, optional): print progess. Default True.
individual_preds (bool, optional): whether to use plot each bootstrap. Default True.
C_vis (np.ndarray, optional): Context bins used to visualize context (n_vis, n_contexts). Default None to construct anew.
n_vis (int, optional): Number of bins to use to visualize context. Default 1000.
lower_pct (int, optional): Lower percentile for bootstraps. Default 2.5.
upper_pct (int, optional): Upper percentile for bootstraps. Default 97.5.
classification (bool, optional): Whether to exponentiate the effects. Default True.
C_encoders (List[sklearn.preprocessing.LabelEncoder], optional): encoders for each context. Default None.
C_means (np.ndarray, optional): means for each context. Default None.
C_stds (np.ndarray, optional): standard deviations for each context. Default None.
xlabel_prefix (str, optional): prefix for x label. Default "".
figname (str, optional): name of figure to save. Default None.
Returns:
None
"""
c_vis = maybe_make_c_vis(C, **kwargs)
n_vis = c_vis.shape[0]
# c_names = C.columns.tolist()
for j in range(C.shape[1]):
c_j = c_vis.copy()
c_j[:, :j] = 0.0
c_j[:, j + 1 :] = 0.0
(models, _) = model.predict_params(
c_j, individual_preds=True
) # n_bootstraps x n_vis x outcomes x predictors
homogeneous_effects = np.mean(
models, axis=1
) # n_bootstraps x outcomes x predictors
heterogeneous_effects = models.copy()
for i in range(n_vis):
heterogeneous_effects[:, i] -= homogeneous_effects
# n_bootstraps x n_vis x outcomes x predictors
for outcome in range(heterogeneous_effects.shape[2]):
my_effects = heterogeneous_effects[
:, :, outcome, :
] # n_bootstraps x n_vis x predictors
means = np.mean(my_effects, axis=0) # n_vis x predictors
my_lowers = np.percentile(my_effects, kwargs.get("lower_pct", 2.5), axis=0)
my_uppers = np.percentile(my_effects, kwargs.get("upper_pct", 97.5), axis=0)
x_ticks = None
x_ticklabels = None
try:
x_classes = kwargs["encoders"][j].classes_
if len(x_classes) <= kwargs.get("max_classes_for_discrete", 10):
x_ticks = np.array(list(range(len(x_classes))))
if "c_means" in kwargs:
x_ticks -= kwargs["c_means"][j]
if "c_stds" in kwargs:
x_ticks /= kwargs["c_stds"][j]
x_ticklabels = x_classes
except KeyError:
pass
for k in range(my_effects.shape[-1]):
if np.max(heterogeneous_effects[:, k]) > kwargs.get(
"min_effect_size", 0.0
):
simple_plot(
c_vis[:, j],
means[:, k],
x_label=C.columns[j],
y_label=f"{kwargs.get('y_prefix', 'Influence of')} {X.columns[k]}",
y_lowers=my_lowers[:, k],
y_uppers=my_uppers[:, k],
x_ticks=x_ticks,
x_ticklabels=x_ticklabels,
**kwargs,
)
def make_grid_mat(observation_mat, n_vis):
"""
:param observation_mat: defines the domain for each feature.
:param n_vis:
returns a matrix of n_vis x n_features that can be used to visualize the effects of the features.
"""
ar_vis = np.zeros((n_vis, observation_mat.shape[1]))
for j in range(observation_mat.shape[1]):
ar_vis[:, j] = np.linspace(
np.min(observation_mat[:, j]), np.max(observation_mat[:, j]), n_vis
)
return ar_vis
def make_c_vis(C, n_vis):
"""
:param C:
:param n_vis:
returns a matrix of n_vis x n_contexts that can be used to visualize the effects of the context variables.
"""
if isinstance(C, np.ndarray):
return make_grid_mat(C, n_vis)
return make_grid_mat(C.values, n_vis)
def maybe_make_c_vis(C, **kwargs):
"""
:param C:
:param n_vis:
returns a matrix of n_vis x n_contexts that can be used to visualize the effects of the context variables.
if C_vis is supplied, then we use that instead.
"""
if kwargs.get("C_vis", None) is None:
if kwargs.get("verbose", True):
print(
"""Generating datapoints for visualization by assuming the encoder is
an additive model and thus doesn't require sampling on a manifold.
If the encoder has interactions, please supply C_vis so that we
can visualize these effects on the correct data manifold."""
)
return make_c_vis(C, kwargs.get("n_vis", 1000))
return kwargs["C_vis"]