Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions cluster_experiments/experiment_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,3 +1676,228 @@ def __warn_small_group_size(self):
warnings.warn(
"Delta Method approximation may not be accurate for small group sizes"
)


class DiDAnalysis(ExperimentAnalysis):
"""
Difference-in-Differences (DiD) analysis using OLS regression.

Arguments:
target_col: name of the outcome column
treatment_col: column identifying treatment/control
treatment: value in treatment_col to consider as treated
time_col: column containing date or time of observation
intervention_date: date when the intervention occurs (used to convert time_col to 0/1)
covariates: optional list of covariates
hypothesis: "two-sided", "less", "greater"
cov_type: type of robust covariance (default = "HC3")

Usage:

```python
import pandas as pd
import numpy as np
from cluster_experiments.experiment_analysis import DiDAnalysis

dates = pd.date_range("2022-01-01", periods=100)
df = pd.DataFrame({
"country": ["IT"]*50 + ["ES"]*50 + ["IT"]*50 + ["ES"]*50,
"time": list(dates[:50])*2 + list(dates[50:])*2,
"cvr": np.concatenate([
np.random.normal(0.10, 0.01, 50),
np.random.normal(0.12, 0.01, 50),
np.random.normal(0.11, 0.01, 50),
np.random.normal(0.16, 0.01, 50),
])
})

DiDAnalysis(
target_col="cvr",
treatment_col="country",
treatment="ES",
time_col="time",
intervention_date="2022-01-15"
).get_pvalue(df)
```
"""

def __init__(
self,
target_col: str = "target",
treatment_col: str = "treatment",
treatment: str = "B",
time_col: str = "time_col",
intervention_date: str = None,
covariates: Optional[List[str]] = None,
hypothesis: str = "two-sided",
cov_type: str = "HC3",
):
if intervention_date is None:
raise ValueError("intervention_date is required")
super().__init__(
cluster_cols=[],
target_col=target_col,
treatment_col=treatment_col,
treatment=treatment,
covariates=covariates,
hypothesis=hypothesis,
)
self.time_col = time_col
self.intervention_date = pd.to_datetime(intervention_date)
self.cov_type = cov_type

def _prepare_time(self, df: pd.DataFrame) -> pd.DataFrame:
"""Convert the time_col into 0/1 indicator based on intervention_date."""
df = df.copy()
df["_did_time"] = (pd.to_datetime(df[self.time_col]) > self.intervention_date).astype(int)
return df

@property
def formula(self) -> str:
base = f"{self.target_col} ~ C({self.treatment_col}) * C(_did_time)"
if self.covariates:
base += " + " + " + ".join(self.covariates)
return base

def fit_did(self, df: pd.DataFrame):
df_prepared = self._prepare_time(df)
return sm.OLS.from_formula(self.formula, data=df_prepared).fit(cov_type=self.cov_type)

def _interaction_term(self, results) -> str:
for name in results.params.index:
if self.treatment_col in name and "_did_time" in name:
return name
raise ValueError("Interaction term not found in regression results")

def analysis_point_estimate(self, df: pd.DataFrame, verbose: bool = False) -> float:
results = self.fit_did(df)
if verbose:
print(results.summary())
term = self._interaction_term(results)
return results.params[term]

def analysis_standard_error(self, df: pd.DataFrame, verbose: bool = False) -> float:
results = self.fit_did(df)
if verbose:
print(results.summary())
term = self._interaction_term(results)
return results.bse[term]

def analysis_pvalue(self, df: pd.DataFrame, verbose: bool = False) -> float:
results = self.fit_did(df)
if verbose:
print(results.summary())
term = self._interaction_term(results)
return self._pvalue_based_on_hypothesis(results, term)

def analysis_confidence_interval(self, df: pd.DataFrame, alpha: float = 0.05, verbose: bool = False):
results = self.fit_did(df)
if verbose:
print(results.summary())
term = self._interaction_term(results)
ci_lower, ci_upper = results.conf_int(alpha=alpha).loc[term]
return ConfidenceInterval(lower=ci_lower, upper=ci_upper, alpha=alpha)

def analysis_inference_results(self, df: pd.DataFrame, alpha: float = 0.05, verbose: bool = False):
results = self.fit_did(df)
if verbose:
print(results.summary())
term = self._interaction_term(results)
ate = results.params[term]
se = results.bse[term]
p_value = self._pvalue_based_on_hypothesis(results, term)
ci_lower, ci_upper = results.conf_int(alpha=alpha).loc[term]

return InferenceResults(
ate=ate,
p_value=p_value,
std_error=se,
conf_int=ConfidenceInterval(lower=ci_lower, upper=ci_upper, alpha=alpha),
)

def _pvalue_based_on_hypothesis(self, model_result, term: str) -> float:
effect = model_result.params[term]
pval = model_result.pvalues[term]

if HypothesisEntries(self.hypothesis) == HypothesisEntries.LESS:
return pval / 2 if effect <= 0 else 1 - pval / 2
if HypothesisEntries(self.hypothesis) == HypothesisEntries.GREATER:
return pval / 2 if effect >= 0 else 1 - pval / 2
return pval

@classmethod
def from_config(cls, config):
"""Creates a DiDAnalysis object from a PowerConfig object"""
return cls(
target_col=config.target_col,
treatment_col=config.treatment_col,
treatment=config.treatment,
time_col=config.time_col,
intervention_date=config.intervention_date,
covariates=config.covariates,
hypothesis=config.hypothesis,
cov_type=getattr(config, "cov_type", "HC3"),
)


class ClusteredDiDAnalysis(DiDAnalysis):
"""
Difference-in-Differences (DiD) analysis with clustered standard errors.

Arguments:
cluster_cols: list of columns to use as clusters
target_col: name of the outcome column
treatment_col: column identifying treatment/control
treatment: value in treatment_col to consider as treated
time_col: column containing date or time of observation
intervention_date: date when the intervention occurs (used to convert time_col to 0/1)
covariates: optional list of covariates
hypothesis: "two-sided", "less", "greater"
"""

def __init__(
self,
cluster_cols: List[str],
target_col: str = "target",
treatment_col: str = "treatment",
treatment: str = "B",
time_col: str = "time_col",
intervention_date: str = None,
covariates: Optional[List[str]] = None,
hypothesis: str = "two-sided",
):
if intervention_date is None:
raise ValueError("intervention_date is required")
super().__init__(
target_col=target_col,
treatment_col=treatment_col,
treatment=treatment,
time_col=time_col,
intervention_date=intervention_date,
covariates=covariates,
hypothesis=hypothesis,
cov_type="cluster",
cluster_cols=cluster_cols,
)

def fit_did(self, df: pd.DataFrame):
"""Returns the fitted DiD model with clustered standard errors"""
df_prepared = self._prepare_time(df)
return sm.OLS.from_formula(self.formula, data=df_prepared).fit(
cov_type="cluster",
cov_kwds={"groups": self._get_cluster_column(df_prepared)},
)

@classmethod
def from_config(cls, config):
"""Creates a ClusteredDiDAnalysis object from a PowerConfig object"""
return cls(
cluster_cols=config.cluster_cols,
target_col=config.target_col,
treatment_col=config.treatment_col,
treatment=config.treatment,
time_col=config.time_col,
intervention_date=config.intervention_date,
covariates=config.covariates,
hypothesis=config.hypothesis,
)
5 changes: 5 additions & 0 deletions cluster_experiments/power_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
OLSAnalysis,
PairedTTestClusteredAnalysis,
TTestClusteredAnalysis,
DiDAnalysis,
ClusteredDiDAnalysis,
)
from cluster_experiments.perturbator import (
BetaRelativePerturbator,
Expand Down Expand Up @@ -287,6 +289,9 @@ def _raise_error_if_present(self, attr, other_attr):
"paired_ttest_clustered": PairedTTestClusteredAnalysis,
"mlm": MLMExperimentAnalysis,
"delta": DeltaMethodAnalysis,
"did": DiDAnalysis,
"clustered_did": ClusteredDiDAnalysis,
"did_clustered": ClusteredDiDAnalysis,
}

cupac_model_mapping = {"": EmptyRegressor, "mean_cupac_model": TargetAggregation}
Loading