feat: add support for the insert_overwrite strategy and for creating auto-partition tables #4

Merged: 2 commits, Dec 10, 2024
7 changes: 6 additions & 1 deletion dbt/adapters/maxcompute/impl.py
@@ -31,6 +31,7 @@
 from dbt.adapters.maxcompute.relation import MaxComputeRelation
 from dbt.adapters.events.logging import AdapterLogger
 
+from dbt.adapters.maxcompute.relation_configs._partition import PartitionConfig
 from dbt.adapters.maxcompute.utils import is_schema_not_found
 
 logger = AdapterLogger("MaxCompute")
@@ -421,7 +422,7 @@ def valid_incremental_strategies(self):
         """The set of standard builtin strategies which this adapter supports out-of-the-box.
         Not used to validate custom strategies defined by end users.
         """
-        return ["append", "merge", "delete+insert"]
+        return ["append", "merge", "delete+insert", "insert_overwrite"]
 
     @available.parse_none
     def load_dataframe(
@@ -515,3 +516,7 @@ def run_security_sql(
 
         logger.debug(f"Normalized dict: {normalized_dict}")
         return normalized_dict
+
+    @available
+    def parse_partition_by(self, raw_partition_by: Any) -> Optional[PartitionConfig]:
+        return PartitionConfig.parse(raw_partition_by)
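
For context, a model opts into the new strategy and partition config like this (a hypothetical model and column names, not part of this diff; the partition_by keys mirror PartitionConfig below):

    -- models/events_daily.sql (hypothetical)
    {{ config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={'field': 'created_at', 'data_type': 'timestamp', 'granularity': 'day'}
    ) }}
    select id, created_at from {{ ref('raw_events') }}
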
10 changes: 10 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/__init__.py
@@ -0,0 +1,10 @@
from dbt.adapters.maxcompute.relation_configs._base import MaxComputeBaseRelationConfig

from dbt.adapters.maxcompute.relation_configs._partition import (
    PartitionConfig,
)

from dbt.adapters.maxcompute.relation_configs._policies import (
    MaxComputeQuotePolicy,
    MaxComputeIncludePolicy,
)
68 changes: 68 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_base.py
@@ -0,0 +1,68 @@
from dataclasses import dataclass
from typing import Optional, Dict, TYPE_CHECKING

from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import RelationConfigBase
from odps.models.table import Table as MaxComputeTable
from typing_extensions import Self

from dbt.adapters.maxcompute.relation_configs._policies import (
    MaxComputeIncludePolicy,
    MaxComputeQuotePolicy,
)
from dbt.adapters.contracts.relation import ComponentName, RelationConfig

if TYPE_CHECKING:
    # Indirectly imported via agate_helper, which is lazy loaded further down the file.
    # Used by mypy for earlier type hints.
    import agate


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class MaxComputeBaseRelationConfig(RelationConfigBase):
    @classmethod
    def include_policy(cls) -> Policy:
        return MaxComputeIncludePolicy()

    @classmethod
    def quote_policy(cls) -> Policy:
        return MaxComputeQuotePolicy()

    @classmethod
    def from_relation_config(cls, relation_config: RelationConfig) -> Self:
        relation_config_dict = cls.parse_relation_config(relation_config)
        relation = cls.from_dict(relation_config_dict)
        return relation

    @classmethod
    def parse_relation_config(cls, relation_config: RelationConfig) -> Dict:
        raise NotImplementedError(
            "`parse_relation_config()` needs to be implemented on this RelationConfigBase instance"
        )

    @classmethod
    def from_mc_table(cls, table: MaxComputeTable) -> Self:
        relation_config = cls.parse_mc_table(table)
        relation = cls.from_dict(relation_config)
        return relation

    @classmethod
    def parse_mc_table(cls, table: MaxComputeTable) -> Dict:
        raise NotImplementedError("`parse_mc_table()` is not implemented for this relation type")

    @classmethod
    def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
        if cls.include_policy().get_part(component) and value:
            if cls.quote_policy().get_part(component):
                return f'"{value}"'
            return value.lower()
        return None

    @classmethod
    def _get_first_row(cls, results: "agate.Table") -> "agate.Row":
        try:
            return results.rows[0]
        except IndexError:
            import agate

            return agate.Row(values=set())
68 changes: 68 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_partition.py
@@ -0,0 +1,68 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import dbt_common.exceptions
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.dataclass_schema import dbtClassMixin, ValidationError
from odps.models.table import Table as MaxComputeTable


@dataclass
class PartitionConfig(dbtClassMixin):
    field: str
    data_type: str = "string"
    granularity: str = "day"
    copy_partitions: bool = False

    @property
    def auto_partition(self) -> bool:
        return self.data_type in ["timestamp", "date", "datetime", "timestamp_ntz"]

    def render(self, alias: Optional[str] = None):
        column: str = self.field
        if alias:
            column = f"{alias}.{column}"
        return column

    def render_wrapped(self, alias: Optional[str] = None):
        """Wrap the partitioning column when a time type is involved, so it is cast to the matching granularity."""
        # if the data type is going to be truncated, there is no need to wrap
        return self.render(alias)

    @classmethod
    def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
        if raw_partition_by is None:
            return None
        try:
            cls.validate(raw_partition_by)
            return cls.from_dict(
                {
                    key: (value.lower() if isinstance(value, str) else value)
                    for key, value in raw_partition_by.items()
                }
            )
        except ValidationError as exc:
            raise dbt_common.exceptions.base.DbtValidationError(
                "Could not parse partition config"
            ) from exc
        except TypeError:
            raise dbt_common.exceptions.CompilationError(
                f"Invalid partition_by config:\n"
                f" Got: {raw_partition_by}\n"
                f' Expected a dictionary with "field" and "data_type" keys'
            )

    @classmethod
    def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
        """
        Parse model node into a raw config for `PartitionConfig.parse`
        """
        config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by")
        return config_dict

    @classmethod
    def parse_mc_table(cls, table: MaxComputeTable) -> Dict[str, Any]:
        """
        Parse the MC Table object into a raw config for `PartitionConfig.parse`
        """
        return {}
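
A short sketch of the hook from a macro's point of view (hypothetical values; per `parse` above, string values are lower-cased and `granularity` defaults to "day"):

    {% set pb = adapter.parse_partition_by({'field': 'Created_At', 'data_type': 'TIMESTAMP'}) %}
    {# pb.field == 'created_at', pb.data_type == 'timestamp', pb.granularity == 'day' #}
    {# pb.auto_partition is true, since 'timestamp' is one of the listed time types #}
    {# parse(none) returns none; malformed input raises the validation/compilation errors above #}
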
16 changes: 16 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_policies.py
@@ -0,0 +1,16 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy


class MaxComputeIncludePolicy(Policy):
    database: bool = True
    schema: bool = True
    identifier: bool = True


@dataclass
class MaxComputeQuotePolicy(Policy):
    database: bool = True
    schema: bool = True
    identifier: bool = True
@@ -1,5 +1,12 @@
 
 {% materialization incremental, adapter='maxcompute' -%}
+    {%- set raw_partition_by = config.get('partition_by', none) -%}
+    {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
+    -- Not supported yet
+    {%- set partitions = config.get('partitions', none) -%}
+    {%- set cluster_by = config.get('cluster_by', none) -%}
+    {% set incremental_strategy = config.get('incremental_strategy') or 'merge' %}
 
     -- relations
     {%- set existing_relation = load_cached_relation(this) -%}
     {%- set target_relation = this.incorporate(type='table') -%}
@@ -17,18 +24,15 @@
     {%- else -%}
         {%- set unique_key_list = [] -%}
     {%- endif -%}
-    {%- set transaction_table = unique_key_list|length > 0 -%}
-    {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
 
+    {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
     {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
 
-    {%- set default_incremental_strategy = 'append' -%}
-    {% if transaction_table %}
-        {%- set default_incremental_strategy = 'merge' -%}
-        {% if config.get('incremental_strategy')=='append' %}
+    {% if unique_key_list|length > 0 and config.get('incremental_strategy')=='append' %}
        {% do exceptions.raise_compiler_error('append strategy is not supported for incremental models with a unique key when using MaxCompute') %}
     {% endif %}
-    {% endif %}
-    {% set incremental_strategy = config.get('incremental_strategy') or default_incremental_strategy %}
 
     -- the temp_ and backup_ relations should not already exist in the database; get_relation
     -- will return None in that case. Otherwise, we get a relation that we can drop
@@ -49,9 +53,9 @@
     {% set to_drop = [] %}
 
     {% if existing_relation is none %}
-        {% set build_sql = create_table_as_internal(False, target_relation, sql, transaction_table) %}
+        {% set build_sql = create_table_as_internal(False, target_relation, sql, True, partition_by=partition_by) %}
     {% elif full_refresh_mode %}
-        {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, transaction_table) %}
+        {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, True, partition_by=partition_by) %}
         {% set need_swap = true %}
     {% else %}
         {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
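
One consequence of the reworked guard: the strategy now defaults to 'merge', and a config like this hypothetical one fails at compile time on MaxCompute:

    {{ config(
        materialized='incremental',
        incremental_strategy='append',
        unique_key='id'
    ) }}
    select 1 as id  -- raises: append strategy is not supported ... with a unique key
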
@@ -7,6 +7,7 @@
     {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
     {%- set sql_header = config.get('sql_header', none) -%}
 
+    {{ sql_header if sql_header is not none }}
     {% if unique_key %}
         {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
             {% for key in unique_key %}
@@ -21,35 +22,33 @@
             {% endset %}
             {% do predicates.append(unique_key_match) %}
         {% endif %}
-    {% else %}
-        {% do predicates.append('FALSE') %}
-    {% endif %}
-
-    {{ sql_header if sql_header is not none }}
-
-    merge into {{ target }} as DBT_INTERNAL_DEST
-    using {{ source }} as DBT_INTERNAL_SOURCE
-    on {{"(" ~ predicates | join(") and (") ~ ")"}}
-
-    {% if unique_key %}
-    when matched then update set
-        {% for column_name in update_columns -%}
-            DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
-            {%- if not loop.last %}, {%- endif %}
-        {%- endfor %}
-    {% endif %}
-
-    when not matched then insert
-        ({{ dest_cols_csv }})
-    values (
-        {% for column in dest_cols_names %}
-            DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
-        {% endfor %});
+
+        merge into {{ target }} as DBT_INTERNAL_DEST
+        using {{ source }} as DBT_INTERNAL_SOURCE
+        on {{"(" ~ predicates | join(") and (") ~ ")"}}
+
+        when matched then update set
+        {% for column_name in update_columns -%}
+            DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
+            {%- if not loop.last %}, {%- endif %}
+        {%- endfor %}
+
+        when not matched then insert
+            ({{ dest_cols_csv }})
+        values (
+            {% for column in dest_cols_names %}
+                DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
+            {% endfor %});
+
+    {% else %}
+        INSERT INTO {{ target }} ({{ dest_cols_csv }})
+        SELECT {{ dest_cols_csv }}
+        FROM {{ source }}
+    {% endif %}
 {% endmacro %}
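
For a hypothetical target events (columns id, name) with unique_key='id' and no merge_update_columns config, the unique-key branch renders roughly:

    merge into events as DBT_INTERNAL_DEST
    using events__dbt_tmp as DBT_INTERNAL_SOURCE
    on (DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id)
    when matched then update set
        DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id,
        DBT_INTERNAL_DEST.name = DBT_INTERNAL_SOURCE.name
    when not matched then insert
        (id, name)
    values (
        DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.name);

(events__dbt_tmp stands in for whatever source relation the materialization passes.)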


-{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+{% macro maxcompute__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
 
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
 
@@ -89,14 +88,9 @@
         select {{ dest_cols_csv }}
         from {{ source }}
     )
-
 {%- endmacro %}
 
-
-{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%}
-    {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }}
-{%- endmacro %}
-
 {% macro maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
     {#-- The only time include_sql_header is True: --#}
     {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
@@ -109,18 +103,16 @@
 
     {{ sql_header if sql_header is not none and include_sql_header }}
 
-    merge into {{ target }} as DBT_INTERNAL_DEST
-    using {{ source }} as DBT_INTERNAL_SOURCE
-    on FALSE
-
-    when not matched by source
-        {% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
-        then delete
+    {% call statement("main") %}
+        {% if predicates %}
+            DELETE FROM {{ target }} where True
+            AND {{ predicates | join(' AND ') }};
+        {% else %}
+            TRUNCATE TABLE {{ target }};
+        {% endif %}
+    {% endcall %}
 
-    when not matched then insert
-        ({{ dest_cols_csv }})
-    values (
-        {% for column in dest_cols_names %}
-            DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
-        {% endfor %});
+    INSERT INTO {{ target }} ({{ dest_cols_csv }})
+    SELECT {{ dest_cols_csv }}
+    FROM {{ source }}
 {% endmacro %}
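
Instead of emulating overwrite via a merge on FALSE, the macro now deletes (or truncates) and reloads. For the same hypothetical events target with predicate ds = '2024-12-01', it emits roughly:

    DELETE FROM events where True
    AND ds = '2024-12-01';
    -- or, when no incremental_predicates are configured:
    -- TRUNCATE TABLE events;

    INSERT INTO events (id, ds)
    SELECT id, ds
    FROM events__dbt_tmp
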
9 changes: 8 additions & 1 deletion dbt/include/maxcompute/macros/relations/table/create.sql
@@ -6,7 +6,7 @@
 {%- endmacro %}
 
 
-{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16) -%}
+{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16, partition_by=none) -%}
     {%- set sql_header = config.get('sql_header', none) -%}
     {{ sql_header if sql_header is not none }}
     {%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%}
@@ -28,6 +28,13 @@
         {%- endfor -%})
     {%- endif -%}
     )
+    {% if partition_by -%}
+        {%- if partition_by.auto_partition -%}
+            auto partitioned by (trunc_time(`{{ partition_by.field }}`, "{{ partition_by.granularity }}"))
+        {%- else -%}
+            partitioned by (`{{ partition_by.field }}`)
+        {%- endif -%}
+    {%- endif -%}
     {%- if is_transactional -%}
         {%- if is_delta -%}
            tblproperties("transactional"="true", "write.bucket.num"="{{ delta_table_bucket_num }}")
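
Concretely, the new branch appends one of two partition clauses to the generated DDL. For a hypothetical created_at timestamp column (day granularity) versus a plain string partition column pt, the rendered fragments are:

    -- data_type is a time type (timestamp/date/datetime/timestamp_ntz):
    auto partitioned by (trunc_time(`created_at`, "day"))
    -- any other data_type:
    partitioned by (`pt`)
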
15 changes: 15 additions & 0 deletions tests/functional/adapter/incremental/test_insert_overwrite.py
@@ -0,0 +1,15 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates


class TestIncrementalPredicatesInsertOverwrite(BaseIncrementalPredicates):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+incremental_predicates": ["id != 2"],
                "+incremental_strategy": "insert_overwrite",
            }
        }

    pass