feat: add support for the insert_overwrite strategy and for creating auto-partition tables
dingxin-tech committed Dec 10, 2024
1 parent 1ab110c commit 6c9b961
Showing 11 changed files with 243 additions and 58 deletions.
7 changes: 6 additions & 1 deletion dbt/adapters/maxcompute/impl.py
@@ -31,6 +31,7 @@
from dbt.adapters.maxcompute.relation import MaxComputeRelation
from dbt.adapters.events.logging import AdapterLogger

from dbt.adapters.maxcompute.relation_configs._partition import PartitionConfig
from dbt.adapters.maxcompute.utils import is_schema_not_found

logger = AdapterLogger("MaxCompute")
@@ -421,7 +422,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "insert_overwrite"]

@available.parse_none
def load_dataframe(
@@ -515,3 +516,7 @@ def run_security_sql(

logger.debug(f"Normalized dict: {normalized_dict}")
return normalized_dict

@available
def parse_partition_by(self, raw_partition_by: Any) -> Optional[PartitionConfig]:
return PartitionConfig.parse(raw_partition_by)
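For orientation, a minimal sketch of what the new hook consumes and produces. The raw dict shape mirrors the `PartitionConfig` dataclass added in this commit; the column name `event_time` is purely illustrative, and the last assert relies on `auto_partition` being a property (see the fix in `_partition.py` below).

```python
# Sketch (illustrative inputs): adapter.parse_partition_by delegates to
# PartitionConfig.parse, which is added in this commit.
from dbt.adapters.maxcompute.relation_configs import PartitionConfig

raw = {"field": "event_time", "data_type": "timestamp", "granularity": "day"}
config = PartitionConfig.parse(raw)

assert config is not None
assert config.auto_partition  # "timestamp" is one of the auto-partition time types
```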
10 changes: 10 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/__init__.py
@@ -0,0 +1,10 @@
from dbt.adapters.maxcompute.relation_configs._base import MaxComputeBaseRelationConfig

from dbt.adapters.maxcompute.relation_configs._partition import (
PartitionConfig,
)

from dbt.adapters.maxcompute.relation_configs._policies import (
MaxComputeQuotePolicy,
MaxComputeIncludePolicy,
)
68 changes: 68 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_base.py
@@ -0,0 +1,68 @@
from dataclasses import dataclass
from typing import Optional, Dict, TYPE_CHECKING

from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import RelationConfigBase
from odps.models.table import Table as MaxComputeTable
from typing_extensions import Self

from dbt.adapters.maxcompute.relation_configs._policies import (
MaxComputeIncludePolicy,
MaxComputeQuotePolicy,
)
from dbt.adapters.contracts.relation import ComponentName, RelationConfig

if TYPE_CHECKING:
# Indirectly imported via agate_helper, which is lazily loaded further down the file.
# Used by mypy for earlier type hints.
import agate


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class MaxComputeBaseRelationConfig(RelationConfigBase):
@classmethod
def include_policy(cls) -> Policy:
return MaxComputeIncludePolicy()

@classmethod
def quote_policy(cls) -> Policy:
return MaxComputeQuotePolicy()

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
relation_config_dict = cls.parse_relation_config(relation_config)
relation = cls.from_dict(relation_config_dict)
return relation

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict:
raise NotImplementedError(
"`parse_relation_config()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def from_mc_table(cls, table: MaxComputeTable) -> Self:
relation_config = cls.parse_mc_table(table)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_mc_table(cls, table: MaxComputeTable) -> Dict:
raise NotImplementedError("`parse_mc_table()` is not implemented for this relation type")

@classmethod
def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
if cls.include_policy().get_part(component) and value:
if cls.quote_policy().get_part(component):
return f'"{value}"'
return value.lower()
return None

@classmethod
def _get_first_row(cls, results: "agate.Table") -> "agate.Row":
try:
return results.rows[0]
except IndexError:
import agate

return agate.Row(values=set())
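A quick sketch of how `_render_part` behaves under the policies above: with every quote flag defaulting to True, included components come back double-quoted rather than lower-cased. Calling the classmethod directly is purely for illustration, and the schema name is made up.

```python
# Sketch: exercising _render_part directly (illustrative values).
from dbt.adapters.contracts.relation import ComponentName
from dbt.adapters.maxcompute.relation_configs import MaxComputeBaseRelationConfig

# The quote branch is taken, so the original casing is preserved.
assert MaxComputeBaseRelationConfig._render_part(ComponentName.Schema, "My_Schema") == '"My_Schema"'
# Missing values (or excluded components) render as None.
assert MaxComputeBaseRelationConfig._render_part(ComponentName.Schema, None) is None
```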
68 changes: 68 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_partition.py
@@ -0,0 +1,68 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import dbt_common.exceptions
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.dataclass_schema import dbtClassMixin, ValidationError
from odps.models.table import Table as MaxComputeTable


@dataclass
class PartitionConfig(dbtClassMixin):
field: str
data_type: str = "string"
granularity: str = "day"
copy_partitions: bool = False

@property
def auto_partition(self) -> bool:
return self.data_type in ["timestamp", "date", "datetime", "timestamp_ntz"]

def render(self, alias: Optional[str] = None):
column: str = self.field
if alias:
column = f"{alias}.{column}"
return column

def render_wrapped(self, alias: Optional[str] = None):
"""Wrap the partitioning column when a time type is involved, so it is cast to the matching granularity."""
# if the data type is going to be truncated, no need to wrap
return self.render(alias)

@classmethod
def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
if raw_partition_by is None:
return None
try:
cls.validate(raw_partition_by)
return cls.from_dict(
{
key: (value.lower() if isinstance(value, str) else value)
for key, value in raw_partition_by.items()
}
)
except ValidationError as exc:
raise dbt_common.exceptions.base.DbtValidationError(
"Could not parse partition config"
) from exc
except TypeError:
raise dbt_common.exceptions.CompilationError(
f"Invalid partition_by config:\n"
f" Got: {raw_partition_by}\n"
f' Expected a dictionary with "field" and "data_type" keys'
)

@classmethod
def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
"""
Parse model node into a raw config for `PartitionConfig.parse`
"""
config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by")
return config_dict

@classmethod
def parse_mc_table(cls, table: MaxComputeTable) -> Dict[str, Any]:
"""
Parse the MC Table object into a raw config for `PartitionConfig.parse`
"""
return {}
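A sketch of the parse behaviour, following directly from the defaults above (the column name is illustrative):

```python
from dbt.adapters.maxcompute.relation_configs import PartitionConfig

# No partition_by config means no partitioning.
assert PartitionConfig.parse(None) is None

# String values are lower-cased; data_type and granularity fall back
# to "string" and "day". "DS" is an illustrative column name.
cfg = PartitionConfig.parse({"field": "DS"})
assert cfg.field == "ds"
assert cfg.data_type == "string" and cfg.granularity == "day"
assert not cfg.auto_partition  # "string" is not a time type
```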
16 changes: 16 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_policies.py
@@ -0,0 +1,16 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy


class MaxComputeIncludePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
class MaxComputeQuotePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True
@@ -1,5 +1,12 @@

{% materialization incremental, adapter='maxcompute' -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
-- Not supported yet
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'merge' %}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
@@ -17,18 +24,15 @@
{%- else -%}
{%- set unique_key_list = [] -%}
{%- endif -%}
{%- set transaction_table = unique_key_list|length > 0 -%}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}

{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{%- set default_incremental_strategy = 'append' -%}
{% if transaction_table %}
{%- set default_incremental_strategy = 'merge' -%}
{% if config.get('incremental_strategy')=='append' %}

{% if unique_key_list|length > 0 and config.get('incremental_strategy')=='append' %}
{% do exceptions.raise_compiler_error('append strategy is not supported for incremental models with a unique key when using MaxCompute') %}
{% endif %}
{% endif %}
{% set incremental_strategy = config.get('incremental_strategy') or default_incremental_strategy %}


-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
@@ -49,9 +53,9 @@
{% set to_drop = [] %}

{% if existing_relation is none %}
{% set build_sql = create_table_as_internal(False, target_relation, sql, transaction_table) %}
{% set build_sql = create_table_as_internal(False, target_relation, sql, True, partition_by=partition_by) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as_internal(False, intermediate_relation, sql, transaction_table) %}
{% set build_sql = create_table_as_internal(False, intermediate_relation, sql, True, partition_by=partition_by) %}
{% set need_swap = true %}
{% else %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
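Taken together, a project can now opt into the new strategy and partitioning in one place. A sketch in the same fixture-dict style as the functional test at the end of this commit; the column name and values are illustrative:

```python
# Illustrative project config; "event_time" is made up. A time-typed
# data_type makes create.sql emit the auto-partition clause shown below.
project_config_update = {
    "models": {
        "+incremental_strategy": "insert_overwrite",
        "+partition_by": {
            "field": "event_time",
            "data_type": "timestamp",
            "granularity": "day",
        },
    }
}
```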
@@ -7,6 +7,7 @@
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}
{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
@@ -21,35 +22,33 @@
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}

{% if unique_key %}
when matched then update set
{% for column_name in update_columns -%}
DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{% endif %}
when matched then update set
{% for column_name in update_columns -%}
DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}

when not matched then insert
({{ dest_cols_csv }})
values (
{% for column in dest_cols_names %}
DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
{% endfor %});
when not matched then insert
({{ dest_cols_csv }})
values (
{% for column in dest_cols_names %}
DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
{% endfor %});

{% else %}
INSERT INTO {{ target }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ source }}
{% endif %}
{% endmacro %}
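The practical effect of the restructuring above: with a unique key the macro still renders a MERGE, but without one it now emits a plain INSERT instead of the old MERGE on a FALSE predicate. A sketch of the no-key output, with identifiers that are illustrative rather than from this commit:

```python
# Sketch of the SQL rendered when unique_key is not set.
no_unique_key_sql = """
INSERT INTO my_model (id, event_time)
SELECT id, event_time
FROM my_model__dbt_tmp
"""
```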


{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% macro maxcompute__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

@@ -89,14 +88,9 @@
select {{ dest_cols_csv }}
from {{ source }}
)

{%- endmacro %}


{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
{{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }}
{%- endmacro %}

{% macro maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{#-- The only time include_sql_header is True: --#}
{#-- insert_overwrite strategy + "static" partitions config (a pattern inherited from dbt-bigquery) --#}
@@ -109,18 +103,16 @@

{{ sql_header if sql_header is not none and include_sql_header }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
then delete
{% call statement("main") %}
{% if predicates %}
DELETE FROM {{ target }} where True
AND {{ predicates | join(' AND ') }};
{% else %}
TRUNCATE TABLE {{ target }};
{% endif %}
{% endcall %}

when not matched then insert
({{ dest_cols_csv }})
values (
{% for column in dest_cols_names %}
DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
{% endfor %});
INSERT INTO {{ target }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ source }}
{% endmacro %}
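So the rewritten strategy no longer builds a MERGE at all: it clears the target (scoped by predicates, or truncated outright) and then inserts. A sketch of both rendered shapes, with illustrative identifiers and predicate:

```python
# With incremental_predicates, only matching rows are cleared first.
with_predicates = """
DELETE FROM my_model WHERE True AND id != 2;
INSERT INTO my_model (id, event_time)
SELECT id, event_time FROM my_model__dbt_tmp
"""

# Without predicates, the whole target is truncated before the insert.
without_predicates = """
TRUNCATE TABLE my_model;
INSERT INTO my_model (id, event_time)
SELECT id, event_time FROM my_model__dbt_tmp
"""
```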
9 changes: 8 additions & 1 deletion dbt/include/maxcompute/macros/relations/table/create.sql
@@ -6,7 +6,7 @@
{%- endmacro %}


{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16) -%}
{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16, partition_by=none) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
{%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%}
@@ -28,6 +28,13 @@
{%- endfor -%})
{%- endif -%}
)
{% if partition_by -%}
{%- if partition_by.auto_partition -%}
auto partitioned by (trunc_time(`{{ partition_by.field }}`, "{{ partition_by.granularity }}"))
{%- else -%}
partitioned by (`{{ partition_by.field }}`)
{%- endif -%}
{%- endif -%}
{%- if is_transactional -%}
{%- if is_delta -%}
tblproperties("transactional"="true", "write.bucket.num"="{{ delta_table_bucket_num }}")
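A sketch of the DDL tail each branch produces, with made-up table and column names; note the non-auto branch names only the partition column, as in the macro above:

```python
# Time-typed partition field -> MaxCompute auto-partition clause.
auto_partition_tail = 'auto partitioned by (trunc_time(`event_time`, "day"))'

# Any other field -> plain partition column.
plain_partition_tail = "partitioned by (`ds`)"
```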
15 changes: 15 additions & 0 deletions tests/functional/adapter/incremental/test_insert_overwrite.py
@@ -0,0 +1,15 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates


class TestIncrementalPredicatesInsertOverwrite(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_predicates": ["id != 2"],
"+incremental_strategy": "insert_overwrite",
}
}

pass
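With this config, the shared predicate tests run under the new strategy: each incremental run deletes only the rows matching `id != 2` before re-inserting, so the row with `id = 2` survives the overwrite.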