From c541ca248d02208fac9b86adb766f0191bf35f71 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 14:24:44 -0400 Subject: [PATCH] first pass --- dbt/adapters/base/impl.py | 48 ++++++++++++++++++++++++++++++ dbt/adapters/contracts/relation.py | 5 +--- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index c6091887..1c165b5c 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1283,6 +1283,54 @@ def calculate_freshness( } return adapter_response, freshness + def calculate_freshness_from_metadata_batch( + self, + sources: List[BaseRelation], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], List[FreshnessResponse]]: + assert len(sources) > 0 + + # TODO: what should information_schema here be? + kwargs: Dict[str, Any] = { + "information_schema": sources[0].information_schema_only(), + "relations": sources, + } + result = self.execute_macro( + GET_RELATION_LAST_MODIFIED_MACRO_NAME, + kwargs=kwargs, + macro_resolver=macro_resolver, + ) + + adapter_response, table = result.response, result.table # type: ignore[attr-defined] + + fresnhess_responses = [] + # TODO: refactor most of this to reuse internals from calculate_freshness_from_metadata + for row in table: + try: + last_modified_val = get_column_value_uncased("last_modified", row) + snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + except Exception: + raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) + + if last_modified_val is None: + # Interpret missing value as "infinitely long ago" + max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + else: + max_loaded_at = _utc(last_modified_val, None, "last_modified") + + snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") + + age = (snapshotted_at - max_loaded_at).total_seconds() + + freshness: FreshnessResponse = { + "max_loaded_at": max_loaded_at, + "snapshotted_at": snapshotted_at, + "age": age, + } + fresnhess_responses.append(freshness) + + return adapter_response, fresnhess_responses + def calculate_freshness_from_metadata( self, source: BaseRelation, diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index b9836d06..3028bd0f 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -38,7 +38,7 @@ class MaterializationConfig(Mapping, ABC): on_schema_change: Optional[str] on_configuration_change: OnConfigurationChangeOption contract: MaterializationContract - _extra: Dict[str, Any] + extra: Dict[str, Any] def __contains__(self, item): ... @@ -48,7 +48,6 @@ def __delitem__(self, key): class RelationConfig(Protocol): - resource_type: str name: str database: str schema: str @@ -56,8 +55,6 @@ class RelationConfig(Protocol): compiled_code: Optional[str] quoting_dict: Dict[str, bool] config: Optional[MaterializationConfig] - meta: Dict[str, Any] - tags: List[str] class ComponentName(StrEnum):