From 4c6d618a09a35d4e66b661c4cef78dc933aa22cc Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 07:39:05 -0700 Subject: [PATCH 01/32] Add configs and parser for scalar adjustments --- .../configs/model_inputs/__init__.py | 81 +++++++++++++- .../model_inputs/scalar_adjustments.yaml | 105 ++++++++++++++++++ ...arch_forecasting_revenue_per_ad_click.yaml | 31 ++++++ 3 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml create mode 100644 jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 1ebd482e..6bc03e6e 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -1,7 +1,10 @@ import attr -from typing import List, Optional, Union +from datetime import datetime +from dotmap import DotMap from pathlib import Path +from typing import Dict, List, Optional, Union +import pandas as pd from kpi_forecasting.inputs import YAML @@ -9,9 +12,11 @@ PARENT_PATH = Path(__file__).parent HOLIDAY_PATH = PARENT_PATH / "holidays.yaml" REGRESSOR_PATH = PARENT_PATH / "regressors.yaml" +SCALAR_PATH = PARENT_PATH / "scalar_adjustments.yaml" holiday_collection = YAML(HOLIDAY_PATH) regressor_collection = YAML(REGRESSOR_PATH) +scalar_adjustments = YAML(SCALAR_PATH) @attr.s(auto_attribs=True, frozen=False) @@ -38,3 +43,77 @@ class ProphetHoliday: ds: List lower_window: int upper_window: int + + +@attr.s(auto_attribs=True, frozen=False) +class ScalarAdjustments: + """ + Holds the names and dates where a scalar adjustment should be applied. + """ + + name: str + forecast_start_date: datetime + adjustments_dataframe: pd.DataFrame + + @classmethod + def from_dotmap(cls, name: str, adjustment_dotmap: DotMap): + + adj_list = [] + forecast_start_date = datetime.strptime( + adjustment_dotmap.forecast_start_date, "%Y-%m-%d" + ) + for segment_dat in adjustment_dotmap.segments: + segment = {**segment_dat.segment} + segment_adjustment_dat = [ + {**segment, **adj} for adj in segment_dat.adjustments + ] + adj_list.append(pd.DataFrame(segment_adjustment_dat)) + adj_df = pd.concat(adj_list, ignore_index=True) + + return cls(name, forecast_start_date, adj_df) + + +def parse_scalar_adjustments( + metric_hub_slug: str, forecast_start_date: datetime +) -> List[ScalarAdjustments]: + """ + Parses the scalar_adjustments to find the applicable scalar adjustments for a given metric hub slug + and forecast start date. + + Args: + metric_hub_slug (str): The metric hub slug being forecasted. It must be present by name in the + scalar_adjustments.yaml. + forecast_start_date (str): The first date being forecasted. Used here to map to the correct scalar + adjustments as the adjustments will be updated over time. + + Returns: + List[ScalarAdjustments]: A list of ScalarAdjustments, where each ScalarAdjustments is a named scalar adjustment with the + dates that the adjustment should be applied for each segment being modeled. 
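+
+    Example (editorial sketch; the slug, adjustment name, and dates come from the
+    scalar_adjustments.yaml file added in this patch):
+        >>> from datetime import datetime
+        >>> adjs = parse_scalar_adjustments(
+        ...     "search_forecasting_revenue_per_ad_click", datetime(2024, 6, 1)
+        ... )
+        >>> adjs[0].name, adjs[0].forecast_start_date
+        ('year_over_year_growth', datetime.datetime(2024, 5, 1, 0, 0))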
+ """ + metric_adjustments = getattr(scalar_adjustments.data, metric_hub_slug) + if not metric_adjustments: + raise KeyError(f"No adjustments found for {metric_hub_slug} in {SCALAR_PATH}.") + + # Creates a list of ScalarAdjustments objects that apply for this metric and forecast_start_date + applicable_adjustments = [] + for named_adjustment in metric_adjustments: + parsed_named_adjustments = [ + ScalarAdjustments.from_dotmap(named_adjustment.name, adj_dotmap) + for adj_dotmap in named_adjustment.adjustments + ] + + # Sort list of parsed adjustments by forecast_start_date + sorted_parsed_named_adjustments = sorted( + parsed_named_adjustments, key=lambda d: d.forecast_start_date + ) + + # Iterate over the sorted list to end with any + matched_adjustment = None + for parsed_adjustment in sorted_parsed_named_adjustments: + if forecast_start_date >= parsed_adjustment.forecast_start_date: + matched_adjustment = parsed_adjustment + + applicable_adjustments.append(matched_adjustment) + + # Parse the list of + return applicable_adjustments diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml new file mode 100644 index 00000000..6905fda4 --- /dev/null +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml @@ -0,0 +1,105 @@ +--- +search_forecasting_revenue_per_ad_click: + - name: "year_over_year_growth" + description: "Estimate of YoY growth in RPC, from input from stakeholders." + adjustments: + - forecast_start_date: "2024-01-01" + segments: + - segment: + { + partner: "Google", + country: "US", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.03 + - segment: + { + partner: "Google", + country: "ROW", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.04 + - segment: + { + partner: "Google", + country: "ROW", + device: "mobile", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.04 + - forecast_start_date: "2024-04-01" + segments: + - segment: + { + partner: "Google", + country: "US", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - segment: + { + partner: "Google", + country: "ROW", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - segment: + { + partner: "Google", + country: "ROW", + device: "mobile", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.04 + - forecast_start_date: "2024-05-01" + segments: + - segment: + { + partner: "Google", + country: "US", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - start_date: "2024-08-01" + value: 1.03 + - segment: + { + partner: "Google", + country: "ROW", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - start_date: "2024-08-01" + value: 1.04 + - segment: + { + partner: "Google", + country: "ROW", + device: "mobile", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.04 diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml new file mode 100644 index 00000000..f4101dd9 --- /dev/null +++ 
b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml @@ -0,0 +1,31 @@ +--- +metric_hub: + app_name: "multi_product" + slug: "search_forecasting_revenue_per_ad_click" + alias: "search_forecasting_revenue_per_ad_click" + start_date: "2020-01-01" + end_date: "last complete month" + segments: + device: "device" + channel: "'all'" + country: "CASE WHEN country = 'US' THEN 'US' ELSE 'ROW' END" + partner: "partner" + where: "partner = 'Google'" + +forecast_model: + model_type: "scalar" + start_date: NULL + end_date: NULL + use_holidays: False + parameters: + adjustment_type: "YOY" + +summarize: + periods: ["year"] + numpy_aggregations: ["mean"] + +write_results: + project: "moz-fx-data-shared-prod" + dataset: "revenue_derived" + table: "search_revenue_forecasts_v1" + components_table: "search_revenue_model_components_v1" From 67cb2040e17587918d22f40519098281c8484f56 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 07:42:31 -0700 Subject: [PATCH 02/32] Remove comment --- .../kpi_forecasting/configs/model_inputs/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 6bc03e6e..1eff37a1 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -115,5 +115,4 @@ def parse_scalar_adjustments( applicable_adjustments.append(matched_adjustment) - # Parse the list of return applicable_adjustments From f00ec824ba89321a973878d797f4ec723f448ab0 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 07:46:19 -0700 Subject: [PATCH 03/32] ScalarAdjustments docstring update --- .../kpi_forecasting/configs/model_inputs/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 1eff37a1..25601241 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -49,6 +49,15 @@ class ProphetHoliday: class ScalarAdjustments: """ Holds the names and dates where a scalar adjustment should be applied. + + Args: + name (str): The name of the adjustment from the scalar_adjustments.yaml file. + forecast_start_date (datetime): The first forecast_start_date where this iteration of the + adjustment should be applied. This adjustment will apply to any subsequent forecast + until another update of this adjustment is made. + adjustments_dataframe (DataFrame): A DataFrame that contains the dimensions of the segments + being forecasted as columns, as well as the start dates and values for each scalar + adjustment. """ name: str @@ -107,7 +116,8 @@ def parse_scalar_adjustments( parsed_named_adjustments, key=lambda d: d.forecast_start_date ) - # Iterate over the sorted list to end with any + # Iterate over the sorted list to find any adjustments that apply after the supplied forecast_start_date. 
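+        ## The match kept is the last iteration whose own forecast_start_date falls on or
+        ## before the supplied one; e.g., with the dates in scalar_adjustments.yaml above,
+        ## a supplied forecast_start_date of 2024-04-15 matches the 2024-04-01 iteration.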
+ ## Returns `None` if no applicable value is found matched_adjustment = None for parsed_adjustment in sorted_parsed_named_adjustments: if forecast_start_date >= parsed_adjustment.forecast_start_date: From ae4d1f3f8297c5ed1d7b63bb2dd2db45387ccf1e Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 11:28:31 -0700 Subject: [PATCH 04/32] Update to use dataclass and remove classmethod --- .../configs/model_inputs/__init__.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 25601241..04c9ffcb 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -1,4 +1,5 @@ import attr +from dataclasses import dataclass from datetime import datetime from dotmap import DotMap from pathlib import Path @@ -45,7 +46,7 @@ class ProphetHoliday: upper_window: int -@attr.s(auto_attribs=True, frozen=False) +@dataclass class ScalarAdjustments: """ Holds the names and dates where a scalar adjustment should be applied. @@ -61,14 +62,11 @@ class ScalarAdjustments: """ name: str - forecast_start_date: datetime - adjustments_dataframe: pd.DataFrame - @classmethod - def from_dotmap(cls, name: str, adjustment_dotmap: DotMap): + def __post_init__(self, adjustment_dotmap: DotMap): adj_list = [] - forecast_start_date = datetime.strptime( + self.forecast_start_date = datetime.strptime( adjustment_dotmap.forecast_start_date, "%Y-%m-%d" ) for segment_dat in adjustment_dotmap.segments: @@ -77,9 +75,7 @@ def from_dotmap(cls, name: str, adjustment_dotmap: DotMap): {**segment, **adj} for adj in segment_dat.adjustments ] adj_list.append(pd.DataFrame(segment_adjustment_dat)) - adj_df = pd.concat(adj_list, ignore_index=True) - - return cls(name, forecast_start_date, adj_df) + self.adjustments_dataframe = pd.concat(adj_list, ignore_index=True) def parse_scalar_adjustments( @@ -107,7 +103,7 @@ def parse_scalar_adjustments( applicable_adjustments = [] for named_adjustment in metric_adjustments: parsed_named_adjustments = [ - ScalarAdjustments.from_dotmap(named_adjustment.name, adj_dotmap) + ScalarAdjustments(named_adjustment.name, adj_dotmap) for adj_dotmap in named_adjustment.adjustments ] From 843c8f28864401e993356f608cd978b27634df66 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 13:45:56 -0700 Subject: [PATCH 05/32] Add ScalarForecast class --- ...arch_forecasting_revenue_per_ad_click.yaml | 6 +- .../kpi_forecasting/models/scalar_forecast.py | 315 ++++++++++++++++++ 2 files changed, 318 insertions(+), 3 deletions(-) create mode 100644 jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml index f4101dd9..2935c288 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml @@ -18,11 +18,11 @@ forecast_model: end_date: NULL use_holidays: False parameters: - adjustment_type: "YOY" + formula: "search_forecasting_revenue_per_ad_click:YOY * scalar" summarize: - periods: ["year"] - numpy_aggregations: ["mean"] + requires_summarization: False + periods: ["month"] write_results: project: 
"moz-fx-data-shared-prod" diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py new file mode 100644 index 00000000..06ebceb8 --- /dev/null +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -0,0 +1,315 @@ +from dataclasses import dataclass, field +from datetime import datetime, timedelta +import itertools +import json +import re +from typing import Dict, List, Union + +from google.cloud import bigquery +from google.cloud.bigquery.enums import SqlTypeNames as bq_types +import numpy as np +import pandas as pd +from pandas.api import types as pd_types +import prophet +from prophet.diagnostics import cross_validation + +from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments +from kpi_forecasting.models.base_forecast import BaseForecast +from kpi_forecasting import pandas_extras as pdx + + +@dataclass +class SegmentScalarSettings: + """ + Holds the configuration and results for each segment + in a scalar forecasting model. + + Args: + segment (Dict[str, str]): Dictionary holding the segment dimensions and values. + + """ + + segment: Dict[str, str] + scalars: List[Dict[str, int | float]] + + # Hold forecast results + forecast_df: pd.DataFrame = None + + +@dataclass +class ScalarForecast(BaseForecast): + """ + ScalarForecast class for generating and managing forecast models where forecasts are + scalar adjustments of historical data or preceding Prophet-based forecasts. The + class handles cases where forecasts for a combination of dimensions are required for + a metric. + + Inherits from BaseForecast and provides methods for initializing forecast + parameters, building models, generating forecasts, summarizing results, + and writing results to BigQuery. + """ + + def __post_init__(self) -> None: + """ + Post-initialization method to set up necessary attributes and configurations. + + This method sets up the dates to predict, constructs segment combinations, + initializes models for each segment, and prepares attributes for storing results. + """ + super().__post_init__() + + # Get the list of adjustments for the metric slug being forecasted. That + ## slug must be a key in scalar_adjustments.yaml; otherwise, this will raise a KeyError + self.scalar_adjustments = parse_scalar_adjustments( + self.metric_hub.slug, self.start_date + ) + + # Construct a DataFrame containing all combination of segment values + ## in the observed_df + self.combination_df = self.observed_df[ + self.metric_hub.segments.keys() + ].drop_duplicates() + + # Set up the columns to be used to join the observed_df to the forecast_df in subsequent + ## methods + self.join_columns = self.combination_df.columns.to_list() + ["submission_date"] + + # Rename the value column to the metric slug name, to enable supporting a formula with + ## covariates in the future + self.observed_df.rename(columns={"value": self.metric_hub.slug}, inplace=True) + + # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains a row + ## for each forecast date for each segment + self.forecast_df = self.dates_to_predict.merge(self.combination_df, how="cross") + + @property + def period_names_map(self) -> Dict[str, pd.DateOffset]: + """ + Map a period-over-period name to an offset to apply to DataFrame date columns. + + Returns: + Dict[str, str]: Mapping of column names. 
+ """ + return {"YOY": pd.DateOffset(years=1), "MOM": pd.DateOffset(months=1)} + + def _parse_formula_for_over_period_changes(self) -> Dict | None: + """ + Find period-over-period metric specifications in provided formula. If present, create a dict that + maps a metric name to a period-over-period change. + """ + + # Pattern to match to the words before and after a colon. This will be the standard pattern + ## in a formula to denote that a period-over-period change will be applied to a metric + ## for a forecast. + pattern = r"\b(\w+:\w+)\b" + match = re.findall(pattern, self.parameters.formula) + + if match: + # Create dict from list of colon-separated strings (e.g. "metric_name:YOY"). + pop_dict = dict(pair.split(":") for pair in match) + return pop_dict + + return None + + def _add_scalar_columns(self) -> None: + """ + Adds the scalars to make metric adjustments to the dates specified in the self.scalar_adjustments + DataFrames. + """ + + for scalar_adjustment in self.scalar_adjustments: + adj_df = scalar_adjustment.adjustments_dataframe.rename( + columns={"value": f"scalar_{scalar_adjustment.name}"} + ) + + # Merge asof to align values based on start dates and dimensions + self.forecast_df = pd.merge_asof( + self.forecast_df.sort_values("submission_date"), + adj_df.sort_values("start_date"), + by=[self.combination_df.columns], + left_on="submission_date", + right_on="start_date", + direction="backward", + ) + + # Fill values with submission_date before start_date with np.nan, then replace NaN with + ## 1 to not apply any scalar for dates that don't apply or for segments without that + ## scalar + self.forecast_df[f"scalar_{scalar_adjustment.name}"] = np.where( + self.forecast_df["submission_date"] < self.forecast_df["start_date"], + np.nan, + self.forecast_df[f"scalar_{scalar_adjustment.name}"], + ) + + # Drop the start_date column that isn't needed for forecasting and can be reused for multiple + ## metrics + self.forecast_df.drop(columns=["start_date"], inplace=True) + + def _fit(self) -> None: + + # Create period-over-period dict, which defines how observed data is carried forward in cases + ## where the forecast is a scalar * previously observed data + pop_dict = self._parse_formula_for_over_period_changes() + if pop_dict: + for metric, period in pop_dict: + metric_pop_name = f"{metric}_{period}" + + # Create date column in the forecast_df with the specified date offset + ## in order to merge in observed data from that period + offset = self.period_names_map[period] + self.forecast_df[f"{metric_pop_name}_date"] = pd.to_datetime( + self.forecast_df["submission_date"] - offset + ) + + # Merge observed data to be used in adjustments + self.forecast_df.merge( + self.observed_df[[*self.join_columns, metric]], + how="left", + left_on=f"{metric_pop_name}_date", + right_on="submission_date", + inplace=True, + ) + + # Remove unneeded date column + self.forecast_df.drop(columns=[f"{metric_pop_name}_date"], inplace=True) + + # Update the forecast_df with scalar columns + self._add_scalar_columns() + + def _predict(self) -> None: + + # Create final scalar as product of individual scalar effects + self.forecast_df["scalar"] = self.forecast_df[ + [c for c in self.forecast_df.columns if "scalar_" in c] + ].prod(axis=1) + + # Calculate forecast as product of scalar value and observed value + self.forecast_df["value"] = ( + self.forecast_df["scalar"] * self.forecast_df[self.metric_hub.slug] + ) + + # Record each scalar value in a dictionary to record in model records + 
self.forecast_df["forecast_parameters"] = self.forecast_df[ + [c for c in self.forecast_df.columns if "scalar" in c] + ].to_dict(orient="records") + + def _summarize(self, period: str) -> pd.DataFrame: + """ + In cases where no summarization is required, adds the expected columns to a summary DataFrame. + + Args: + period (str): Aggregation period that should be consistent with the aggregation period of + the observed data. + """ + if isinstance(period, list): + if len(period) > 1: + raise ValueError( + "Can only supply one aggregation period when not summarizing results." + ) + period = period[0] + + df = self.forecast_df.copy() + df["source"] = np.where( + df["submission_date"] < self.start_date, + "historical", + "forecast", + ) + df["measure"] = np.where( + df["submission_date"] < self.start_date, + "observed", + "forecast", + ) + + df["aggregation_period"] = period + # add Metric Hub metadata columns + df["metric_alias"] = self.metric_hub.alias.lower() + df["metric_hub_app_name"] = self.metric_hub.app_name.lower() + df["metric_hub_slug"] = self.metric_hub.slug.lower() + df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date) + df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date) + df["metric_collected_at"] = self.collected_at + + # add forecast model metadata columns + df["forecast_start_date"] = self.start_date + df["forecast_end_date"] = self.end_date + df["forecast_trained_at"] = self.trained_at + df["forecast_predicted_at"] = self.predicted_at + + def summarize( + self, + requires_summarization: bool = True, + periods: List[str] | str = ["day", "month"], + numpy_aggregations: List[str] = ["mean"], + percentiles: List[int] = [10, 50, 90], + ) -> None: + """ + There are cases where forecasts created by this class do not require summarization (e.g. the + scalar adjustment was made to a prior forecast) + """ + if not requires_summarization: + self._summarize(periods) + + else: + # If summarization is required, use the summarization method in the BaseForecast class + self.summary_df = pd.concat( + [self._summarize(i, numpy_aggregations, percentiles) for i in periods] + ) + + def write_results( + self, + project: str, + dataset: str, + table: str, + write_disposition: str = "WRITE_APPEND", + components_table: str = "", + components_dataset: str = "", + ) -> None: + """ + Write `self.summary_df` to Big Query. + + Args: + project (str): The Big Query project that the data should be written to. + dataset (str): The Big Query dataset that the data should be written to. + table (str): The Big Query table that the data should be written to. + write_disposition (str, optional): In the event that the destination table exists, + should the table be overwritten ("WRITE_TRUNCATE") or appended to ("WRITE_APPEND")? Defaults to "WRITE_APPEND". + components_table (str, optional): The Big Query table for model components. Defaults to "". + components_dataset (str, optional): The Big Query dataset for model components. Defaults to "". 
+ """ + print( + f"Writing results to `{project}.{dataset}.{table}`.", + flush=True, + ) + client = bigquery.Client(project=project) + schema = [ + bigquery.SchemaField("submission_date", bq_types.DATE), + *[ + bigquery.SchemaField(k, bq_types.STRING) + for k in self.metric_hub.segments.keys() + ], + bigquery.SchemaField("aggregation_period", bq_types.STRING), + bigquery.SchemaField("source", bq_types.STRING), + bigquery.SchemaField("value", bq_types.FLOAT), + bigquery.SchemaField("metric_alias", bq_types.STRING), + bigquery.SchemaField("metric_hub_app_name", bq_types.STRING), + bigquery.SchemaField("metric_hub_slug", bq_types.STRING), + bigquery.SchemaField("metric_start_date", bq_types.DATE), + bigquery.SchemaField("metric_end_date", bq_types.DATE), + bigquery.SchemaField("metric_collected_at", bq_types.TIMESTAMP), + bigquery.SchemaField("forecast_start_date", bq_types.DATE), + bigquery.SchemaField("forecast_end_date", bq_types.DATE), + bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP), + bigquery.SchemaField("forecast_predicted_at", bq_types.TIMESTAMP), + bigquery.SchemaField("forecast_parameters", bq_types.STRING), + ] + job = client.load_table_from_dataframe( + dataframe=self.summary_df, + destination=f"{project}.{dataset}.{table}", + job_config=bigquery.LoadJobConfig( + schema=schema, + autodetect=False, + write_disposition=write_disposition, + ), + ) + # Wait for the job to complete. + job.result() From eeb23088c9ea43f0b7b6d070bff4e4281ddb25f0 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 13:47:20 -0700 Subject: [PATCH 06/32] Remove unneeded class --- .../kpi_forecasting/models/scalar_forecast.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 06ebceb8..b4bad329 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -18,24 +18,6 @@ from kpi_forecasting import pandas_extras as pdx -@dataclass -class SegmentScalarSettings: - """ - Holds the configuration and results for each segment - in a scalar forecasting model. - - Args: - segment (Dict[str, str]): Dictionary holding the segment dimensions and values. 
- - """ - - segment: Dict[str, str] - scalars: List[Dict[str, int | float]] - - # Hold forecast results - forecast_df: pd.DataFrame = None - - @dataclass class ScalarForecast(BaseForecast): """ From b7b079d5413d844d016f397e1b4cbe26fe2f183b Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 13:47:52 -0700 Subject: [PATCH 07/32] Remove unused imports --- .../kpi_forecasting/models/scalar_forecast.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index b4bad329..271f283b 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -1,17 +1,11 @@ -from dataclasses import dataclass, field -from datetime import datetime, timedelta -import itertools -import json +from dataclasses import dataclass import re -from typing import Dict, List, Union +from typing import Dict, List from google.cloud import bigquery from google.cloud.bigquery.enums import SqlTypeNames as bq_types import numpy as np import pandas as pd -from pandas.api import types as pd_types -import prophet -from prophet.diagnostics import cross_validation from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments from kpi_forecasting.models.base_forecast import BaseForecast From cfd7b81c30cc68f820559e53361234b2041787af Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 13:50:48 -0700 Subject: [PATCH 08/32] Fillna for scalar columns with 1 --- .../kpi-forecasting/kpi_forecasting/models/scalar_forecast.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 271f283b..86c027d4 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -117,6 +117,10 @@ def _add_scalar_columns(self) -> None: self.forecast_df[f"scalar_{scalar_adjustment.name}"], ) + # Fill scalar column with 1. 
Scalars are always multiplicative, so this removes the scalar effect + ## for dates/segments where it shouldn't apply + self.forecast_df[f"scalar_{scalar_adjustment.name}"].fillna(1, inplace=True) + # Drop the start_date column that isn't needed for forecasting and can be reused for multiple ## metrics self.forecast_df.drop(columns=["start_date"], inplace=True) From da13c853f067751c97f28d7c72e8d9ec9be9b062 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 14:01:05 -0700 Subject: [PATCH 09/32] Fix error, caps for module-level variables --- .../kpi_forecasting/configs/model_inputs/__init__.py | 10 +++++----- .../kpi_forecasting/models/funnel_forecast.py | 11 +++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 04c9ffcb..d6f67dde 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -15,9 +15,9 @@ REGRESSOR_PATH = PARENT_PATH / "regressors.yaml" SCALAR_PATH = PARENT_PATH / "scalar_adjustments.yaml" -holiday_collection = YAML(HOLIDAY_PATH) -regressor_collection = YAML(REGRESSOR_PATH) -scalar_adjustments = YAML(SCALAR_PATH) +HOLIDAY_COLLECTION = YAML(HOLIDAY_PATH) +REGRESSOR_COLLECTION = YAML(REGRESSOR_PATH) +SCALAR_ADJUSTMENTS = YAML(SCALAR_PATH) @attr.s(auto_attribs=True, frozen=False) @@ -95,7 +95,7 @@ def parse_scalar_adjustments( List[ScalarAdjustments]: A list of ScalarAdjustments, where each ScalarAdjustments is a named scalar adjustment with the dates that the adjustment should be applied for each segment being modeled. """ - metric_adjustments = getattr(scalar_adjustments.data, metric_hub_slug) + metric_adjustments = getattr(SCALAR_ADJUSTMENTS.data, metric_hub_slug) if not metric_adjustments: raise KeyError(f"No adjustments found for {metric_hub_slug} in {SCALAR_PATH}.") @@ -119,6 +119,6 @@ def parse_scalar_adjustments( if forecast_start_date >= parsed_adjustment.forecast_start_date: matched_adjustment = parsed_adjustment - applicable_adjustments.append(matched_adjustment) + applicable_adjustments.append(matched_adjustment) return applicable_adjustments diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py index c5d4a980..0efa56aa 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py @@ -15,8 +15,8 @@ from kpi_forecasting.configs.model_inputs import ( ProphetHoliday, ProphetRegressor, - holiday_collection, - regressor_collection, + HOLIDAY_COLLECTION, + REGRESSOR_COLLECTION, ) from kpi_forecasting.models.base_forecast import BaseForecast from kpi_forecasting import pandas_extras as pdx @@ -99,12 +99,12 @@ def __post_init__(self) -> None: if model_params["holidays"]: holiday_list = [ - getattr(holiday_collection.data, h) + getattr(HOLIDAY_COLLECTION.data, h) for h in model_params["holidays"] ] if model_params["regressors"]: regressor_list = [ - getattr(regressor_collection.data, r) + getattr(REGRESSOR_COLLECTION.data, r) for r in model_params["regressors"] ] @@ -531,8 +531,7 @@ def _summarize( # forecast is generated in the middle of the month. 
.add(overlap[["value"]].values) # calculate summary values, aggregating by submission_date, - .agg(aggregations, axis=1) - .reset_index() + .agg(aggregations, axis=1).reset_index() ).rename(columns=self._percentile_name_map(percentiles)) # add datasource-specific metadata columns From 479704287b0bef233822ccb66ae79ed97347c26d Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 11 Jul 2024 14:10:08 -0700 Subject: [PATCH 10/32] Fix Ruff errors --- .../kpi_forecasting/configs/model_inputs/__init__.py | 3 +-- jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py | 3 ++- jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py | 3 --- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index d6f67dde..a0b9941c 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -3,7 +3,7 @@ from datetime import datetime from dotmap import DotMap from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import List, Optional, Union import pandas as pd @@ -64,7 +64,6 @@ class ScalarAdjustments: name: str def __post_init__(self, adjustment_dotmap: DotMap): - adj_list = [] self.forecast_start_date = datetime.strptime( adjustment_dotmap.forecast_start_date, "%Y-%m-%d" diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py index 0efa56aa..f41ac68a 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py @@ -531,7 +531,8 @@ def _summarize( # forecast is generated in the middle of the month. 
.add(overlap[["value"]].values) # calculate summary values, aggregating by submission_date, - .agg(aggregations, axis=1).reset_index() + .agg(aggregations, axis=1) + .reset_index() ).rename(columns=self._percentile_name_map(percentiles)) # add datasource-specific metadata columns diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 86c027d4..c6a4e056 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -9,7 +9,6 @@ from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments from kpi_forecasting.models.base_forecast import BaseForecast -from kpi_forecasting import pandas_extras as pdx @dataclass @@ -126,7 +125,6 @@ def _add_scalar_columns(self) -> None: self.forecast_df.drop(columns=["start_date"], inplace=True) def _fit(self) -> None: - # Create period-over-period dict, which defines how observed data is carried forward in cases ## where the forecast is a scalar * previously observed data pop_dict = self._parse_formula_for_over_period_changes() @@ -157,7 +155,6 @@ def _fit(self) -> None: self._add_scalar_columns() def _predict(self) -> None: - # Create final scalar as product of individual scalar effects self.forecast_df["scalar"] = self.forecast_df[ [c for c in self.forecast_df.columns if "scalar_" in c] From ae6d4a69183113f4be77836e624adf5208822b78 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Mon, 15 Jul 2024 10:15:48 -0700 Subject: [PATCH 11/32] Update post_init to remove arguments --- .../configs/model_inputs/__init__.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index a0b9941c..c271d5f3 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -62,13 +62,14 @@ class ScalarAdjustments: """ name: str + adjustment_dotmap: DotMap - def __post_init__(self, adjustment_dotmap: DotMap): + def __post_init__(self): adj_list = [] self.forecast_start_date = datetime.strptime( - adjustment_dotmap.forecast_start_date, "%Y-%m-%d" + self.adjustment_dotmap.forecast_start_date, "%Y-%m-%d" ) - for segment_dat in adjustment_dotmap.segments: + for segment_dat in self.adjustment_dotmap.segments: segment = {**segment_dat.segment} segment_adjustment_dat = [ {**segment, **adj} for adj in segment_dat.adjustments @@ -81,7 +82,7 @@ def parse_scalar_adjustments( metric_hub_slug: str, forecast_start_date: datetime ) -> List[ScalarAdjustments]: """ - Parses the scalar_adjustments to find the applicable scalar adjustments for a given metric hub slug + Parses the SCALAR_ADJUSTMENTS to find the applicable scalar adjustments for a given metric hub slug and forecast start date. 
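+    (SCALAR_ADJUSTMENTS is the module-level collection loaded from
+    scalar_adjustments.yaml; it was renamed to upper case in an earlier patch.)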
Args: @@ -117,7 +118,10 @@ def parse_scalar_adjustments( for parsed_adjustment in sorted_parsed_named_adjustments: if forecast_start_date >= parsed_adjustment.forecast_start_date: matched_adjustment = parsed_adjustment - + else: + break + + if matched_adjustment: applicable_adjustments.append(matched_adjustment) return applicable_adjustments From 5d8797d35c7b0977402722a55d1f4aa1c0fbb067 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Mon, 15 Jul 2024 10:16:09 -0700 Subject: [PATCH 12/32] Reference alias instead of slug --- .../kpi_forecasting/models/scalar_forecast.py | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index c6a4e056..0a79c208 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -51,7 +51,7 @@ def __post_init__(self) -> None: # Rename the value column to the metric slug name, to enable supporting a formula with ## covariates in the future - self.observed_df.rename(columns={"value": self.metric_hub.slug}, inplace=True) + self.observed_df.rename(columns={"value": self.metric_hub.alias}, inplace=True) # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains a row ## for each forecast date for each segment @@ -151,6 +151,33 @@ def _fit(self) -> None: # Remove unneeded date column self.forecast_df.drop(columns=[f"{metric_pop_name}_date"], inplace=True) + # For cases where period-over-period change isn't defined, copy over the observed_df values into + ## the forecast_df. Check for values in the forecast period and raise an error if it's filled with + ## nan. 
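+        ## (This branch runs when the formula has no colon-separated term, e.g. a
+        ## formula like "search_forecasting_ad_clicks * scalar", so pop_dict is None.)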
+ else: + self.forecast_df.merge( + self.observed_df[[*self.join_columns, metric]], + how="left", + on="submission_date", + inplace=True, + ) + # The forecast data should have no nan values + if ( + self.forecast_df.loc[ + self.forecast_df["submission_date"].isin( + pd.date_range( + pd.to_datetime(self.start_date), + pd.to_datetime(self.end_date), + ) + ), + self.metric_hub.alias, + ] + .isnull() + .sum() + > 0 + ): + raise ValueError("Found nan values in forecast values.") + # Update the forecast_df with scalar columns self._add_scalar_columns() @@ -162,7 +189,7 @@ def _predict(self) -> None: # Calculate forecast as product of scalar value and observed value self.forecast_df["value"] = ( - self.forecast_df["scalar"] * self.forecast_df[self.metric_hub.slug] + self.forecast_df["scalar"] * self.forecast_df[self.metric_hub.alias] ) # Record each scalar value in a dictionary to record in model records From 9971b46ab977fb1a5f35ac48a0e6931f1b90de78 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Tue, 16 Jul 2024 08:34:33 -0700 Subject: [PATCH 13/32] Add base data pull class and class to pull forecast data --- jobs/kpi-forecasting/kpi_forecasting.py | 13 ++- .../kpi_forecasting/metric_hub.py | 107 +++++++++++++++++- 2 files changed, 110 insertions(+), 10 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting.py b/jobs/kpi-forecasting/kpi_forecasting.py index e7dcca7c..b11de24b 100644 --- a/jobs/kpi-forecasting/kpi_forecasting.py +++ b/jobs/kpi-forecasting/kpi_forecasting.py @@ -2,7 +2,7 @@ from kpi_forecasting.models.prophet_forecast import ProphetForecast from kpi_forecasting.models.funnel_forecast import FunnelForecast from kpi_forecasting.metric_hub import MetricHub - +from kpi_forecasting.metric_hub import ForecastDataPull # A dictionary of available models in the `models` directory. MODELS = { @@ -16,10 +16,15 @@ def main() -> None: config = YAML(filepath=CLI().args.config).data model_type = config.forecast_model.model_type - if model_type in MODELS: - metric_hub = MetricHub(**config.metric_hub) - model = MODELS[model_type](metric_hub=metric_hub, **config.forecast_model) + if hasattr(config, "metric_hub"): + data_puller = MetricHub(**config.metric_hub) + elif hasattr(config, "forecast_data_pull"): + data_puller = ForecastDataPull(**config.forecast_data_pull) + else: + raise KeyError("No metric_hub or forecast_data_pull key in config to pull data.") + if model_type in MODELS: + model = MODELS[model_type](data_puller=data_puller, **config.forecast_model) model.fit() model.predict() model.summarize(**config.summarize) diff --git a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py index 64cf9d42..ae1b0b2b 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py +++ b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py @@ -1,6 +1,7 @@ import pandas as pd from dataclasses import dataclass +from datetime import date, timedelta from dotmap import DotMap from google.cloud import bigquery from mozanalysis.config import ConfigLoader @@ -10,14 +11,15 @@ @dataclass -class MetricHub: +class BaseDataPull: """ - Programatically get Metric Hub metrics from Big Query. - See https://mozilla.github.io/metric-hub/metrics/ for a list of metrics. + A base class to pull data from BigQuery. For use with the forecast classes in this + module, a `fetch` method must be implemented. Args: - app_name (str): The Metric Hub app name for the metric. - slug (str): The Metric Hub slug for the metric. 
+ app_name (str): The app name that applies to the metric being retrieved. + slug (str): A slug for the metric, intended to mimic the nomenclature used for + metrics on Metric Hub. start_date (str): A 'YYYY-MM-DD' formatted-string that specifies the first date the metric should be queried. segments (Dict): A dictionary of segments to use to group metric values. @@ -31,8 +33,13 @@ class MetricHub: 'daily_active_users'. project (str): The Big Query project to use when establishing a connection to the Big Query client. + forecast_project (str): BigQuery project where forecast table to be accessed is + located. + forecast_dataset (str): For pulling forecast data, the dataset where the forecast + data is stored in BigQuery. + forecast_table (str): The table name where data is stored in BigQuery for pulling + past forecast data. """ - app_name: str slug: str start_date: str @@ -41,6 +48,19 @@ class MetricHub: end_date: str = None alias: str = None project: str = "mozdata" + forecast_project: str = None + forecast_dataset: str = None + forecast_table: str = None + + def fetch(self) -> pd.DataFrame: + raise NotImplementedError + +@dataclass +class MetricHub(BaseDataPull): + """ + Programatically get Metric Hub metrics from Big Query. + See https://mozilla.github.io/metric-hub/metrics/ for a list of metrics. + """ def __post_init__(self) -> None: self.start_date = pd.to_datetime(self.start_date).date() @@ -112,3 +132,78 @@ def fetch(self) -> pd.DataFrame: self.max_date = str(df[self.submission_date_column].max()) return df + + +@dataclass +class ForecastDataPull(BaseDataPull): + """ + Programatically get metrics from Big Query forecast data tables. The tables + must follow the schema patterns found in the forecast tables produced by the + `write_results` methods of the model classes in this module. + """ + + def __post_init__(self) -> None: + self.start_date = pd.to_datetime(self.start_date).date() + + if self.end_date: + self.end_date = pd.to_datetime(parse_end_date(self.end_date)).date() + else: + # Default forecast horizon is 18 months. End date here is extended to 36 months, + ## to cover all current usecases + self.end_date = pd.to_datetime(date.today() + timedelta(days = 365 * 3)).date() + + self.alias = self.alias or (self.slug + "_adjusted") + + # Default submission_date column name is "submission_date". 
This could be altered to accept + ## an input, but there is no current need + self.submission_date_column = "submission_date" + + self.from_expression = f"{self.project}.{self.forecast_dataset}.{self.forecast_table}" + + # Add query snippets for segments + self.segment_select_query = "" + + if self.segments: + segment_select_query = [] + segments = dict(self.segments) + for alias, sql in segments.items(): + segment_select_query.append(f" {sql} AS {alias},") + self.segment_select_query = "," + "\n ".join( + segment_select_query + ) + + self.where = f"AND {self.where}" if self.where else "" + + def query(self) -> str: + """Build a string to query the relevant metric values from Big Query.""" + return dedent( + f""" + SELECT {self.submission_date_column} AS submission_date, + value + {self.segment_select_query} + FROM {self.from_expression} + WHERE {self.submission_date_column} BETWEEN '{self.start_date}' AND '{self.end_date}' + AND metric_slug = '{self.slug}' + {self.where} + """ + ) + + def fetch(self) -> pd.DataFrame: + """Fetch the relevant metric values from Big Query.""" + print( + f"\nQuerying for '{self.app_name}.{self.slug}' aliased as '{self.alias}':" + f"\n{self.query()}" + ) + df = bigquery.Client(project=self.project).query(self.query()).to_dataframe() + + # ensure submission_date has type 'date' + df[self.submission_date_column] = pd.to_datetime( + df[self.submission_date_column] + ).dt.date + + # Track the min and max dates in the data, which may differ from the + # start/end dates + self.min_date = str(df[self.submission_date_column].min()) + self.max_date = str(df[self.submission_date_column].max()) + + return df \ No newline at end of file From 6d3fb98d95ed04b2810f1e306bcce269ed1390c6 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Tue, 16 Jul 2024 11:24:49 -0700 Subject: [PATCH 14/32] Update link to metric hub github.io --- jobs/kpi-forecasting/kpi_forecasting/metric_hub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py index ae1b0b2b..97f0d4cd 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py +++ b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py @@ -59,7 +59,7 @@ def fetch(self) -> pd.DataFrame: class MetricHub(BaseDataPull): """ Programatically get Metric Hub metrics from Big Query. - See https://mozilla.github.io/metric-hub/metrics/ for a list of metrics. + See https://mozilla.github.io/metric-hub/metrics/fenix/ for a list of metrics. 
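+
+    Example (editorial sketch mirroring how kpi_forecasting.py builds this class;
+    `config` is a parsed YAML config such as the revenue-per-ad-click one above):
+        >>> hub = MetricHub(**config.metric_hub)
+        >>> observed_df = hub.fetch()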
""" def __post_init__(self) -> None: From b64c21fe418540ff0ee3309a4e748a7a26d8577d Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Tue, 16 Jul 2024 14:40:19 -0700 Subject: [PATCH 15/32] Keep metric_hub attribute in BaseForecast --- jobs/kpi-forecasting/kpi_forecasting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting.py b/jobs/kpi-forecasting/kpi_forecasting.py index b11de24b..05a178b7 100644 --- a/jobs/kpi-forecasting/kpi_forecasting.py +++ b/jobs/kpi-forecasting/kpi_forecasting.py @@ -24,7 +24,7 @@ def main() -> None: raise KeyError("No metric_hub or forecast_data_pull key in config to pull data.") if model_type in MODELS: - model = MODELS[model_type](data_puller=data_puller, **config.forecast_model) + model = MODELS[model_type](metric_hub=data_puller, **config.forecast_model) model.fit() model.predict() model.summarize(**config.summarize) From 53ab92ba06515680b1b54acea7b139f9fae26f47 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 10:11:37 -0700 Subject: [PATCH 16/32] Add scalar model configs and updates --- ...rch_forecasting_ad_clicks_adjustments.yaml | 33 +++++++++++++++++++ ...arch_forecasting_revenue_per_ad_click.yaml | 5 ++- 2 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml new file mode 100644 index 00000000..e1e140bc --- /dev/null +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml @@ -0,0 +1,33 @@ +--- +forecast_data_pull: + app_name: "multi_product" + slug: "search_forecasting_ad_clicks" + alias: "search_forecasting_ad_clicks_adjusted" + start_date: "2020-01-01" + forecast_start_date: "2024-05-01" + forecast_project: "mozdata" + forecast_dataset: "revenue_cat3_analysis" + forecast_table: "search_revenue_forecast_stage" + segments: + device: "device" + channel: "channel" + country: "country" + partner: "partner" + where: "aggregation_period = 'day'" + +forecast_model: + model_type: "scalar" + start_date: NULL + end_date: NULL + use_holidays: False + parameters: + formula: "search_forecasting_ad_clicks * scalar" + +summarize: + requires_summarization: True + periods: ["day", "month", "year"] + +write_results: + project: "moz-fx-data-shared-prod" + dataset: "revenue_derived" + table: "search_revenue_forecasts_v1" diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml index 2935c288..a8978b4b 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml @@ -9,8 +9,8 @@ metric_hub: device: "device" channel: "'all'" country: "CASE WHEN country = 'US' THEN 'US' ELSE 'ROW' END" - partner: "partner" - where: "partner = 'Google'" + partner: "partner_name" + where: "partner_name = 'Google'" forecast_model: model_type: "scalar" @@ -28,4 +28,3 @@ write_results: project: "moz-fx-data-shared-prod" dataset: "revenue_derived" table: "search_revenue_forecasts_v1" - components_table: "search_revenue_model_components_v1" From 963dd0af097b7d9e4311b06b155ec83c01d3d4b9 Mon Sep 17 00:00:00 2001 From: 
m-d-bowerman Date: Thu, 18 Jul 2024 10:12:05 -0700 Subject: [PATCH 17/32] Metric hub query logic updates --- jobs/kpi-forecasting/kpi_forecasting.py | 2 + .../kpi_forecasting/metric_hub.py | 60 +++++++++++++------ 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting.py b/jobs/kpi-forecasting/kpi_forecasting.py index 05a178b7..b16bd929 100644 --- a/jobs/kpi-forecasting/kpi_forecasting.py +++ b/jobs/kpi-forecasting/kpi_forecasting.py @@ -1,6 +1,7 @@ from kpi_forecasting.inputs import CLI, YAML from kpi_forecasting.models.prophet_forecast import ProphetForecast from kpi_forecasting.models.funnel_forecast import FunnelForecast +from kpi_forecasting.models.scalar_forecast import ScalarForecast from kpi_forecasting.metric_hub import MetricHub from kpi_forecasting.metric_hub import ForecastDataPull @@ -8,6 +9,7 @@ MODELS = { "prophet": ProphetForecast, "funnel": FunnelForecast, + "scalar": ScalarForecast } diff --git a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py index 97f0d4cd..a2381602 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py +++ b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py @@ -33,6 +33,8 @@ class BaseDataPull: 'daily_active_users'. project (str): The Big Query project to use when establishing a connection to the Big Query client. + forecast_start_date (str): The forecast_start_date to use as the key to pull + forecast data. forecast_project (str): BigQuery project where forecast table to be accessed is located. forecast_dataset (str): For pulling forecast data, the dataset where the forecast @@ -40,6 +42,7 @@ class BaseDataPull: forecast_table (str): The table name where data is stored in BigQuery for pulling past forecast data. """ + app_name: str slug: str start_date: str @@ -48,6 +51,7 @@ class BaseDataPull: end_date: str = None alias: str = None project: str = "mozdata" + forecast_start_date: str = None forecast_project: str = None forecast_dataset: str = None forecast_table: str = None @@ -55,6 +59,7 @@ class BaseDataPull: def fetch(self) -> pd.DataFrame: raise NotImplementedError + @dataclass class MetricHub(BaseDataPull): """ @@ -122,14 +127,12 @@ def fetch(self) -> pd.DataFrame: df = bigquery.Client(project=self.project).query(self.query()).to_dataframe() # ensure submission_date has type 'date' - df[self.submission_date_column] = pd.to_datetime( - df[self.submission_date_column] - ).dt.date + df["submission_date"] = pd.to_datetime(df["submission_date"]).dt.date # Track the min and max dates in the data, which may differ from the # start/end dates - self.min_date = str(df[self.submission_date_column].min()) - self.max_date = str(df[self.submission_date_column].max()) + self.min_date = str(df["submission_date"].min()) + self.max_date = str(df["submission_date"].max()) return df @@ -150,7 +153,9 @@ def __post_init__(self) -> None: else: # Default forecast horizon is 18 months. 
End date here is extended to 36 months, ## to cover all current usecases - self.end_date = pd.to_datetime(date.today() + timedelta(days = 365 * 3)).date() + self.end_date = pd.to_datetime( + date.today() + timedelta(days=365 * 3) + ).date() self.alias = self.alias or (self.slug + "_adjusted") @@ -158,10 +163,13 @@ def __post_init__(self) -> None: ## an input, but there is no current need self.submission_date_column = "submission_date" - self.from_expression = f"{self.project}.{self.forecast_dataset}.{self.forecast_table}" + self.from_expression = ( + f"{self.project}.{self.forecast_dataset}.{self.forecast_table}" + ) # Add query snippets for segments self.segment_select_query = "" + self.segment_groupby_query = "" if self.segments: segment_select_query = [] @@ -171,39 +179,57 @@ def __post_init__(self) -> None: self.segment_select_query = "," + "\n ".join( segment_select_query ) + self.segment_groupby_query = "," + "\n ,".join( + self.segments.keys() + ) self.where = f"AND {self.where}" if self.where else "" + # Check if forecast_start_date was supplied. If not, create strting to grab the most recent forecast. + if not self.forecast_start_date: + self.forecast_start_date_snippet = f"""( + SELECT + MAX(forecast_start_date) + FROM {self.from_expression} + WHERE metric_slug = '{self.slug}')""" + else: + self.forecast_start_date_snippet = f"'{self.forecast_start_date}'" + def query(self) -> str: """Build a string to query the relevant metric values from Big Query.""" return dedent( f""" - SELECT {self.submission_date_column} AS submission_date, - value + WITH cte AS ( + SELECT + {self.submission_date_column} AS submission_date, + forecast_start_date, + ANY_VALUE(value HAVING MAX forecast_trained_at) AS value {self.segment_select_query} FROM {self.from_expression} WHERE {self.submission_date_column} BETWEEN '{self.start_date}' AND '{self.end_date}' - AND metric_slug = '{self.slug}' + AND metric_alias = '{self.slug}' AND forecast_start_date = {self.forecast_start_date_snippet} {self.where} + GROUP BY {self.submission_date_column}, forecast_start_date + {self.segment_groupby_query} + ) + SELECT * EXCEPT (forecast_start_date) FROM cte """ ) def fetch(self) -> pd.DataFrame: """Fetch the relevant metric values from Big Query.""" print( - f"\nQuerying for '{self.app_name}.{self.slug}' aliased as '{self.alias}':" + f"\nQuerying for the '{self.app_name}.{self.slug}' forecast':" f"\n{self.query()}" ) df = bigquery.Client(project=self.project).query(self.query()).to_dataframe() # ensure submission_date has type 'date' - df[self.submission_date_column] = pd.to_datetime( - df[self.submission_date_column] - ).dt.date + df["submission_date"] = pd.to_datetime(df["submission_date"]).dt.date # Track the min and max dates in the data, which may differ from the # start/end dates - self.min_date = str(df[self.submission_date_column].min()) - self.max_date = str(df[self.submission_date_column].max()) + self.min_date = str(df["submission_date"].min()) + self.max_date = str(df["submission_date"].max()) - return df \ No newline at end of file + return df From c169af2bb9cf31dc1c6a3568b825d8ee8a2febea Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 10:12:24 -0700 Subject: [PATCH 18/32] Remove unneeded break and add comment --- .../kpi_forecasting/configs/model_inputs/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py index 
c271d5f3..e57c0ef2 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py @@ -75,6 +75,9 @@ def __post_init__(self): {**segment, **adj} for adj in segment_dat.adjustments ] adj_list.append(pd.DataFrame(segment_adjustment_dat)) + + # Create a DataFrame with each dimension in the segments, the start date of + ## each scalar adjustment, and the value of that adjustment self.adjustments_dataframe = pd.concat(adj_list, ignore_index=True) @@ -118,9 +121,7 @@ def parse_scalar_adjustments( for parsed_adjustment in sorted_parsed_named_adjustments: if forecast_start_date >= parsed_adjustment.forecast_start_date: matched_adjustment = parsed_adjustment - else: - break - + if matched_adjustment: applicable_adjustments.append(matched_adjustment) From 436a3646d75a9981f16d63484d229f0775b77755 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 10:12:48 -0700 Subject: [PATCH 19/32] Updates to summarize methods for scalars --- .../kpi_forecasting/models/scalar_forecast.py | 221 +++++++++++++++--- 1 file changed, 183 insertions(+), 38 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 0a79c208..ff4c3375 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from datetime import datetime import re from typing import Dict, List @@ -7,8 +8,10 @@ import numpy as np import pandas as pd +from kpi_forecasting import pandas_extras as pdx from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments from kpi_forecasting.models.base_forecast import BaseForecast +from kpi_forecasting.metric_hub import ForecastDataPull @dataclass @@ -33,6 +36,15 @@ def __post_init__(self) -> None: """ super().__post_init__() + # For forecast data, we need to overwrite the start and end date based on the dates + ## of the forecast data we've pulled in __post_init + if isinstance(self.metric_hub, ForecastDataPull): + self.start_date = pd.to_datetime(self.metric_hub.forecast_start_date) + self.end_date = pd.to_datetime(self.observed_df["submission_date"].max()) + self.dates_to_predict = pd.DataFrame( + {"submission_date": pd.date_range(self.start_date, self.end_date).date} + ) + # Get the list of adjustments for the metric slug being forecasted. 
That ## slug must be a key in scalar_adjustments.yaml; otherwise, this will raise a KeyError self.scalar_adjustments = parse_scalar_adjustments( @@ -47,9 +59,9 @@ def __post_init__(self) -> None: # Set up the columns to be used to join the observed_df to the forecast_df in subsequent ## methods - self.join_columns = self.combination_df.columns.to_list() + ["submission_date"] + self.join_columns = self.combination_df.columns.to_list() - # Rename the value column to the metric slug name, to enable supporting a formula with + # Rename the value column to the metric alias name, to enable supporting a formula with ## covariates in the future self.observed_df.rename(columns={"value": self.metric_hub.alias}, inplace=True) @@ -96,12 +108,15 @@ def _add_scalar_columns(self) -> None: adj_df = scalar_adjustment.adjustments_dataframe.rename( columns={"value": f"scalar_{scalar_adjustment.name}"} ) - + self.forecast_df["submission_date"] = pd.to_datetime( + self.forecast_df["submission_date"] + ) + adj_df["start_date"] = pd.to_datetime(adj_df["start_date"]) # Merge asof to align values based on start dates and dimensions self.forecast_df = pd.merge_asof( self.forecast_df.sort_values("submission_date"), adj_df.sort_values("start_date"), - by=[self.combination_df.columns], + by=self.join_columns, left_on="submission_date", right_on="start_date", direction="backward", @@ -129,37 +144,48 @@ def _fit(self) -> None: ## where the forecast is a scalar * previously observed data pop_dict = self._parse_formula_for_over_period_changes() if pop_dict: - for metric, period in pop_dict: + for metric, period in pop_dict.items(): metric_pop_name = f"{metric}_{period}" # Create date column in the forecast_df with the specified date offset ## in order to merge in observed data from that period offset = self.period_names_map[period] - self.forecast_df[f"{metric_pop_name}_date"] = pd.to_datetime( + self.forecast_df[f"{metric_pop_name}_date"] = ( self.forecast_df["submission_date"] - offset ) + self.forecast_df[f"{metric_pop_name}_date"] = pd.to_datetime( + self.forecast_df[f"{metric_pop_name}_date"] + ) + self.observed_df["submission_date"] = pd.to_datetime( + self.observed_df["submission_date"] + ) + # Merge observed data to be used in adjustments - self.forecast_df.merge( - self.observed_df[[*self.join_columns, metric]], + self.forecast_df = self.forecast_df.merge( + self.observed_df[ + [*self.join_columns, "submission_date", metric] + ].rename(columns={"submission_date": "join_date"}), how="left", - left_on=f"{metric_pop_name}_date", - right_on="submission_date", - inplace=True, + left_on=[*self.join_columns, f"{metric_pop_name}_date"], + right_on=[*self.join_columns, "join_date"], ) # Remove unneeded date column - self.forecast_df.drop(columns=[f"{metric_pop_name}_date"], inplace=True) + self.forecast_df.drop( + columns=[f"{metric_pop_name}_date", "join_date"], inplace=True + ) # For cases where period-over-period change isn't defined, copy over the observed_df values into ## the forecast_df. Check for values in the forecast period and raise an error if it's filled with ## nan. 
else: - self.forecast_df.merge( - self.observed_df[[*self.join_columns, metric]], + self.forecast_df = self.forecast_df.merge( + self.observed_df[ + [*self.join_columns, self.metric_hub.alias, "submission_date"] + ], how="left", - on="submission_date", - inplace=True, + on=[*self.join_columns, "submission_date"], ) # The forecast data should have no nan values if ( @@ -197,20 +223,130 @@ def _predict(self) -> None: [c for c in self.forecast_df.columns if "scalar" in c] ].to_dict(orient="records") - def _summarize(self, period: str) -> pd.DataFrame: + self.observed_df.rename(columns={self.metric_hub.alias: "value"}, inplace=True) + + def _summarize( + self, + period: str, + ) -> pd.DataFrame: """ - In cases where no summarization is required, adds the expected columns to a summary DataFrame. + Calculate summary metrics for `forecast_df` over a given period, and add metadata. Args: - period (str): Aggregation period that should be consistent with the aggregation period of - the observed data. + period (str): The period for aggregation. + + Returns: + pd.DataFrame: The summarized dataframe. + """ + + df_list = [] + for _, segment_row in self.combination_df.iterrows(): + # find indices in observed_df for rows that exactly match segment dict + segment_historical_indices = ( + self.observed_df[segment_row.index.to_list()] == segment_row + ).all(axis=1) + + segment_forecast_indices = ( + self.forecast_df[segment_row.index.to_list()] == segment_row + ).all(axis=1) + + # aggregate metric to the correct date period (day, month, year) + observed_summarized = pdx.aggregate_to_period( + ( + self.observed_df.loc[ + (segment_historical_indices) + & ( + pd.to_datetime(self.observed_df["submission_date"]) + < self.start_date + ), + ["submission_date", "value"], + ].copy() + ), + period, + ) + forecast_agg = pdx.aggregate_to_period( + ( + self.forecast_df.loc[ + (segment_forecast_indices), + ["submission_date", "value"], + ].copy() + ), + period, + ) + + # find periods of overlap between observed and forecasted data + forecast_with_overlap = forecast_agg.merge( + observed_summarized, + on="submission_date", + how="left", + suffixes=("_forecast", "_observed"), + ).fillna(0) + forecast_with_overlap["value"] = forecast_with_overlap[ + ["value_forecast", "value_observed"] + ].sum(axis=1) + + # add datasource-specific metadata columns + forecast_with_overlap["source"] = "forecast" + observed_summarized["source"] = "historical" + + # create a single dataframe that contains observed and forecasted data + df = pd.concat([observed_summarized, forecast_with_overlap]) + + # add summary metadata columns + df["aggregation_period"] = period.lower() + + # add the expected percentile fields to the df + df["value_low"] = df["value"] + df["value_mid"] = df["value"] + df["value_high"] = df["value"] + + # reorder columns to make interpretation easier + df = df[ + [ + "submission_date", + "aggregation_period", + "source", + "value", + "value_low", + "value_mid", + "value_high", + ] + ] + + # add segment columns to table + for dim, value in zip(segment_row.index, segment_row.values): + df[dim] = value + + # add Metric Hub metadata columns + df["metric_alias"] = self.metric_hub.alias.lower() + df["metric_hub_app_name"] = self.metric_hub.app_name.lower() + df["metric_hub_slug"] = self.metric_hub.slug.lower() + df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date) + df["metric_end_date"] =
pd.to_datetime(self.metric_hub.max_date) + df["metric_collected_at"] = self.collected_at + + # add forecast model metadata columns + df["forecast_start_date"] = self.start_date + df["forecast_end_date"] = self.end_date + df["forecast_trained_at"] = self.trained_at + df["forecast_predicted_at"] = self.predicted_at + + df_list.append(df.copy()) + + return pd.concat(df_list) + + def _add_summary_metadata(self, periods: List[str] | str): + """ + In cases where no summarization is required, adds the expected columns to a summary DataFrame. """ - if isinstance(period, list): - if len(period) > 1: + if isinstance(periods, list): + if len(periods) > 1: raise ValueError( "Can only supply one aggregation period when not summarizing results." ) - period = period[0] + period = periods[0] df = self.forecast_df.copy() df["source"] = np.where( @@ -228,7 +364,7 @@ def _summarize(self, period: str) -> pd.DataFrame: # add Metric Hub metadata columns df["metric_alias"] = self.metric_hub.alias.lower() df["metric_hub_app_name"] = self.metric_hub.app_name.lower() - df["metric_hub_slug"] = self.metric_hub.slug.lower() + df["metric_hub_slug"] = self.metric_hub.alias.lower() df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date) df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date) df["metric_collected_at"] = self.collected_at @@ -239,25 +375,37 @@ def _summarize(self, period: str) -> pd.DataFrame: df["forecast_trained_at"] = self.trained_at df["forecast_predicted_at"] = self.predicted_at + # add other value percentile columns expected from Prophet-based forecasts. Include just the + ## value as these percentiles. Could be replaced in the future with the option to pass multiple + ## scenarios to scalar forecasts. + df["value_low"] = df["value"] + df["value_mid"] = df["value"] + df["value_high"] = df["value"] + + self.summary_df = df + + def predict(self) -> None: + """Generate a forecast from `start_date` to `end_date`.""" + print(f"Forecasting from {self.start_date} to {self.end_date}.", flush=True) + self._set_seed() + self.predicted_at = datetime.utcnow() + self._predict() + def summarize( self, requires_summarization: bool = True, periods: List[str] | str = ["day", "month"], - numpy_aggregations: List[str] = ["mean"], - percentiles: List[int] = [10, 50, 90], ) -> None: """ There are cases where forecasts created by this class do not require summarization (e.g. the scalar adjustment was made to a prior forecast) """ if not requires_summarization: - self._summarize(periods) + self._add_summary_metadata(self, periods) else: - # If summarization is required, use the summarization method in the BaseForecast class - self.summary_df = pd.concat( - [self._summarize(i, numpy_aggregations, percentiles) for i in periods] - ) + # If summarization is required, use the summarization method + self.summary_df = pd.concat([self._summarize(i) for i in periods]) def write_results( self, @@ -265,8 +413,6 @@ def write_results( dataset: str, table: str, write_disposition: str = "WRITE_APPEND", - components_table: str = "", - components_dataset: str = "", ) -> None: """ Write `self.summary_df` to Big Query.
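The overlap handling in `_summarize` can be illustrated with a small, self-contained pandas sketch. The metric values below are made up; only the merge/fillna/sum pattern mirrors the method:

import pandas as pd

# Hypothetical monthly aggregates: the observed data only partially covers
# the first forecasted month (2024-05).
observed = pd.DataFrame(
    {"submission_date": pd.to_datetime(["2024-04-01", "2024-05-01"]), "value": [100.0, 40.0]}
)
forecast = pd.DataFrame(
    {"submission_date": pd.to_datetime(["2024-05-01", "2024-06-01"]), "value": [60.0, 110.0]}
)

# Left-join observed onto forecast, fill missing observed values with 0, and
# sum, so a partially observed period combines actuals with the forecasted
# remainder -- the same pattern as `forecast_with_overlap` in _summarize.
merged = forecast.merge(
    observed, on="submission_date", how="left", suffixes=("_forecast", "_observed")
).fillna(0)
merged["value"] = merged[["value_forecast", "value_observed"]].sum(axis=1)
print(merged)  # 2024-05 becomes 60 + 40 = 100; 2024-06 stays 110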
@@ -287,13 +433,13 @@ def write_results( client = bigquery.Client(project=project) schema = [ bigquery.SchemaField("submission_date", bq_types.DATE), - *[ - bigquery.SchemaField(k, bq_types.STRING) - for k in self.metric_hub.segments.keys() - ], + *[bigquery.SchemaField(k, bq_types.STRING) for k in self.join_columns], bigquery.SchemaField("aggregation_period", bq_types.STRING), bigquery.SchemaField("source", bq_types.STRING), bigquery.SchemaField("value", bq_types.FLOAT), + bigquery.SchemaField("value_low", bq_types.FLOAT), + bigquery.SchemaField("value_mid", bq_types.FLOAT), + bigquery.SchemaField("value_high", bq_types.FLOAT), bigquery.SchemaField("metric_alias", bq_types.STRING), bigquery.SchemaField("metric_hub_app_name", bq_types.STRING), bigquery.SchemaField("metric_hub_slug", bq_types.STRING), @@ -304,7 +450,6 @@ def write_results( bigquery.SchemaField("forecast_end_date", bq_types.DATE), bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP), bigquery.SchemaField("forecast_predicted_at", bq_types.TIMESTAMP), - bigquery.SchemaField("forecast_parameters", bq_types.STRING), ] job = client.load_table_from_dataframe( dataframe=self.summary_df, From ba6c544ef6127f7ed3d3a1b488dcb4240dc82c90 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 10:13:17 -0700 Subject: [PATCH 20/32] Ad clicks scalar model config --- .../model_inputs/scalar_adjustments.yaml | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml index 6905fda4..a9e3e5e3 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml @@ -103,3 +103,34 @@ search_forecasting_revenue_per_ad_click: adjustments: - start_date: "2024-01-01" value: 1.04 + +search_forecasting_ad_clicks: + - name: "project_denali" + description: "Estimate of ad click impact from Project Denali on mobile in May 2024." 
+ adjustments: + - forecast_start_date: "2024-05-01" + segments: + - segment: + { + partner: "Google", + country: "US", + device: "mobile", + channel: "all", + } + adjustments: + - start_date: "2024-05-01" + value: 1.15 + - start_date: "2024-06-01" + value: 1.0 + - segment: + { + partner: "Google", + country: "ROW", + device: "mobile", + channel: "all", + } + adjustments: + - start_date: "2024-05-01" + value: 1.45 + - start_date: "2024-06-01" + value: 1.0 From df6bac2d4d1f69b56234ec6a67902a96b895ccbb Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 10:28:46 -0700 Subject: [PATCH 21/32] Rename config --- ...tments.yaml => search_forecasting_ad_clicks_adjustmented.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename jobs/kpi-forecasting/kpi_forecasting/configs/{search_forecasting_ad_clicks_adjustments.yaml => search_forecasting_ad_clicks_adjustmented.yaml} (100%) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustmented.yaml similarity index 100% rename from jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustments.yaml rename to jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustmented.yaml From 04d956e9e824bee4e791391e1b1c2b6879c1ce38 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 11:36:01 -0700 Subject: [PATCH 22/32] Rename config again --- ...justmented.yaml => search_forecasting_ad_clicks_adjusted.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename jobs/kpi-forecasting/kpi_forecasting/configs/{search_forecasting_ad_clicks_adjustmented.yaml => search_forecasting_ad_clicks_adjusted.yaml} (100%) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustmented.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjusted.yaml similarity index 100% rename from jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjustmented.yaml rename to jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjusted.yaml From 6fde34d6310c20fb289f6c0b50f5025ae6a5eab1 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 11:46:09 -0700 Subject: [PATCH 23/32] Fix error in inputs --- jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index ff4c3375..19290563 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -401,7 +401,7 @@ def summarize( scalar adjustment was made to a prior forecast) """ if not requires_summarization: - self._add_summary_metadata(self, periods) + self._add_summary_metadata(periods) else: # If summarization is required, use the summarization method From fd27d5acc0a1a3fc317b9e373dcd044f8de21376 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 12:50:58 -0700 Subject: [PATCH 24/32] Fix start date for monthly forecasts --- .../kpi_forecasting/models/scalar_forecast.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 19290563..b29ec587 100644 --- 
a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -45,6 +45,9 @@ def __post_init__(self) -> None: {"submission_date": pd.date_range(self.start_date, self.end_date).date} ) + elif all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1): + self.start_date = self._default_start_date_monthly + # Get the list of adjustments for the metric slug being forecasted. That ## slug must be a key in scalar_adjustments.yaml; otherwise, this will raise a KeyError self.scalar_adjustments = parse_scalar_adjustments( @@ -79,6 +82,12 @@ def period_names_map(self) -> Dict[str, pd.DateOffset]: """ return {"YOY": pd.DateOffset(years=1), "MOM": pd.DateOffset(months=1)} + @property + def _default_start_date_monthly(self) -> pd.Timestamp: + """The first day of the month after the last date in the observed dataset.""" + return self.observed_df["submission_date"].max() + pd.DateOffset(months=1) + + def _parse_formula_for_over_period_changes(self) -> Dict | None: """ Find period-over-period metric specifications in provided formula. If present, create a dict that @@ -348,7 +357,13 @@ def _add_summary_metadata(self, periods: List[str] | str): ) period = periods[0] - df = self.forecast_df.copy() + union_cols = [ + "submission_date", + *self.join_columns, + "value", + ] + df = pd.concat([self.forecast_df[union_cols], self.observed_df[union_cols]]) + df["source"] = np.where( df["submission_date"] < self.start_date, "historical", @@ -360,6 +375,7 @@ def _add_summary_metadata(self, periods: List[str] | str): "forecast", ) + df["submission_date"] = pd.to_datetime(df["submission_date"]) df["aggregation_period"] = period # add Metric Hub metadata columns df["metric_alias"] = self.metric_hub.alias.lower() @@ -370,7 +386,7 @@ def _add_summary_metadata(self, periods: List[str] | str): df["metric_collected_at"] = self.collected_at # add forecast model metadata columns - df["forecast_start_date"] = self.start_date + df["forecast_start_date"] = pd.to_datetime(self.start_date) df["forecast_end_date"] = self.end_date df["forecast_trained_at"] = self.trained_at df["forecast_predicted_at"] = self.predicted_at @@ -382,7 +398,7 @@ def _add_summary_metadata(self, periods: List[str] | str): df["value_mid"] = df["value"] df["value_high"] = df["value"] - self.summary_df = df + self.summary_df = df.dropna(subset=["value"]) def predict(self) -> None: """Generate a forecast from `start_date` to `end_date`.""" From 809fa2c9c58b98964d9f894c9e7242953ae84da6 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Thu, 18 Jul 2024 15:08:16 -0700 Subject: [PATCH 25/32] Config update --- .../model_inputs/scalar_adjustments.yaml | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml index a9e3e5e3..45ac6026 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml @@ -103,6 +103,32 @@ search_forecasting_revenue_per_ad_click: adjustments: - start_date: "2024-01-01" value: 1.04 + - forecast_start_date: "2024-06-01" + segments: + - segment: + { + partner: "Google", + country: "US", + device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - start_date: "2024-08-01" + value: 1.03 + - segment: + { + partner: "Google", + country: "ROW",
device: "desktop", + channel: "all", + } + adjustments: + - start_date: "2024-01-01" + value: 1.10 + - start_date: "2024-08-01" + value: 1.04 search_forecasting_ad_clicks: - name: "project_denali" From 86dff3fcf86f43d387a8d4976a8880bd8caad221 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Fri, 26 Jul 2024 04:30:29 -0700 Subject: [PATCH 26/32] Remove forecast adjustment methods --- jobs/kpi-forecasting/kpi_forecasting.py | 4 ++-- .../kpi_forecasting/models/scalar_forecast.py | 16 +++++----------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting.py b/jobs/kpi-forecasting/kpi_forecasting.py index b16bd929..81f2ceb7 100644 --- a/jobs/kpi-forecasting/kpi_forecasting.py +++ b/jobs/kpi-forecasting/kpi_forecasting.py @@ -18,9 +18,9 @@ def main() -> None: config = YAML(filepath=CLI().args.config).data model_type = config.forecast_model.model_type - if hasattr(config, "metric_hub"): + if "metric_hub" in dir(config): data_puller = MetricHub(**config.metric_hub) - elif hasattr(config, "forecast_data_pull"): + elif "forecast_data_pull" in dir(config): data_puller = ForecastDataPull(**config.forecast_data_pull) else: raise KeyError("No metric_hub or forecast_data_pull key in config to pull data.") diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index b29ec587..365f2d9e 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -36,16 +36,11 @@ def __post_init__(self) -> None: """ super().__post_init__() - # For forecast data, we need to overwrite the start and end date based on the dates - ## of the forecast data we've pulled in __post_init - if isinstance(self.metric_hub, ForecastDataPull): - self.start_date = pd.to_datetime(self.metric_hub.forecast_start_date) - self.end_date = pd.to_datetime(self.observed_df["submission_date"].max()) - self.dates_to_predict = pd.DataFrame( - {"submission_date": pd.date_range(self.start_date, self.end_date).date} - ) + # For monthly-level data, must adjust the start date to the first full month after the + ## observed df's last date. Otherwise, the first forecast date will be the first day after + ## rather than the first month after the historical data - elif all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1): + if all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1): self.start_date = self._default_start_date_monthly # Get the list of adjustments for the metric slug being forecasted. 
That @@ -54,8 +49,7 @@ def __post_init__(self) -> None: self.metric_hub.slug, self.start_date ) - # Construct a DataFrame containing all combinations of segment values - ## in the observed_df + # Construct a DataFrame containing all combinations of segment values in the observed_df self.combination_df = self.observed_df[ self.metric_hub.segments.keys() ].drop_duplicates() From 4b5b5e6f617bdfec608836288b8cc3664ff92281 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Fri, 26 Jul 2024 04:33:20 -0700 Subject: [PATCH 27/32] Add tests --- .../kpi_forecasting/models/scalar_forecast.py | 2 - .../tests/test_scalar_forecast.py | 116 ++++++++++++++++++ 2 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 365f2d9e..06ca463b 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -11,7 +11,6 @@ from kpi_forecasting import pandas_extras as pdx from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments from kpi_forecasting.models.base_forecast import BaseForecast -from kpi_forecasting.metric_hub import ForecastDataPull @dataclass @@ -81,7 +80,6 @@ def _default_start_date_monthly(self) -> pd.Timestamp: """The first day of the month after the last date in the observed dataset.""" return self.observed_df["submission_date"].max() + pd.DateOffset(months=1) - def _parse_formula_for_over_period_changes(self) -> Dict | None: """ Find period-over-period metric specifications in provided formula. If present, create a dict that diff --git a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py new file mode 100644 index 00000000..e08065da --- /dev/null +++ b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py @@ -0,0 +1,116 @@ +import pytest +from unittest.mock import patch, MagicMock +import pandas as pd +from kpi_forecasting.models.scalar_forecast import ScalarForecast + + +@pytest.fixture +def setup_forecast(): + observed_df = pd.DataFrame( + { + "submission_date": pd.date_range(start="2020-01-01", periods=6, freq="M"), + "value": [100, 150, 200, 250, 300, 350], + "segment": ["A", "A", "A", "B", "B", "B"], + } + ) + dates_to_predict = pd.DataFrame( + {"submission_date": pd.date_range(start="2020-07-01", periods=6, freq="M")} + ) + metric_hub = MagicMock() + metric_hub.slug = "metric_slug" + metric_hub.segments = {"segment": ["A", "B"]} + metric_hub.alias = "metric_alias" + metric_hub.app_name = "app_name" + metric_hub.min_date = "2019-01-01" + metric_hub.max_date = "2020-06-01" + + start_date = "2020-07-01" + end_date = "2020-12-31" + scalar_adjustments = [MagicMock()] + parameters = MagicMock() + parameters.formula = "metric:YOY + metric2:MOM" + + forecast = ScalarForecast( + observed_df=observed_df, + dates_to_predict=dates_to_predict, + metric_hub=metric_hub, + start_date=start_date, + end_date=end_date, + scalar_adjustments=scalar_adjustments, + parameters=parameters, + ) + + return forecast + + +def test_post_init(setup_forecast): + forecast = setup_forecast + assert forecast.start_date == "2020-07-01" + assert len(forecast.scalar_adjustments) == 1 + assert list(forecast.combination_df.columns) == ["segment"] + + +def test_period_names_map(setup_forecast): + forecast = setup_forecast + assert
forecast.period_names_map == { + "YOY": pd.DateOffset(years=1), + "MOM": pd.DateOffset(months=1), + } + + +def test_parse_formula_for_over_period_changes(setup_forecast): + forecast = setup_forecast + result = forecast._parse_formula_for_over_period_changes() + assert result == {"metric": "YOY", "metric2": "MOM"} + + +def test_add_scalar_columns(setup_forecast): + forecast = setup_forecast + forecast.forecast_df = forecast.dates_to_predict.merge( + forecast.combination_df, how="cross" + ) + forecast._add_scalar_columns() + assert "scalar_mock" in forecast.forecast_df.columns + + +def test_fit(setup_forecast): + forecast = setup_forecast + with patch.object( + forecast, "_parse_formula_for_over_period_changes", return_value=None + ), patch.object(forecast, "_add_scalar_columns"): + forecast._fit() + assert forecast.metric_hub.alias in forecast.forecast_df.columns + assert not forecast.forecast_df[forecast.metric_hub.alias].isnull().any() + + +def test_predict(setup_forecast): + forecast = setup_forecast + with patch.object(forecast, "_set_seed"), patch.object(forecast, "_predict"): + forecast.predict() + assert forecast.predicted_at is not None + + +def test_summarize(setup_forecast): + forecast = setup_forecast + with patch.object( + forecast, "_summarize", return_value=pd.DataFrame() + ), patch.object(forecast, "_add_summary_metadata"): + forecast.summarize(requires_summarization=False) + assert forecast.summary_df is not None + + +@patch("kpi_forecasting.models.scalar_forecast.bigquery.Client") +def test_write_results(mock_client, setup_forecast): + forecast = setup_forecast + mock_client_instance = mock_client.return_value + mock_load_job = MagicMock() + mock_client_instance.load_table_from_dataframe.return_value = mock_load_job + mock_load_job.result.return_value = None + + forecast.summary_df = pd.DataFrame( + {"submission_date": ["2020-07-01"], "value": [100]} + ) + + forecast.write_results("project", "dataset", "table") + mock_client_instance.load_table_from_dataframe.assert_called_once() From df69c0fde04af6040a227b7f4f117e7ed4999ef1 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Fri, 26 Jul 2024 04:40:55 -0700 Subject: [PATCH 28/32] Update tests --- .../kpi_forecasting/tests/test_scalar_forecast.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py index e08065da..2a790236 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py @@ -31,15 +31,17 @@ def setup_forecast(): parameters.formula = "metric:YOY + metric2:MOM" forecast = ScalarForecast( - observed_df=observed_df, - dates_to_predict=dates_to_predict, - metric_hub=metric_hub, + model_type="scalar", + parameters=parameters, + use_holidays=False, start_date=start_date, end_date=end_date, - scalar_adjustments=scalar_adjustments, - parameters=parameters, + metric_hub=metric_hub ) + forecast.observed_df = observed_df + forecast.scalar_adjustments = scalar_adjustments + return forecast From 3f1ece7c53813ed8fd38551e9f48591e9d28ce96 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Fri, 26 Jul 2024 04:43:41 -0700 Subject: [PATCH 29/32] Remove unneeded variable --- .../kpi_forecasting/tests/test_scalar_forecast.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py
b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py index 2a790236..cc24c8a7 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py @@ -13,9 +13,7 @@ def setup_forecast(): "segment": ["A", "A", "A", "B", "B", "B"], } ) - dates_to_predict = pd.DataFrame( - {"submission_date": pd.date_range(start="2020-07-01", periods=6, freq="M")} - ) + metric_hub = MagicMock() metric_hub.slug = "metric_slug" metric_hub.segments = {"segment": ["A", "B"]} @@ -36,7 +34,7 @@ def setup_forecast(): use_holidays=False, start_date=start_date, end_date=end_date, - metric_hub=metric_hub + metric_hub=metric_hub, ) forecast.observed_df = observed_df @@ -69,9 +67,7 @@ def test_parse_formula_for_over_period_changes(setup_forecast): def test_add_scalar_columns(setup_forecast): forecast = setup_forecast - forecast.forecast_df = forecast.dates_to_predict.merge( - forecast.combination_df, how="cross" - ) + forecast.forecast_df = forecast.dates_to_predict.merge(forecast.combination_df, how="cross") forecast._add_scalar_columns() assert "scalar_mock" in forecast.forecast_df.columns From b75c46a10b6b6178b3318cf785d7f6f0e0077010 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Wed, 7 Aug 2024 15:35:20 -0700 Subject: [PATCH 30/32] Fixes error in pulling training data from Prophet model --- .../kpi_forecasting/models/funnel_forecast.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py index f41ac68a..3ff7dc64 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py @@ -405,8 +405,18 @@ def _predict(self, segment_settings: SegmentModelSettings) -> pd.DataFrame: # error rates and how components resulted in those predictions. The `fillna` # call will fill the missing y values for forecasted dates, where only yhat # is available. + + segment_historical_indices = ( + self.observed_df[list(segment_settings.segment)] + == pd.Series(segment_settings.segment) + ).all(axis=1) + + observed_y = self.observed_df.loc[(segment_historical_indices)].rename( + columns=self.column_names_map + )[["ds", "y"]] + observed_y["ds"] = pd.to_datetime(observed_y["ds"]) components_df = components_df.merge( - segment_settings.segment_model.history[["ds", "y"]], + observed_y, on="ds", how="left", ).fillna(0) From 18dead8c1b543e3f9119740247c81e133b9b3446 Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Wed, 14 Aug 2024 07:30:33 -0700 Subject: [PATCH 31/32] Adjust post_init for testing --- .../kpi_forecasting/models/scalar_forecast.py | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 06ca463b..82b8bd6c 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -42,16 +42,18 @@ def __post_init__(self) -> None: if all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1): self.start_date = self._default_start_date_monthly + + if self.metric_hub is None: + # this is used to avoid the code below for testing purposes + return + # Get the list of adjustments for the metric slug being forecasted.
That ## slug must be a key in scalar_adjustments.yaml; otherwise, this will raise a KeyError self.scalar_adjustments = parse_scalar_adjustments( self.metric_hub.slug, self.start_date ) - # Construct a DataFrame containing all combinations of segment values in the observed_df - self.combination_df = self.observed_df[ - self.metric_hub.segments.keys() - ].drop_duplicates() + self._prep_class_dataframes(self.observed_df, self.metric_hub.segments.keys()) # Set up the columns to be used to join the observed_df to the forecast_df in subsequent ## methods @@ -61,10 +63,6 @@ def __post_init__(self) -> None: ## covariates in the future self.observed_df.rename(columns={"value": self.metric_hub.alias}, inplace=True) - # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains a row - ## for each forecast date for each segment - self.forecast_df = self.dates_to_predict.merge(self.combination_df, how="cross") - @property def period_names_map(self) -> Dict[str, pd.DateOffset]: """ @@ -80,6 +78,26 @@ def _default_start_date_monthly(self) -> pd.Timestamp: """The first day of the month after the last date in the observed dataset.""" return self.observed_df["submission_date"].max() + pd.DateOffset(months=1) + def _prep_class_dataframes(self, observed_df: pd.DataFrame, segment_column_list: List) -> None: + """ + Prepares the dataframes necessary to identify segment combinations and hold results + of scalar forecasting. + + Args: + observed_df (pd.DataFrame): dataframe containing observed data used to model + must contain columns specified in the keys of the segments section of the config + segment_column_list (list): list of columns of observed_df to use to determine segments + """ + + # Construct a DataFrame containing all combinations of segment values in the observed_df + self.combination_df = observed_df[ + segment_column_list + ].drop_duplicates() + + # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains a row + ## for each forecast date for each segment + self.forecast_df = self.dates_to_predict.merge(self.combination_df, how="cross") + def _parse_formula_for_over_period_changes(self) -> Dict | None: """ Find period-over-period metric specifications in provided formula. If present, create a dict that From c619059d4072ce606e8f12513f1e66708c9f002c Mon Sep 17 00:00:00 2001 From: m-d-bowerman Date: Wed, 14 Aug 2024 07:32:13 -0700 Subject: [PATCH 32/32] Ruff format changes --- .../kpi_forecasting/models/scalar_forecast.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py index 82b8bd6c..ac03f895 100644 --- a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py +++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py @@ -42,11 +42,10 @@ def __post_init__(self) -> None: if all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1): self.start_date = self._default_start_date_monthly - if self.metric_hub is None: # this is used to avoid the code below for testing purposes return - + # Get the list of adjustments for the metric slug being forecasted.
That ## slug must be a key in scalar_adjustments.yaml; otherwise, this will raise a KeyError self.scalar_adjustments = parse_scalar_adjustments( @@ -77,7 +77,9 @@ def _default_start_date_monthly(self) -> pd.Timestamp: """The first day of the month after the last date in the observed dataset.""" return self.observed_df["submission_date"].max() + pd.DateOffset(months=1) - def _prep_class_dataframes(self, observed_df: pd.DataFrame, segment_column_list: List) -> None: + def _prep_class_dataframes( + self, observed_df: pd.DataFrame, segment_column_list: List + ) -> None: """ Prepares the dataframes necessary to identify segment combinations and hold results of scalar forecasting. @@ -90,9 +91,7 @@ def _prep_class_dataframes(self, observed_df: pd.DataFrame, segment_column_list: """ # Construct a DataFrame containing all combinations of segment values in the observed_df - self.combination_df = observed_df[ - segment_column_list - ].drop_duplicates() + self.combination_df = observed_df[segment_column_list].drop_duplicates() # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains a row ## for each forecast date for each segment
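The scalar application itself hinges on pd.merge_asof, which `_add_scalar_columns` uses to pick, per segment, the most recent adjustment whose start_date falls on or before each forecast date. A minimal, self-contained sketch of that pattern follows; the segment values and scalars are made up, and only the merge_asof call mirrors the model code:

import pandas as pd

# Hypothetical forecast rows for one segment, plus a scalar schedule whose
# value changes on 2024-08-01, as in scalar_adjustments.yaml above.
forecast_df = pd.DataFrame(
    {
        "submission_date": pd.to_datetime(["2024-07-01", "2024-08-01", "2024-09-01"]),
        "device": ["desktop"] * 3,
        "value": [10.0, 10.0, 10.0],
    }
)
adj_df = pd.DataFrame(
    {
        "start_date": pd.to_datetime(["2024-01-01", "2024-08-01"]),
        "device": ["desktop", "desktop"],
        "scalar_year_over_year_growth": [1.10, 1.03],
    }
)

# Backward merge_asof: each forecast date gets the latest adjustment whose
# start_date is on or before it, matched within the segment columns.
merged = pd.merge_asof(
    forecast_df.sort_values("submission_date"),
    adj_df.sort_values("start_date"),
    by=["device"],
    left_on="submission_date",
    right_on="start_date",
    direction="backward",
)
merged["value"] *= merged["scalar_year_over_year_growth"]
print(merged[["submission_date", "value"]])  # 11.0, 10.3, 10.3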