Feature: Upgrade dbt to 1.9 and support new incremental materialization strategies (#5)
dingxin-tech authored Jan 3, 2025
1 parent 3ff913c commit 60faed5
Showing 11 changed files with 327 additions and 49 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/maxcompute/__version__.py
@@ -1 +1 @@
version = "1.8.0-alpha.13"
version = "1.9.0-alpha"
10 changes: 9 additions & 1 deletion dbt/adapters/maxcompute/impl.py
@@ -424,7 +424,15 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "merge", "delete+insert", "insert_overwrite"]
return [
"append",
"merge",
"delete+insert",
"insert_overwrite",
"microbatch",
"bq_microbatch",
"bq_insert_overwrite",
]

@available.parse_none
def load_dataframe(
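For context, a model opts into one of these strategies through its config block. A minimal sketch (not part of this commit): the relation and column names are illustrative, and the exact `partition_by` keys are an assumption inferred from the `fields` and `granularity` attributes that the macros below read.

{{
    config(
        materialized='incremental',
        incremental_strategy='bq_insert_overwrite',
        partition_by={'fields': ['ds'], 'data_types': ['string']}
    )
}}
select ds, user_id from {{ ref('raw_events') }}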
@@ -0,0 +1,87 @@
{% macro mc_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
The 'bq_insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% if partition_by.fields|length != 1 %}
{% set invalid_partition_msg -%}
The 'bq_insert_overwrite' strategy requires a `partition_by` config with exactly one partition field.
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_partition_msg) %}
{% endif %}

{% set build_sql = mc_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{{ return(build_sql) }}

{% endmacro %}

{% macro mc_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{{ mc_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists) }}
{% else %} {# dynamic #}
{{ mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
{% endif %}
{% endmacro %}

{% macro mc_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% set predicate -%}
{{ partition_by.render(False) }} in ({{ partitions | join(', ') }})
{%- endset %}

{%- set source_sql -%}
(
{% if tmp_relation_exists -%}
select * from {{ tmp_relation }}
{%- else -%}
{{ sql }}
{%- endif %}
)
{%- endset -%}

{%- call statement('main') -%}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{%- endcall -%}

{%- if tmp_relation_exists -%}
-- clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}
{% endmacro %}

{% macro mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) %}
{% set predicate -%}
{{ partition_by.render(False) }} in (select distinct {{ partition_by.render(False) }} from {{ tmp_relation }})
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}
{% if not tmp_relation_exists %}
{%- call statement('create_tmp_relation') -%}
{{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}
-- 2. run the merge statement
{%- call statement('main') -%}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{%- endcall -%}
-- 3. clean up the temp table
drop table if exists {{ tmp_relation }}
{% endmacro %}
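The two branches above differ only in where the overwritten partitions come from. A hedged config sketch (hypothetical model, not from this commit): supplying `partitions` pins the static branch to those literal values, while omitting it falls through to the dynamic branch, which derives the partition list from the temp table.

{{
    config(
        materialized='incremental',
        incremental_strategy='bq_insert_overwrite',
        partition_by={'fields': ['ds'], 'data_types': ['string']},
        partitions=["'2025-01-01'", "'2025-01-02'"]
    )
}}
select ds, id from {{ ref('events') }}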
@@ -0,0 +1,28 @@
{% macro mc_validate_microbatch_config(config) %}
{% if config.get("partition_by") is none %}
{% set missing_partition_msg -%}
The 'microbatch' strategy requires a `partition_by` config.
{%- endset %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% if config.get("partition_by").granularity != config.get('batch_size') %}
{% set invalid_partition_by_granularity_msg -%}
The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
Got:
`batch_size`: {{ config.get('batch_size') }}
`partition_by.granularity`: {{ config.get("partition_by").granularity }}
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
{% endif %}
{% endmacro %}

{% macro mc_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% set build_sql = mc_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{{ return(build_sql) }}
{% endmacro %}
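Taken together with the `unique_key` requirement enforced by `maxcompute__get_incremental_microbatch_sql` later in this commit, a valid microbatch model would look roughly like this sketch (model and column names are hypothetical, and the exact `partition_by` keys are an assumption):

{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='event_time',
        begin='2025-01-01',
        batch_size='day',
        unique_key='id',
        partition_by={'fields': ['event_time'], 'data_types': ['timestamp'], 'granularity': 'day'}
    )
}}
select id, event_time from {{ ref('events') }}

Note that `granularity` matches `batch_size`, as `mc_validate_microbatch_config` above requires.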
Expand Up @@ -2,10 +2,11 @@
{% materialization incremental, adapter='maxcompute' -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
-- Not supported yet
{%- set partitions = config.get('partitions', none) -%}
{%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}

{%- set cluster_by = config.get('cluster_by', none) -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'merge' %}
{%- set incremental_strategy = config.get('incremental_strategy') or 'merge' -%}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
@@ -45,50 +46,45 @@
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) %}
{%- call statement('main') -%}
{{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as_internal(False, intermediate_relation, sql, True, partition_config=partition_by) %}
{% set need_swap = true %}
{% do log("Hard refreshing " ~ existing_relation) %}
{{ adapter.drop_relation(existing_relation) }}
{%- call statement('main') -%}
{{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% else %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
{% set contract_config = config.get('contract') %}
{% if not contract_config or not contract_config.enforced %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{% set tmp_relation_exists = false %}
{% if on_schema_change != 'ignore' %}
{#-- Check first, since otherwise we may not build a temp table --#}
{#-- Python always needs to create a temp table --#}
{%- call statement('create_tmp_relation') -%}
{{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% set tmp_relation_exists = true %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% endif %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}

{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
{% set build_sql = mc_generate_incremental_build_sql(
incremental_strategy, temp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endcall %}
{% endif %}


{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

@@ -98,21 +94,36 @@
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}
{{ run_hooks(post_hooks) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{%- if tmp_relation_exists -%}
{{ adapter.drop_relation(tmp_relation) }}
{%- endif -%}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

{% macro mc_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
) %}
{% if strategy == 'bq_insert_overwrite' %}
{% set build_sql = mc_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% elif strategy == 'bq_microbatch' %}
{% set build_sql = mc_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% else %} {# built-in dbt strategies #}
{%- call statement('create_tmp_relation') -%}
{{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
{% endif %}
{{ return(build_sql) }}
{% endmacro %}

{% macro get_quoted_list(column_names) %}
{% set quoted = [] %}
@@ -121,3 +132,12 @@
{%- endfor %}
{{ return(quoted) }}
{% endmacro %}

{% macro maxcompute__get_incremental_microbatch_sql(arg_dict) %}

{% if arg_dict["unique_key"] %}
{% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %}
{% else %}
{{ exceptions.raise_compiler_error("dbt-maxcompute 'microbatch' requires a `unique_key` config") }}
{% endif %}
{% endmacro %}
6 changes: 6 additions & 0 deletions dbt/include/maxcompute/macros/materializations/raw.sql
@@ -0,0 +1,6 @@
{% materialization raw, adapter='maxcompute' -%}
{% call statement("main") %}
{{ sql }}
{% endcall %}
{{ return({'relations': []}) }}
{%- endmaterialization %}
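A minimal usage sketch (an assumption, not shown in this commit): a `raw` model executes its body verbatim as a single statement and reports no relations back to dbt, which suits one-off DDL or maintenance SQL.

{{ config(materialized='raw') }}
create table if not exists my_project.manual_table (id bigint);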
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -2,7 +2,7 @@
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@1.9.latest#egg=dbt-core&subdirectory=core

# dev
ddtrace==2.3.0
4 changes: 2 additions & 2 deletions setup.py
@@ -50,8 +50,8 @@ def _dbt_maxcompute_version() -> str:
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
include_package_data=True,
install_requires=[
"dbt-common>=1.0.4,<2.0",
"dbt-adapters>=1.1.1,<2.0",
"dbt-common>=1.10,<2.0",
"dbt-adapters>=1.7,<2.0",
"pyodps>=0.12.0", # latest
"pandas>=0.17.0",
# add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency
23 changes: 22 additions & 1 deletion tests/functional/adapter/incremental/test_incremental.py
@@ -9,15 +9,36 @@
BaseIncrementalOnSchemaChange,
)
from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch
from dbt.tests.util import run_dbt


class TestMergeExcludeColumnsMaxCompute(BaseMergeExcludeColumns):
pass


_input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP'2024-12-30 00:00:00' as event_time
union all
select 2 as id, TIMESTAMP'2024-12-31 00:00:00' as event_time
union all
select 3 as id, TIMESTAMP'2025-01-01 00:00:00' as event_time
"""


@pytest.mark.skip(reason="MaxCompute API does not support freezing time.")
class TestMicrobatchMaxCompute(BaseMicrobatch):
pass

@pytest.fixture(scope="class")
def input_model_sql(self) -> str:
"""
This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}.
event_time is a required configuration of this input
"""
return _input_model_sql

def test_run_with_event_time(self, project, insert_two_rows_sql):
run_dbt(["run"])


class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
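To exercise these cases locally, the standard dbt adapter functional-test flow applies (assuming a MaxCompute test profile is configured for the suite):

python -m pytest tests/functional/adapter/incremental/test_incremental.py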