From 2beba0bc68b6c5b0aecddc2f69f660e0260c9a26 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 1 Mar 2024 10:48:30 -0500 Subject: [PATCH 01/19] extend RelationConfig and MaterializationConfig --- dbt/adapters/contracts/relation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index 3028bd0f..b9836d06 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -38,7 +38,7 @@ class MaterializationConfig(Mapping, ABC): on_schema_change: Optional[str] on_configuration_change: OnConfigurationChangeOption contract: MaterializationContract - extra: Dict[str, Any] + _extra: Dict[str, Any] def __contains__(self, item): ... @@ -48,6 +48,7 @@ def __delitem__(self, key): class RelationConfig(Protocol): + resource_type: str name: str database: str schema: str @@ -55,6 +56,8 @@ class RelationConfig(Protocol): compiled_code: Optional[str] quoting_dict: Dict[str, bool] config: Optional[MaterializationConfig] + meta: Dict[str, Any] + tags: List[str] class ComponentName(StrEnum): From 727093c0c230edc4d1746ba0efe90fc903fbd5ba Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 14:24:44 -0400 Subject: [PATCH 02/19] first pass --- dbt/adapters/base/impl.py | 48 ++++++++++++++++++++++++++++++ dbt/adapters/contracts/relation.py | 5 +--- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index c6091887..ca6603d6 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1283,6 +1283,54 @@ def calculate_freshness( } return adapter_response, freshness + def calculate_freshness_from_metadata_batch( + self, + sources: List[BaseRelation], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], List[FreshnessResponse]]: + assert len(sources) > 0 + + # TODO: what should information_schema here be? + kwargs: Dict[str, Any] = { + "information_schema": sources[0].information_schema_only(), + "relations": sources, + } + result = self.execute_macro( + GET_RELATION_LAST_MODIFIED_MACRO_NAME, + kwargs=kwargs, + macro_resolver=macro_resolver, + ) + + adapter_response, table = result.response, result.table # type: ignore[attr-defined] + + freshness_responses = [] + # TODO: refactor most of this to reuse internals from calculate_freshness_from_metadata + for row in table: + try: + last_modified_val = get_column_value_uncased("last_modified", row) + snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + except Exception: + raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) + + if last_modified_val is None: + # Interpret missing value as "infinitely long ago" + max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + else: + max_loaded_at = _utc(last_modified_val, None, "last_modified") + + snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") + + age = (snapshotted_at - max_loaded_at).total_seconds() + + freshness: FreshnessResponse = { + "max_loaded_at": max_loaded_at, + "snapshotted_at": snapshotted_at, + "age": age, + } + freshness_responses.append(freshness) + + return adapter_response, freshness_responses + def calculate_freshness_from_metadata( self, source: BaseRelation, diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index b9836d06..3028bd0f 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -38,7 +38,7 @@ class MaterializationConfig(Mapping, ABC): on_schema_change: Optional[str] on_configuration_change: OnConfigurationChangeOption contract: MaterializationContract - _extra: Dict[str, Any] + extra: Dict[str, Any] def __contains__(self, item): ... @@ -48,7 +48,6 @@ def __delitem__(self, key): class RelationConfig(Protocol): - resource_type: str name: str database: str schema: str @@ -56,8 +55,6 @@ class RelationConfig(Protocol): compiled_code: Optional[str] quoting_dict: Dict[str, bool] config: Optional[MaterializationConfig] - meta: Dict[str, Any] - tags: List[str] class ComponentName(StrEnum): From ffaf80874711aa53b82f12819ae4af5705ce2bdf Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 18:15:16 -0400 Subject: [PATCH 03/19] accept information schema in calculate_freshness_from_metadata_batch --- dbt/adapters/base/impl.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index ca6603d6..ca355899 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1286,25 +1286,21 @@ def calculate_freshness( def calculate_freshness_from_metadata_batch( self, sources: List[BaseRelation], + information_schema: InformationSchema, macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], List[FreshnessResponse]]: - assert len(sources) > 0 - - # TODO: what should information_schema here be? - kwargs: Dict[str, Any] = { - "information_schema": sources[0].information_schema_only(), - "relations": sources, - } result = self.execute_macro( GET_RELATION_LAST_MODIFIED_MACRO_NAME, - kwargs=kwargs, + kwargs={ + "information_schema": information_schema, + "relations": sources, + }, macro_resolver=macro_resolver, ) - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - freshness_responses = [] # TODO: refactor most of this to reuse internals from calculate_freshness_from_metadata + freshness_responses = [] for row in table: try: last_modified_val = get_column_value_uncased("last_modified", row) From b66644a64b35321721116a3e1c0106454619363f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 18:19:31 -0400 Subject: [PATCH 04/19] implement calculate_freshness_from_metadata in terms of calculate_freshness_from_metadata_batch --- dbt/adapters/base/impl.py | 39 ++++++--------------------------------- 1 file changed, 6 insertions(+), 33 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index ca355899..808f29f2 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1332,41 +1332,14 @@ def calculate_freshness_from_metadata( source: BaseRelation, macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: - kwargs: Dict[str, Any] = { - "information_schema": source.information_schema_only(), - "relations": [source], - } - result = self.execute_macro( - GET_RELATION_LAST_MODIFIED_MACRO_NAME, - kwargs=kwargs, - macro_resolver=macro_resolver, - ) - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - - try: - row = table[0] - last_modified_val = get_column_value_uncased("last_modified", row) - snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) - except Exception: - raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - if last_modified_val is None: - # Interpret missing value as "infinitely long ago" - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(last_modified_val, None, "last_modified") - - snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") - - age = (snapshotted_at - max_loaded_at).total_seconds() - - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, - } + adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch( + sources=[source], + information_schema=source.information_schema_only(), + macro_resolver=macro_resolver + ) - return adapter_response, freshness + return adapter_response, freshness_responses[0] def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization From 54670e05f063e020b360c65d72cfee0cda809753 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 11 Mar 2024 18:21:07 -0400 Subject: [PATCH 05/19] add TableLastModifiedMetadataBatch capability --- dbt/adapters/capability.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index 745cb27a..ac427513 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -13,6 +13,9 @@ class Capability(str, Enum): TableLastModifiedMetadata = "TableLastModifiedMetadata" """Indicates support for determining the time of the last table modification by querying database metadata.""" + TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch" + """Indicates support for determining the time of the last table modification by querying database metadata in batch.""" + class Support(str, Enum): Unknown = "Unknown" From 8046a3ea6038ea5afd014facb501f2ed7ba978d6 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 13 Mar 2024 19:22:00 -0400 Subject: [PATCH 06/19] return batched freshness results keyed by (schema, identifier) --- dbt/adapters/base/impl.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 808f29f2..72edc9b2 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -421,7 +421,9 @@ def _get_cache_schemas(self, relation_configs: Iterable[RelationConfig]) -> Set[ populate. """ return { - self.Relation.create_from(quoting=self.config, relation_config=relation_config).without_identifier() + self.Relation.create_from( + quoting=self.config, relation_config=relation_config + ).without_identifier() for relation_config in relation_configs } @@ -1284,11 +1286,11 @@ def calculate_freshness( return adapter_response, freshness def calculate_freshness_from_metadata_batch( - self, + self, sources: List[BaseRelation], information_schema: InformationSchema, macro_resolver: Optional[MacroResolverProtocol] = None, - ) -> Tuple[Optional[AdapterResponse], List[FreshnessResponse]]: + ) -> Tuple[Optional[AdapterResponse], Dict[Tuple[str, str], FreshnessResponse]]: result = self.execute_macro( GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs={ @@ -1299,12 +1301,13 @@ def calculate_freshness_from_metadata_batch( ) adapter_response, table = result.response, result.table # type: ignore[attr-defined] - # TODO: refactor most of this to reuse internals from calculate_freshness_from_metadata - freshness_responses = [] + freshness_responses: Dict[Tuple[str, str], FreshnessResponse] = {} for row in table: - try: + try: last_modified_val = get_column_value_uncased("last_modified", row) snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + identifier = get_column_value_uncased("identifier", row) + schema = get_column_value_uncased("schema", row) except Exception: raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) @@ -1323,23 +1326,21 @@ def calculate_freshness_from_metadata_batch( "snapshotted_at": snapshotted_at, "age": age, } - freshness_responses.append(freshness) + freshness_responses[(schema, identifier)] = freshness return adapter_response, freshness_responses - + def calculate_freshness_from_metadata( self, source: BaseRelation, macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: - adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch( sources=[source], information_schema=source.information_schema_only(), - macro_resolver=macro_resolver + macro_resolver=macro_resolver, ) - - return adapter_response, freshness_responses[0] + return adapter_response, list(freshness_responses.values())[0] def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization From 9af8c858e77a9e0f05a3e1c0162ddb86499f1219 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 16:44:41 -0400 Subject: [PATCH 07/19] handle queries across information_schema in calculate_freshness_from_metadata_batch --- dbt/adapters/base/impl.py | 92 ++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 36 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 9608c970..3f3ca8e2 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1290,45 +1290,65 @@ def calculate_freshness( def calculate_freshness_from_metadata_batch( self, sources: List[BaseRelation], - information_schema: InformationSchema, macro_resolver: Optional[MacroResolverProtocol] = None, - ) -> Tuple[Optional[AdapterResponse], Dict[Tuple[str, str], FreshnessResponse]]: - result = self.execute_macro( - GET_RELATION_LAST_MODIFIED_MACRO_NAME, - kwargs={ - "information_schema": information_schema, - "relations": sources, - }, - macro_resolver=macro_resolver, - ) - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - - freshness_responses: Dict[Tuple[str, str], FreshnessResponse] = {} - for row in table: - try: - last_modified_val = get_column_value_uncased("last_modified", row) - snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) - identifier = get_column_value_uncased("identifier", row) - schema = get_column_value_uncased("schema", row) - except Exception: - raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - - if last_modified_val is None: - # Interpret missing value as "infinitely long ago" - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(last_modified_val, None, "last_modified") - - snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") + ) -> Tuple[Optional[AdapterResponse], Dict[BaseRelation, FreshnessResponse]]: + # Track schema, identifiers of sources for lookup from batch query + schema_identifier_to_source = { + (source.schema.lower(), source.identifier.lower()): source for source in sources + } - age = (snapshotted_at - max_loaded_at).total_seconds() + # Group metadata sources by information schema -- one query per information schema will be necessary + information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} + for source in sources: + information_schema = source.information_schema_only() + if information_schema not in information_schema_to_metadata_sources: + information_schema_to_metadata_sources[information_schema] = [source] + else: + information_schema_to_metadata_sources[information_schema].append(source) + + freshness_responses: Dict[BaseRelation, FreshnessResponse] = {} + for ( + information_schema, + sources_for_information_schema, + ) in information_schema_to_metadata_sources.items(): + result = self.execute_macro( + GET_RELATION_LAST_MODIFIED_MACRO_NAME, + kwargs={ + "information_schema": information_schema, + "relations": sources_for_information_schema, + }, + macro_resolver=macro_resolver, + ) + adapter_response, table = result.response, result.table # type: ignore[attr-defined] - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, - } - freshness_responses[(schema, identifier)] = freshness + for row in table: + try: + last_modified_val = get_column_value_uncased("last_modified", row) + snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + identifier = get_column_value_uncased("identifier", row) + schema = get_column_value_uncased("schema", row) + except Exception: + raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) + + if last_modified_val is None: + # Interpret missing value as "infinitely long ago" + max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + else: + max_loaded_at = _utc(last_modified_val, None, "last_modified") + + snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") + + age = (snapshotted_at - max_loaded_at).total_seconds() + + freshness: FreshnessResponse = { + "max_loaded_at": max_loaded_at, + "snapshotted_at": snapshotted_at, + "age": age, + } + source_relation_for_result = schema_identifier_to_source[ + (schema.lower(), identifier.lower()) + ] + freshness_responses[source_relation_for_result] = freshness return adapter_response, freshness_responses From ed81529b761f83b208b4c4dc3f8798283bfda16f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 16:55:07 -0400 Subject: [PATCH 08/19] refactor _create_freshness_response --- dbt/adapters/base/impl.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 3f3ca8e2..a102b679 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1330,25 +1330,11 @@ def calculate_freshness_from_metadata_batch( except Exception: raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - if last_modified_val is None: - # Interpret missing value as "infinitely long ago" - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(last_modified_val, None, "last_modified") - - snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") - - age = (snapshotted_at - max_loaded_at).total_seconds() - - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, - } + freshness_response = self._create_freshness_response(last_modified_val, snapshotted_at_val) source_relation_for_result = schema_identifier_to_source[ (schema.lower(), identifier.lower()) ] - freshness_responses[source_relation_for_result] = freshness + freshness_responses[source_relation_for_result] = freshness_response return adapter_response, freshness_responses @@ -1359,11 +1345,27 @@ def calculate_freshness_from_metadata( ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch( sources=[source], - information_schema=source.information_schema_only(), macro_resolver=macro_resolver, ) return adapter_response, list(freshness_responses.values())[0] + def _create_freshness_response(self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]) -> FreshnessResponse: + if last_modified is None: + # Interpret missing value as "infinitely long ago" + max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + else: + max_loaded_at = _utc(last_modified, None, "last_modified") + + snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at") + age = (snapshotted_at - max_loaded_at).total_seconds() + freshness: FreshnessResponse = { + "max_loaded_at": max_loaded_at, + "snapshotted_at": snapshotted_at, + "age": age, + } + + return freshness + def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization runs. The hook can assume it has a connection available. From c44779a5bf385c02184fa3b851a79ce446611300 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 17:53:48 -0400 Subject: [PATCH 09/19] make schema_identifier_to_source type-safe --- dbt/adapters/base/impl.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index a102b679..09bf9d70 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1294,7 +1294,11 @@ def calculate_freshness_from_metadata_batch( ) -> Tuple[Optional[AdapterResponse], Dict[BaseRelation, FreshnessResponse]]: # Track schema, identifiers of sources for lookup from batch query schema_identifier_to_source = { - (source.schema.lower(), source.identifier.lower()): source for source in sources + ( + source.path.get_lowered_part(ComponentName.Schema), + source.path.get_lowered_part(ComponentName.Identifier), + ): source + for source in sources } # Group metadata sources by information schema -- one query per information schema will be necessary @@ -1330,7 +1334,9 @@ def calculate_freshness_from_metadata_batch( except Exception: raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - freshness_response = self._create_freshness_response(last_modified_val, snapshotted_at_val) + freshness_response = self._create_freshness_response( + last_modified_val, snapshotted_at_val + ) source_relation_for_result = schema_identifier_to_source[ (schema.lower(), identifier.lower()) ] @@ -1349,7 +1355,9 @@ def calculate_freshness_from_metadata( ) return adapter_response, list(freshness_responses.values())[0] - def _create_freshness_response(self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]) -> FreshnessResponse: + def _create_freshness_response( + self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime] + ) -> FreshnessResponse: if last_modified is None: # Interpret missing value as "infinitely long ago" max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) From 62be8dfc58ceebda48434340016d763b7f6a497c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 18:02:09 -0400 Subject: [PATCH 10/19] update TableLastModifiedMetadataBatch description --- dbt/adapters/capability.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index ac427513..305604c7 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -14,7 +14,7 @@ class Capability(str, Enum): """Indicates support for determining the time of the last table modification by querying database metadata.""" TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch" - """Indicates support for determining the time of the last table modification by querying database metadata in batch.""" + """Indicates support for performantly determining the time of the last table modification by querying database metadata in batch.""" class Support(str, Enum): From 9a6c8292aa280c7db3bfaea9295c4c047ec91d29 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 25 Mar 2024 18:06:17 -0400 Subject: [PATCH 11/19] changelog entry --- .changes/unreleased/Features-20240325-180611.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240325-180611.yaml diff --git a/.changes/unreleased/Features-20240325-180611.yaml b/.changes/unreleased/Features-20240325-180611.yaml new file mode 100644 index 00000000..2299d2ee --- /dev/null +++ b/.changes/unreleased/Features-20240325-180611.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch +time: 2024-03-25T18:06:11.816163-04:00 +custom: + Author: michelleark + Issue: "138" From 17701ad7c612abec02afcb3477491bb325ccd991 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 28 Mar 2024 10:30:59 -0700 Subject: [PATCH 12/19] refactor to _get_catalog_relations_by_info_schema, _parse_freshness_row --- dbt/adapters/base/impl.py | 51 ++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 09bf9d70..177cf663 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1291,7 +1291,7 @@ def calculate_freshness_from_metadata_batch( self, sources: List[BaseRelation], macro_resolver: Optional[MacroResolverProtocol] = None, - ) -> Tuple[Optional[AdapterResponse], Dict[BaseRelation, FreshnessResponse]]: + ) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]: # Track schema, identifiers of sources for lookup from batch query schema_identifier_to_source = { ( @@ -1302,19 +1302,14 @@ def calculate_freshness_from_metadata_batch( } # Group metadata sources by information schema -- one query per information schema will be necessary - information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {} - for source in sources: - information_schema = source.information_schema_only() - if information_schema not in information_schema_to_metadata_sources: - information_schema_to_metadata_sources[information_schema] = [source] - else: - information_schema_to_metadata_sources[information_schema].append(source) + sources_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = self._get_catalog_relations_by_info_schema(sources) freshness_responses: Dict[BaseRelation, FreshnessResponse] = {} + adapter_responses: List[Optional[AdapterResponse]] = [] for ( information_schema, sources_for_information_schema, - ) in information_schema_to_metadata_sources.items(): + ) in sources_by_info_schema.items(): result = self.execute_macro( GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs={ @@ -1324,35 +1319,25 @@ def calculate_freshness_from_metadata_batch( macro_resolver=macro_resolver, ) adapter_response, table = result.response, result.table # type: ignore[attr-defined] + adapter_responses.append(adapter_response) for row in table: - try: - last_modified_val = get_column_value_uncased("last_modified", row) - snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) - identifier = get_column_value_uncased("identifier", row) - schema = get_column_value_uncased("schema", row) - except Exception: - raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - - freshness_response = self._create_freshness_response( - last_modified_val, snapshotted_at_val - ) - source_relation_for_result = schema_identifier_to_source[ - (schema.lower(), identifier.lower()) - ] + raw_relation, freshness_response = self._parse_freshness_row(row, table) + source_relation_for_result = schema_identifier_to_source[raw_relation] freshness_responses[source_relation_for_result] = freshness_response - return adapter_response, freshness_responses + return adapter_responses, freshness_responses def calculate_freshness_from_metadata( self, source: BaseRelation, macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: - adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch( + adapter_responses, freshness_responses = self.calculate_freshness_from_metadata_batch( sources=[source], macro_resolver=macro_resolver, ) + adapter_response = adapter_responses[0] if adapter_responses else None return adapter_response, list(freshness_responses.values())[0] def _create_freshness_response( @@ -1374,6 +1359,22 @@ def _create_freshness_response( return freshness + def _parse_freshness_row(self, row: agate.Row, table: agate.Table) -> Tuple[Any, FreshnessResponse]: + try: + last_modified_val = get_column_value_uncased("last_modified", row) + snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + identifier = get_column_value_uncased("identifier", row) + schema = get_column_value_uncased("schema", row) + except Exception: + raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) + + freshness_response = self._create_freshness_response( + last_modified_val, + snapshotted_at_val + ) + raw_relation = schema.lower(), identifier.lower() + return raw_relation, freshness_response + def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization runs. The hook can assume it has a connection available. From 511cf14d2fcc42d2628e21b4797407f5cb8c9f58 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 2 Apr 2024 15:58:29 -0700 Subject: [PATCH 13/19] sanitize raw_relation for freshness batch calculation --- dbt/adapters/base/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 177cf663..11c34167 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1372,7 +1372,7 @@ def _parse_freshness_row(self, row: agate.Row, table: agate.Table) -> Tuple[Any, last_modified_val, snapshotted_at_val ) - raw_relation = schema.lower(), identifier.lower() + raw_relation = schema.lower().strip(), identifier.lower().strip() return raw_relation, freshness_response def pre_model_hook(self, config: Mapping[str, Any]) -> Any: From 01f27a57fd43710663e29a0b068ea56691fe8eeb Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 3 Apr 2024 17:02:50 -0700 Subject: [PATCH 14/19] ensure a connection is open if possible prior to executing macro --- dbt/adapters/base/impl.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 11c34167..1dd2d7c2 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1100,6 +1100,12 @@ def execute_macro( macro_function = CallableMacroGenerator(macro, macro_context) + # A connection may or may not be required for a given macro execution + connection = self.connections.get_if_exists() + # If a connection exists, ensure it is open prior to executing a macro + if connection: + self.connections.open(connection) + with self.connections.exception_handler(f"macro {macro_name}"): result = macro_function(**kwargs) return result From 154e0667f2f8fdbbdf2c8788551d681f3b7b2270 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 3 Apr 2024 17:21:05 -0700 Subject: [PATCH 15/19] fix agate typing --- dbt/adapters/base/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 2635197e..c0162531 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1375,7 +1375,7 @@ def _create_freshness_response( return freshness - def _parse_freshness_row(self, row: agate.Row, table: agate.Table) -> Tuple[Any, FreshnessResponse]: + def _parse_freshness_row(self, row: "agate.Row", table: "agate.Table") -> Tuple[Any, FreshnessResponse]: try: last_modified_val = get_column_value_uncased("last_modified", row) snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) From 8c707ccf1e84dfe215ab44913fefd0b9406b3896 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 3 Apr 2024 17:28:21 -0700 Subject: [PATCH 16/19] lazy load agate_helper --- dbt/adapters/base/impl.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index c0162531..591d75a1 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1376,6 +1376,8 @@ def _create_freshness_response( return freshness def _parse_freshness_row(self, row: "agate.Row", table: "agate.Table") -> Tuple[Any, FreshnessResponse]: + from dbt_common.clients.agate_helper import get_column_value_uncased + try: last_modified_val = get_column_value_uncased("last_modified", row) snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) From 8a1deace2bbe67d286e2cb44f0d155734609a2e8 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 10:36:09 -0700 Subject: [PATCH 17/19] add needs_conn to BaseAdapter.execute_macro --- dbt/adapters/base/impl.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 591d75a1..a5157ee4 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1062,6 +1062,7 @@ def execute_macro( project: Optional[str] = None, context_override: Optional[Dict[str, Any]] = None, kwargs: Optional[Dict[str, Any]] = None, + needs_conn: bool = False, ) -> AttrDict: """Look macro_name up in the manifest and execute its results. @@ -1074,6 +1075,10 @@ def execute_macro( execution context. :param kwargs: An optional dict of keyword args used to pass to the macro. + : param needs_conn: A boolean that indicates whether the specified macro + requires an open connection to execute. If needs_conn is True, a + connection is expected and opened if necessary. Otherwise (and by default), + no connection is expected prior to executing the macro. """ if kwargs is None: @@ -1106,10 +1111,8 @@ def execute_macro( macro_function = CallableMacroGenerator(macro, macro_context) - # A connection may or may not be required for a given macro execution - connection = self.connections.get_if_exists() - # If a connection exists, ensure it is open prior to executing a macro - if connection: + if needs_conn: + connection = self.connections.get_thread_connection() self.connections.open(connection) with self.connections.exception_handler(f"macro {macro_name}"): @@ -1333,6 +1336,7 @@ def calculate_freshness_from_metadata_batch( "relations": sources_for_information_schema, }, macro_resolver=macro_resolver, + needs_conn=True, ) adapter_response, table = result.response, result.table # type: ignore[attr-defined] adapter_responses.append(adapter_response) From f3dcac26a9ed76d3dfafbf31818c57b1a3e3f419 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 4 Apr 2024 11:23:11 -0700 Subject: [PATCH 18/19] docstring --- dbt/adapters/base/impl.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index a5157ee4..f24e673d 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1311,6 +1311,20 @@ def calculate_freshness_from_metadata_batch( sources: List[BaseRelation], macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]: + """ + Given a list of sources (BaseRelations), calculate the metadata-based freshness in batch. + This method should _not_ execute a warehouse query per source, but rather batch up + the sources into as few requests as possible to minimize the number of roundtrips required + to compute metadata-based freshness for each input source. + + :param sources: The list of sources to calculate metadata-based freshness for + :param macro_resolver: An optional macro_resolver to use for get_relation_last_modified + :return: a tuple where: + * the first element is a list of optional AdapterResponses indicating the response + for each request the method made to compute the freshness for the provided sources. + * the second element is a dictionary mapping an input source BaseRelation to a FreshnessResponse, + if it was possible to calculate a FreshnessResponse for the source. + """ # Track schema, identifiers of sources for lookup from batch query schema_identifier_to_source = { ( From 0bbf7ed9fb074f0eed37d8ce1882778cd11853da Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 11 Apr 2024 16:14:39 -0700 Subject: [PATCH 19/19] cleanup adapter_responses parsing in calculate_freshness_from_metadata_batch --- dbt/adapters/base/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index f24e673d..3abe5e09 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1372,7 +1372,7 @@ def calculate_freshness_from_metadata( macro_resolver=macro_resolver, ) adapter_response = adapter_responses[0] if adapter_responses else None - return adapter_response, list(freshness_responses.values())[0] + return adapter_response, freshness_responses[source] def _create_freshness_response( self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]