diff --git a/dbt/adapters/maxcompute/impl.py b/dbt/adapters/maxcompute/impl.py
index e8d823f..46588e2 100644
--- a/dbt/adapters/maxcompute/impl.py
+++ b/dbt/adapters/maxcompute/impl.py
@@ -31,6 +31,7 @@
 from dbt.adapters.maxcompute.relation import MaxComputeRelation
 from dbt.adapters.events.logging import AdapterLogger
+from dbt.adapters.maxcompute.relation_configs._partition import PartitionConfig
 from dbt.adapters.maxcompute.utils import is_schema_not_found

 logger = AdapterLogger("MaxCompute")

@@ -421,7 +422,7 @@ def valid_incremental_strategies(self):
         """The set of standard builtin strategies which this adapter supports out-of-the-box.
         Not used to validate custom strategies defined by end users.
         """
-        return ["append", "merge", "delete+insert"]
+        return ["append", "merge", "delete+insert", "insert_overwrite"]

     @available.parse_none
     def load_dataframe(
@@ -515,3 +516,7 @@ def run_security_sql(

         logger.debug(f"Normalized dict: {normalized_dict}")
         return normalized_dict
+
+    @available
+    def parse_partition_by(self, raw_partition_by: Any) -> Optional[PartitionConfig]:
+        return PartitionConfig.parse(raw_partition_by)
diff --git a/dbt/adapters/maxcompute/relation_configs/__init__.py b/dbt/adapters/maxcompute/relation_configs/__init__.py
new file mode 100644
index 0000000..ce56b01
--- /dev/null
+++ b/dbt/adapters/maxcompute/relation_configs/__init__.py
@@ -0,0 +1,10 @@
+from dbt.adapters.maxcompute.relation_configs._base import MaxComputeBaseRelationConfig
+
+from dbt.adapters.maxcompute.relation_configs._partition import (
+    PartitionConfig,
+)
+
+from dbt.adapters.maxcompute.relation_configs._policies import (
+    MaxComputeQuotePolicy,
+    MaxComputeIncludePolicy,
+)
diff --git a/dbt/adapters/maxcompute/relation_configs/_base.py b/dbt/adapters/maxcompute/relation_configs/_base.py
new file mode 100644
index 0000000..2a77508
--- /dev/null
+++ b/dbt/adapters/maxcompute/relation_configs/_base.py
@@ -0,0 +1,68 @@
+from dataclasses import dataclass
+from typing import Optional, Dict, TYPE_CHECKING
+
+from dbt.adapters.base.relation import Policy
+from dbt.adapters.relation_configs import RelationConfigBase
+from odps.models.table import Table as MaxComputeTable
+from typing_extensions import Self
+
+from dbt.adapters.maxcompute.relation_configs._policies import (
+    MaxComputeIncludePolicy,
+    MaxComputeQuotePolicy,
+)
+from dbt.adapters.contracts.relation import ComponentName, RelationConfig
+
+if TYPE_CHECKING:
+    # Indirectly imported via agate_helper, which is lazy loaded further down the file.
+    # Used by mypy for earlier type hints.
+    import agate
+
+
+@dataclass(frozen=True, eq=True, unsafe_hash=True)
+class MaxComputeBaseRelationConfig(RelationConfigBase):
+    @classmethod
+    def include_policy(cls) -> Policy:
+        return MaxComputeIncludePolicy()
+
+    @classmethod
+    def quote_policy(cls) -> Policy:
+        return MaxComputeQuotePolicy()
+
+    @classmethod
+    def from_relation_config(cls, relation_config: RelationConfig) -> Self:
+        relation_config_dict = cls.parse_relation_config(relation_config)
+        relation = cls.from_dict(relation_config_dict)
+        return relation
+
+    @classmethod
+    def parse_relation_config(cls, relation_config: RelationConfig) -> Dict:
+        raise NotImplementedError(
+            "`parse_model_node()` needs to be implemented on this RelationConfigBase instance"
+        )
+
+    @classmethod
+    def from_mc_table(cls, table: MaxComputeTable) -> Self:
+        relation_config = cls.parse_mc_table(table)
+        relation = cls.from_dict(relation_config)
+        return relation
+
+    @classmethod
+    def parse_mc_table(cls, table: MaxComputeTable) -> Dict:
+        raise NotImplementedError("`parse_mc_table()` is not implemented for this relation type")
+
+    @classmethod
+    def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
+        if cls.include_policy().get_part(component) and value:
+            if cls.quote_policy().get_part(component):
+                return f'"{value}"'
+            return value.lower()
+        return None
+
+    @classmethod
+    def _get_first_row(cls, results: "agate.Table") -> "agate.Row":
+        try:
+            return results.rows[0]
+        except IndexError:
+            import agate
+
+            return agate.Row(values=set())
diff --git a/dbt/adapters/maxcompute/relation_configs/_partition.py b/dbt/adapters/maxcompute/relation_configs/_partition.py
new file mode 100644
index 0000000..87e5881
--- /dev/null
+++ b/dbt/adapters/maxcompute/relation_configs/_partition.py
@@ -0,0 +1,68 @@
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+import dbt_common.exceptions
+from dbt.adapters.contracts.relation import RelationConfig
+from dbt_common.dataclass_schema import dbtClassMixin, ValidationError
+from odps.models.table import Table as MaxComputeTable
+
+
+@dataclass
+class PartitionConfig(dbtClassMixin):
+    field: str
+    data_type: str = "string"
+    granularity: str = "day"
+    copy_partitions: bool = False
+
+    @property
+    def auto_partition(self) -> bool:
+        return self.data_type in ["timestamp", "date", "datetime", "timestamp_ntz"]
+
+    def render(self, alias: Optional[str] = None):
+        column: str = self.field
+        if alias:
+            column = f"{alias}.{column}"
+        return column
+
+    def render_wrapped(self, alias: Optional[str] = None):
+        """Wrap the partitioning column when a time type is involved so it is cast to the matching granularity."""
+        # if the data type is going to be truncated, no need to wrap
+        return self.render(alias)
+
+    @classmethod
+    def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
+        if raw_partition_by is None:
+            return None
+        try:
+            cls.validate(raw_partition_by)
+            return cls.from_dict(
+                {
+                    key: (value.lower() if isinstance(value, str) else value)
+                    for key, value in raw_partition_by.items()
+                }
+            )
+        except ValidationError as exc:
+            raise dbt_common.exceptions.base.DbtValidationError(
+                "Could not parse partition config"
+            ) from exc
+        except TypeError:
+            raise dbt_common.exceptions.CompilationError(
+                f"Invalid partition_by config:\n"
+                f"  Got: {raw_partition_by}\n"
+                f'  Expected a dictionary with "field" and "data_type" keys'
+            )
+
+    @classmethod
+    def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
+        """
+        Parse model node into a raw config for `PartitionConfig.parse`
+        """
+        config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by")
+        return config_dict
+
+    @classmethod
+    def parse_mc_table(cls, table: MaxComputeTable) -> Dict[str, Any]:
+        """
+        Parse the MC Table object into a raw config for `PartitionConfig.parse`
+        """
+        return {}
diff --git a/dbt/adapters/maxcompute/relation_configs/_policies.py b/dbt/adapters/maxcompute/relation_configs/_policies.py
new file mode 100644
index 0000000..4a934d7
--- /dev/null
+++ b/dbt/adapters/maxcompute/relation_configs/_policies.py
@@ -0,0 +1,16 @@
+from dataclasses import dataclass
+
+from dbt.adapters.base.relation import Policy
+
+
+class MaxComputeIncludePolicy(Policy):
+    database: bool = True
+    schema: bool = True
+    identifier: bool = True
+
+
+@dataclass
+class MaxComputeQuotePolicy(Policy):
+    database: bool = True
+    schema: bool = True
+    identifier: bool = True
diff --git a/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
index 40ee6e1..054c926 100644
--- a/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
@@ -1,5 +1,12 @@
 {% materialization incremental, adapter='maxcompute' -%}

+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
+  -- Not supported yet
+  {%- set partitions = config.get('partitions', none) -%}
+  {%- set cluster_by = config.get('cluster_by', none) -%}
+  {% set incremental_strategy = config.get('incremental_strategy') or 'merge' %}
+
   -- relations
   {%- set existing_relation = load_cached_relation(this) -%}
   {%- set target_relation = this.incorporate(type='table') -%}
@@ -17,18 +24,15 @@
   {%- else -%}
     {%- set unique_key_list = [] -%}
   {%- endif -%}

-  {%- set transaction_table = unique_key_list|length > 0 -%}
-  {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
+
+  {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
   {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
-  {%- set default_incremental_strategy = 'append' -%}
-  {% if transaction_table %}
-    {%- set default_incremental_strategy = 'merge' -%}
-    {% if config.get('incremental_strategy')=='append' %}
+
+  {% if unique_key_list|length > 0 and config.get('incremental_strategy')=='append' %}
     {% do exceptions.raise_compiler_error('append strategy is not supported for incremental models with a unique key when using MaxCompute') %}
-    {% endif %}
   {% endif %}
-  {% set incremental_strategy = config.get('incremental_strategy') or default_incremental_strategy %}
+

   -- the temp_ and backup_ relations should not already exist in the database; get_relation
   -- will return None in that case. Otherwise, we get a relation that we can drop
@@ -49,9 +53,9 @@
   {% set to_drop = [] %}

   {% if existing_relation is none %}
-      {% set build_sql = create_table_as_internal(False, target_relation, sql, transaction_table) %}
+      {% set build_sql = create_table_as_internal(False, target_relation, sql, True, partition_by=partition_by) %}
   {% elif full_refresh_mode %}
-      {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, transaction_table) %}
+      {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, True, partition_by=partition_by) %}
       {% set need_swap = true %}
   {% else %}
       {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
diff --git a/dbt/include/maxcompute/macros/materializations/incremental/merge.sql b/dbt/include/maxcompute/macros/materializations/incremental/merge.sql
index 64133d6..f5f314c 100644
--- a/dbt/include/maxcompute/macros/materializations/incremental/merge.sql
+++ b/dbt/include/maxcompute/macros/materializations/incremental/merge.sql
@@ -7,6 +7,7 @@
     {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

+    {{ sql_header if sql_header is not none }}
    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
@@ -21,35 +22,33 @@
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
-    {% else %}
-        {% do predicates.append('FALSE') %}
-    {% endif %}
-
-    {{ sql_header if sql_header is not none }}
-    merge into {{ target }} as DBT_INTERNAL_DEST
-        using {{ source }} as DBT_INTERNAL_SOURCE
-        on {{"(" ~ predicates | join(") and (") ~ ")"}}
+        merge into {{ target }} as DBT_INTERNAL_DEST
+            using {{ source }} as DBT_INTERNAL_SOURCE
+            on {{"(" ~ predicates | join(") and (") ~ ")"}}

-    {% if unique_key %}
-    when matched then update set
-        {% for column_name in update_columns -%}
-            DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
-            {%- if not loop.last %}, {%- endif %}
-        {%- endfor %}
-    {% endif %}
+        when matched then update set
+            {% for column_name in update_columns -%}
+                DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
+                {%- if not loop.last %}, {%- endif %}
+            {%- endfor %}

-    when not matched then insert
-        ({{ dest_cols_csv }})
-    values (
-        {% for column in dest_cols_names %}
-            DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
-        {% endfor %});
+        when not matched then insert
+            ({{ dest_cols_csv }})
+        values (
+            {% for column in dest_cols_names %}
+                DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
+            {% endfor %});
+    {% else %}
+        INSERT INTO {{ target }} ({{ dest_cols_csv }})
+        SELECT {{ dest_cols_csv }}
+        FROM {{ source }}
+    {% endif %}
 {% endmacro %}

-{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+{% macro maxcompute__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

@@ -89,14 +88,9 @@

        select {{ dest_cols_csv }}
        from {{ source }}
    )
-
{%- endmacro %}

-{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
-    {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }}
-{%- endmacro %}
-
 {% macro maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
    {#-- The only time include_sql_header is True: --#}
    {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
@@ -109,18 +103,16 @@

    {{ sql_header if sql_header is not none and include_sql_header }}

-    merge into {{ target }} as DBT_INTERNAL_DEST
-        using {{ source }} as DBT_INTERNAL_SOURCE
-        on FALSE
-
-    when not matched by source
-        {% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
-        then delete
+    {% call statement("main") %}
+        {% if predicates %}
+            DELETE FROM {{ target }} where True
+            AND {{ predicates | join(' AND ') }};
+        {% else %}
+            TRUNCATE TABLE {{ target }};
+        {% endif %}
+    {% endcall %}

-    when not matched then insert
-        ({{ dest_cols_csv }})
-    values (
-        {% for column in dest_cols_names %}
-            DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
-        {% endfor %});
+    INSERT INTO {{ target }} ({{ dest_cols_csv }})
+    SELECT {{ dest_cols_csv }}
+    FROM {{ source }}
 {% endmacro %}
diff --git a/dbt/include/maxcompute/macros/relations/table/create.sql b/dbt/include/maxcompute/macros/relations/table/create.sql
index 547bf0c..7e8a30e 100644
--- a/dbt/include/maxcompute/macros/relations/table/create.sql
+++ b/dbt/include/maxcompute/macros/relations/table/create.sql
@@ -6,7 +6,7 @@
 {%- endmacro %}


-{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16) -%}
+{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16, partition_by=none) -%}
  {%- set sql_header = config.get('sql_header', none) -%}
  {{ sql_header if sql_header is not none }}
  {%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%}
@@ -28,6 +28,13 @@
    {%- endfor -%})
    {%- endif -%}
  )
+  {% if partition_by -%}
+    {%- if partition_by.auto_partition -%}
+      auto partitioned by (trunc_time(`{{ partition_by.field }}`, "{{ partition_by.granularity }}"))
+    {%- else -%}
+      partitioned by (`{{ partition_by.field }}`)
+    {%- endif -%}
+  {%- endif -%}
  {%- if is_transactional -%}
    {%- if is_delta -%}
      tblproperties("transactional"="true", "write.bucket.num"="{{ delta_table_bucket_num }}")
diff --git a/tests/functional/adapter/test_incremental.py b/tests/functional/adapter/incremental/test_incremental.py
similarity index 100%
rename from tests/functional/adapter/test_incremental.py
rename to tests/functional/adapter/incremental/test_incremental.py
diff --git a/tests/functional/adapter/incremental/test_insert_overwrite.py b/tests/functional/adapter/incremental/test_insert_overwrite.py
new file mode 100644
index 0000000..19cbc65
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_insert_overwrite.py
@@ -0,0 +1,15 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates
+
+
+class TestIncrementalPredicatesInsertOverwrite(BaseIncrementalPredicates):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+incremental_predicates": ["id != 2"],
+                "+incremental_strategy": "insert_overwrite",
+            }
+        }
+
+    pass
diff --git a/tests/pyodps/timestamp_test.py b/tests/pyodps/timestamp_test.py
index 74ad028..78df14a 100644
--- a/tests/pyodps/timestamp_test.py
+++ b/tests/pyodps/timestamp_test.py
@@ -5,11 +5,11 @@
 from odps import ODPS, options
 import pandas as pd


-seeds__data_datediff_csv = """string_text,length_expression,output
-abcdef,3,def
-fishtown,4,town
-december,5,ember
-december,0,
+seeds__data_datediff_csv = """string_text,length_expression,output,timestamp_ntz
+abcdef,3,def,1981-05-20T06:46:51
+fishtown,4,town,1981-05-20T06:46:51
+december,5,ember,1981-05-20T06:46:51
+december,0,,1981-05-20T06:46:51
 """

@@ -22,7 +22,7 @@ def test_load_table_from_file(self):
        print(pd_dataframe)

        o.write_table(
-            "data_right", pd_dataframe, create_table=False, create_partition=False, lifecycle=1
+            "timestamp_ntz_test", pd_dataframe, create_table=False, create_partition=False, lifecycle=1
        )

        # AttributeError: 'pyarrow.lib.DataType' object has no attribute 'tz'
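
For context, a minimal usage sketch of what this patch enables end to end: `insert_overwrite` is now accepted by `valid_incremental_strategies`, and the `partition_by` dict is parsed by `adapter.parse_partition_by` / `PartitionConfig.parse` before being handed to `create_table_as_internal`. The model, source, and column names below are hypothetical and not part of this change:

    -- models/events_daily.sql (hypothetical model)
    {{
        config(
            materialized='incremental',
            incremental_strategy='insert_overwrite',
            partition_by={'field': 'created_at', 'data_type': 'timestamp', 'granularity': 'day'}
        )
    }}

    select id, created_at, payload
    from {{ source('app', 'raw_events') }}
    {% if is_incremental() %}
    where created_at >= (select max(created_at) from {{ this }})
    {% endif %}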
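
Likewise, a sketch of the partition clause that `create_table_as_internal` appends to the generated DDL under the new `partition_by` branch in create.sql: a time-typed `data_type` (timestamp, date, datetime, timestamp_ntz) takes the auto-partition path via `trunc_time`, while anything else (including the default `string`) becomes a plain partition column. Column names here are hypothetical:

    -- partition_by = {'field': 'created_at', 'data_type': 'timestamp', 'granularity': 'day'}
    auto partitioned by (trunc_time(`created_at`, "day"))

    -- partition_by = {'field': 'ds'}   (data_type defaults to 'string')
    partitioned by (`ds`)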