Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 193 additions & 3 deletions cluster_experiments/power_analysis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import logging
import random
from typing import Dict, Generator, Iterable, List, Optional, Tuple
from typing import (
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Tuple,
)

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -655,6 +664,20 @@ class NormalPowerAnalysis:
```
"""

VALID_AGG_FUNCS = (
"sum",
"mean",
"median",
"min",
"max",
"count",
"std",
"var",
"nunique",
"first",
"last",
)

def __init__(
self,
splitter: RandomSplitter,
Expand Down Expand Up @@ -775,6 +798,13 @@ def _normal_mde_calculation(

return float(z_alpha + z_beta) * std_error

def _get_time_col(self) -> str:
if self.time_col is None:
raise ValueError(
"Time column not specified. You must provide `time_col` when initializing NormalPowerAnalysis."
)
return self.time_col

def mde_power_line(
self,
df: pd.DataFrame,
Expand Down Expand Up @@ -885,12 +915,13 @@ def run_average_standard_error(
experiment_length: Length of the experiment in days.
"""
n_simulations = self.n_simulations if n_simulations is None else n_simulations
time_col = self._get_time_col()

for n_days in experiment_length:
df_time = df.copy()
experiment_start = df_time[self.time_col].min()
experiment_start = df_time[time_col].min()
df_time = df_time.loc[
df_time[self.time_col] < experiment_start + pd.Timedelta(days=n_days)
df_time[time_col] < experiment_start + pd.Timedelta(days=n_days)
]
std_error_mean = self._get_average_standard_error(
df=df_time,
Expand Down Expand Up @@ -971,6 +1002,165 @@ def mde_time_line(
)
return results

def mde_rolling_time_line(
self,
df: pd.DataFrame,
pre_experiment_df: Optional[pd.DataFrame] = None,
powers: Iterable[float] = (),
experiment_length: Iterable[int] = (),
n_simulations: Optional[int] = None,
alpha: Optional[float] = None,
*,
agg_func: Literal[
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we have to put it as second argument if we don't want to make it optional, but still I think making it optional is better than breaking the order of arguments from mde_time_line where pre_exepriment_df is the second argument. Do you have an opinion on moving this to an argument below or keeping it here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved it and added * before it to make it a “keyword-only argument”. this way is still mandatory but it's aligned with the order of arguments from mde_time_line

"sum",
"mean",
"median",
"min",
"max",
"count",
"std",
"var",
"nunique",
"first",
"last",
],
post_process_func: Optional[Callable[[float], float]] = None,
) -> List[Dict]:
"""
Computes the Minimum Detectable Effect (MDE) for varying experiment lengths
using a sliding time window, with optional element-wise post-processing
on the aggregated metric.

Args:
df: Input DataFrame.
pre_experiment_df: Optional pre-experiment DataFrame.
powers: Iterable of powers for MDE computation (e.g., [0.8, 0.9]).
experiment_length: Iterable of experiment durations in days.
n_simulations: Number of simulations to run (default = self.n_simulations).
alpha: Significance level (default = self.alpha).
agg_func: Aggregation function applied to the metric in each cluster window.
post_process_func: Optional callable applied element-wise to the aggregated metric
(like `Series.apply`). Must take a single scalar as input and return a scalar.

Usage:

```python
import pandas as pd
import numpy as np
from cluster_experiments.random_splitter import ClusteredSplitter
from cluster_experiments.experiment_analysis import ClusteredOLSAnalysis
from cluster_experiments.power_analysis import NormalPowerAnalysis

np.random.seed(42)

# Create a synthetic dataset
n_customers = 10
n_days = 60

df = pd.DataFrame({
"customer_id": np.repeat(np.arange(1, n_customers + 1), n_days),
"date": np.tile(pd.date_range("2024-01-01", periods=n_days), n_customers),
})

p_values = np.concatenate([
np.full(20, 0.1),
np.full(20, 0.2),
np.full(20, 0.3),
])

p = np.tile(p_values, n_customers)
df["conversion"] = np.random.binomial(1, p)

# Define a post-processing function
def flag_positive(x):
return 1 if x > 0 else 0

splitter = ClusteredSplitter(
cluster_cols=['customer_id'],
splitter_weights=[0.5, 0.5],
treatments=['A', 'B'],
)

analysis = ClusteredOLSAnalysis(
cluster_cols=['customer_id'],
target_col='conversion',
)

pw = NormalPowerAnalysis(
splitter=splitter,
analysis=analysis,
target_col="conversion",
treatment="B",
control="A",
n_simulations=100,
time_col="date",
)

results = pw.mde_rolling_time_line(
df=df,
pre_experiment_df=None,
powers=[0.8],
experiment_length=[7, 14, 21, 28, 56],
n_simulations=5,
alpha=0.05,
agg_func="sum",
post_process_func=flag_positive,
)
```
"""
time_col = self._get_time_col()

if agg_func not in self.VALID_AGG_FUNCS:
raise ValueError(
f"Invalid aggregation function `{agg_func}`. "
f"Choose one of: {', '.join(self.VALID_AGG_FUNCS)}."
)

alpha = self.alpha if alpha is None else alpha
n_simulations = self.n_simulations if n_simulations is None else n_simulations
cluster_cols = self.splitter.cluster_cols
results = []

experiment_start = df[time_col].min()

for n_days in experiment_length:
df_time_filter = df[
df[time_col] <= experiment_start + pd.Timedelta(days=n_days)
]

df_grouped = df_time_filter.groupby(cluster_cols, as_index=False)[
self.target_col
].agg(agg_func)

if post_process_func is not None:
df_grouped[self.target_col] = df_grouped[self.target_col].apply(
post_process_func
)

std_error_mean = self._get_average_standard_error(
df=df_grouped,
pre_experiment_df=pre_experiment_df,
n_simulations=n_simulations,
)

for power in powers:
mde_value = self._normal_mde_calculation(
alpha=alpha, std_error=std_error_mean, power=power
)

relative_mde = mde_value / abs(df_grouped[self.target_col].mean())

results.append(
{
"power": power,
"mde": mde_value,
"experiment_length": n_days,
"relative_mde": relative_mde,
}
)

return results

def power_line(
self,
df: pd.DataFrame,
Expand Down
125 changes: 83 additions & 42 deletions docs/normal_power_lines.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies = [
"numpy>=1.20.0",
]
name = "cluster-experiments"
version = "0.27.0"
version = "0.28.0"
description = ""
readme = "README.md"
classifiers=[
Expand Down
38 changes: 38 additions & 0 deletions tests/power_analysis/test_normal_power_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,41 @@ def test_power_time_line(df):
power_df.query("experiment_length == 2")["power"].squeeze()
< power_df.query("experiment_length == 3")["power"].squeeze()
)


def test_mde_rolling_time_line(df):
# given
splitter = ClusteredSplitter(cluster_cols=["cluster"])

analysis = ClusteredOLSAnalysis(cluster_cols=["cluster"])

pw = NormalPowerAnalysis(
splitter=splitter,
analysis=analysis,
n_simulations=100,
seed=20240922,
time_col="date",
)

df_cp = df.copy()
df_cp["date"] = pd.to_datetime(df_cp["date"])

# when
mde_rolling_time_line = pw.mde_rolling_time_line(
df_cp,
powers=[0.8],
experiment_length=[1, 2, 3],
agg_func="sum",
)

mde_df = pd.DataFrame(mde_rolling_time_line)

# then
assert (
mde_df.query("experiment_length == 1")["relative_mde"].squeeze()
> mde_df.query("experiment_length == 2")["relative_mde"].squeeze()
)
assert (
mde_df.query("experiment_length == 2")["relative_mde"].squeeze()
> mde_df.query("experiment_length == 3")["relative_mde"].squeeze()
)