diff --git a/dbt/adapters/maxcompute/__version__.py b/dbt/adapters/maxcompute/__version__.py
index 2789c83..da9c747 100644
--- a/dbt/adapters/maxcompute/__version__.py
+++ b/dbt/adapters/maxcompute/__version__.py
@@ -1 +1 @@
-version = "1.8.0-alpha.13"
+version = "1.9.0-alpha"
diff --git a/dbt/adapters/maxcompute/impl.py b/dbt/adapters/maxcompute/impl.py
index a699651..6624a9c 100644
--- a/dbt/adapters/maxcompute/impl.py
+++ b/dbt/adapters/maxcompute/impl.py
@@ -424,7 +424,15 @@ def valid_incremental_strategies(self):
         """The set of standard builtin strategies which this adapter supports out-of-the-box.
         Not used to validate custom strategies defined by end users.
         """
-        return ["append", "merge", "delete+insert", "insert_overwrite"]
+        return [
+            "append",
+            "merge",
+            "delete+insert",
+            "insert_overwrite",
+            "microbatch",
+            "bq_microbatch",
+            "bq_insert_overwrite",
+        ]
 
     @available.parse_none
     def load_dataframe(
diff --git a/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql
new file mode 100644
index 0000000..aae7739
--- /dev/null
+++ b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql
@@ -0,0 +1,87 @@
+{% macro mc_generate_incremental_insert_overwrite_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+    {% if partition_by is none %}
+        {% set missing_partition_msg -%}
+            The 'bq_insert_overwrite' strategy requires the `partition_by` config.
+        {%- endset %}
+        {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+    {% endif %}
+
+    {% if partition_by.fields|length != 1 %}
+        {% set invalid_partition_msg -%}
+            The 'bq_insert_overwrite' strategy requires exactly one field in the `partition_by` config.
+        {%- endset %}
+        {% do exceptions.raise_compiler_error(invalid_partition_msg) %}
+    {% endif %}
+
+    {% set build_sql = mc_insert_overwrite_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    ) %}
+
+    {{ return(build_sql) }}
+
+{% endmacro %}
+
+{% macro mc_insert_overwrite_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+    {% if partitions is not none and partitions != [] %} {# static #}
+        {{ mc_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists) }}
+    {% else %} {# dynamic #}
+        {{ mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
+    {% endif %}
+{% endmacro %}
+
+{% macro mc_static_insert_overwrite_sql(
+    tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+
+    {% set predicate -%}
+        {{ partition_by.render(False) }} in ({{ partitions | join (', ') }})
+    {%- endset %}
+
+    {%- set source_sql -%}
+        (
+            {% if tmp_relation_exists -%}
+                select * from {{ tmp_relation }}
+            {%- else -%}
+                {{ sql }}
+            {%- endif %}
+        )
+    {%- endset -%}
+
+    {%- call statement('main') -%}
+        {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
+    {%- endcall -%}
+
+    {%- if tmp_relation_exists -%}
+    -- clean up the temp table
+    drop table if exists {{ tmp_relation }};
+    {%- endif -%}
+{% endmacro %}
+
+{% macro mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) %}
+    {% set predicate -%}
+        {{ partition_by.render(False) }} in (select distinct {{ partition_by.render(False) }} from {{ tmp_relation }})
+    {%- endset %}
+
+    {%- set source_sql -%}
+        (
+            select * from {{ tmp_relation }}
+        )
+    {%- endset -%}
+    {% if not tmp_relation_exists %}
+        {%- call statement('create_tmp_relation') -%}
+            {{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
+        {%- endcall -%}
+    {% else %}
+        -- 1. temp table already exists, we used it to check for schema changes
+    {% endif %}
+    -- 2. run the merge statement
+    {%- call statement('main') -%}
+        {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+    {%- endcall -%}
+    -- 3. clean up the temp table
+    drop table if exists {{ tmp_relation }}
+{% endmacro %}
diff --git a/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/microbatch.sql b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/microbatch.sql
new file mode 100644
index 0000000..1ad9f46
--- /dev/null
+++ b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/microbatch.sql
@@ -0,0 +1,28 @@
+{% macro mc_validate_microbatch_config(config) %}
+    {% if config.get("partition_by") is none %}
+        {% set missing_partition_msg -%}
+            The 'microbatch' strategy requires a `partition_by` config.
+        {%- endset %}
+        {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+    {% endif %}
+
+    {% if config.get("partition_by").granularity != config.get('batch_size') %}
+        {% set invalid_partition_by_granularity_msg -%}
+            The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
+            Got:
+              `batch_size`: {{ config.get('batch_size') }}
+              `partition_by.granularity`: {{ config.get("partition_by").granularity }}
+        {%- endset %}
+        {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
+    {% endif %}
+{% endmacro %}
+
+{% macro mc_generate_microbatch_build_sql(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+) %}
+    {% set build_sql = mc_insert_overwrite_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    ) %}
+
+    {{ return(build_sql) }}
+{% endmacro %}
diff --git a/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
index 0536e43..e3b66fd 100644
--- a/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql
@@ -2,10 +2,11 @@
 {% materialization incremental, adapter='maxcompute' -%}
   {%- set raw_partition_by = config.get('partition_by', none) -%}
   {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
-  -- Not support yet
   {%- set partitions = config.get('partitions', none) -%}
+  {%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
+  {%- set cluster_by = config.get('cluster_by', none) -%}
 
-  {% set incremental_strategy = config.get('incremental_strategy') or 'merge' %}
+  {%- set incremental_strategy = config.get('incremental_strategy') or 'merge' -%}
 
   -- relations
   {%- set existing_relation = load_cached_relation(this) -%}
@@ -45,50 +46,45 @@
   {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
   {{ drop_relation_if_exists(preexisting_backup_relation) }}
 
-  {{ run_hooks(pre_hooks, inside_transaction=False) }}
-
-  -- `BEGIN` happens here:
-  {{ run_hooks(pre_hooks, inside_transaction=True) }}
-
-  {% set to_drop = [] %}
+  {{ run_hooks(pre_hooks) }}
 
   {% if existing_relation is none %}
-      {% set build_sql = create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) %}
+    {%- call statement('main') -%}
+      {{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) }}
+    {%- endcall -%}
   {% elif full_refresh_mode %}
-      {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, True, partition_config=partition_by) %}
-      {% set need_swap = true %}
+    {% do log("Hard refreshing " ~ existing_relation) %}
+    {{ adapter.drop_relation(existing_relation) }}
+    {%- call statement('main') -%}
+      {{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by) }}
+    {%- endcall -%}
  {% else %}
-    {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
-    {% set contract_config = config.get('contract') %}
-    {% if not contract_config or not contract_config.enforced %}
-      {% do adapter.expand_target_column_types(
-             from_relation=temp_relation,
-             to_relation=target_relation) %}
+    {% set tmp_relation_exists = false %}
+    {% if on_schema_change != 'ignore' %}
+      {#-- Check first, since otherwise we may not build a temp table --#}
+      {#-- Python always needs to create a temp table --#}
+      {%- call statement('create_tmp_relation') -%}
+        {{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
+      {%- endcall -%}
+      {% set tmp_relation_exists = true %}
+      {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
+      {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
     {% endif %}
-    {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
-    {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
+
     {% if not dest_columns %}
       {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
     {% endif %}
 
-    {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
-    {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
-    {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
-    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
-    {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
+    {% set build_sql = mc_generate_incremental_build_sql(
+        incremental_strategy, temp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
+    ) %}
 
-  {% endif %}
-
-  {% call statement("main") %}
+    {% call statement("main") %}
       {{ build_sql }}
-  {% endcall %}
-
-  {% if need_swap %}
-      {% do adapter.rename_relation(target_relation, backup_relation) %}
-      {% do adapter.rename_relation(intermediate_relation, target_relation) %}
-      {% do to_drop.append(backup_relation) %}
+    {% endcall %}
   {% endif %}
 
+
   {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
   {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
 
@@ -98,21 +94,36 @@
     {% do create_indexes(target_relation) %}
   {% endif %}
 
-  {{ run_hooks(post_hooks, inside_transaction=True) }}
-
-  -- `COMMIT` happens here
-  {% do adapter.commit() %}
-
-  {% for rel in to_drop %}
-      {% do adapter.drop_relation(rel) %}
-  {% endfor %}
+  {{ run_hooks(post_hooks) }}
 
-  {{ run_hooks(post_hooks, inside_transaction=False) }}
+  {%- if tmp_relation_exists -%}
+    {{ adapter.drop_relation(tmp_relation) }}
+  {%- endif -%}
 
   {{ return({'relations': [target_relation]}) }}
 
-
 {%- endmaterialization %}
 
+{% macro mc_generate_incremental_build_sql(
+    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
+) %}
+  {% if strategy == 'bq_insert_overwrite' %}
+    {% set build_sql = mc_generate_incremental_insert_overwrite_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    ) %}
+  {% elif strategy == 'bq_microbatch' %}
+    {% set build_sql = mc_generate_microbatch_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    ) %}
+  {% else %} {# built-in dbt strategies #}
+    {%- call statement('create_tmp_relation') -%}
+      {{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
+    {%- endcall -%}
+    {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, strategy) %}
+    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
+    {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
+  {% endif %}
+  {{ return(build_sql) }}
+{% endmacro %}
 {% macro get_quoted_list(column_names) %}
   {% set quoted = [] %}
@@ -121,3 +132,12 @@
   {%- endfor %}
   {{ return(quoted) }}
 {% endmacro %}
+
+{% macro maxcompute__get_incremental_microbatch_sql(arg_dict) %}
+
+  {% if arg_dict["unique_key"] %}
+    {% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %}
+  {% else %}
+    {{ exceptions.raise_compiler_error("dbt-maxcompute 'microbatch' requires a `unique_key` config") }}
+  {% endif %}
+{% endmacro %}
diff --git a/dbt/include/maxcompute/macros/materializations/raw.sql b/dbt/include/maxcompute/macros/materializations/raw.sql
new file mode 100644
index 0000000..301b682
--- /dev/null
+++ b/dbt/include/maxcompute/macros/materializations/raw.sql
@@ -0,0 +1,6 @@
+{% materialization raw, adapter='maxcompute' -%}
+  {% call statement("main") %}
+    {{ sql }}
+  {% endcall %}
+  {{ return({'relations': []}) }}
+{%- endmaterialization %}
diff --git a/dev-requirements.txt b/dev-requirements.txt
index f453408..a09eba3 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -2,7 +2,7 @@
 git+https://github.com/dbt-labs/dbt-adapters.git
 git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
 git+https://github.com/dbt-labs/dbt-common.git
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git@1.9.latest#egg=dbt-core&subdirectory=core
 
 # dev
 ddtrace==2.3.0
diff --git a/setup.py b/setup.py
index 1a4d652..2816801 100644
--- a/setup.py
+++ b/setup.py
@@ -50,8 +50,8 @@ def _dbt_maxcompute_version() -> str:
     packages=find_namespace_packages(include=["dbt", "dbt.*"]),
     include_package_data=True,
     install_requires=[
-        "dbt-common>=1.0.4,<2.0",
-        "dbt-adapters>=1.1.1,<2.0",
+        "dbt-common>=1.10,<2.0",
+        "dbt-adapters>=1.7,<2.0",
         "pyodps>=0.12.0",  # latest
         "pandas>=0.17.0",
         # add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency
diff --git a/tests/functional/adapter/incremental/test_incremental.py b/tests/functional/adapter/incremental/test_incremental.py
index aa086f2..4d57e8b 100644
--- a/tests/functional/adapter/incremental/test_incremental.py
+++ b/tests/functional/adapter/incremental/test_incremental.py
@@ -9,15 +9,36 @@
     BaseIncrementalOnSchemaChange,
 )
 from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch
+from dbt.tests.util import run_dbt
 
 
 class TestMergeExcludeColumnsMaxCompute(BaseMergeExcludeColumns):
     pass
 
+
+_input_model_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, TIMESTAMP'2024-12-30 00:00:00' as event_time
+union all
+select 2 as id, TIMESTAMP'2024-12-31 00:00:00' as event_time
+union all
+select 3 as id, TIMESTAMP'2025-01-01 00:00:00' as event_time
+"""
+
 
 @pytest.mark.skip(reason="MaxCompute Api not support freeze time.")
 class TestMicrobatchMaxCompute(BaseMicrobatch):
-    pass
+
+    @pytest.fixture(scope="class")
+    def input_model_sql(self) -> str:
+        """
+        This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}.
+        event_time is a required configuration of this input
+        """
+        return _input_model_sql
+
+    def test_run_with_event_time(self, project, insert_two_rows_sql):
+        run_dbt(["run"])
 
 
 class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
diff --git a/tests/functional/maxcompute/test_incremental_maxcompute.py b/tests/functional/maxcompute/test_incremental_maxcompute.py
new file mode 100644
index 0000000..2cb97ec
--- /dev/null
+++ b/tests/functional/maxcompute/test_incremental_maxcompute.py
@@ -0,0 +1,82 @@
+import pytest
+from dbt.tests.util import (
+    run_dbt,
+)
+
+seeds_base_csv = """
+id,name,some_date
+1,Easton,1981-05-20T06:46:51
+2,Lillian,1978-09-03T18:10:33
+3,Jeremiah,1982-03-11T03:59:51
+4,Nolan,1976-05-06T20:21:35
+5,Hannah,1982-06-23T05:41:26
+6,Eleanor,1991-08-10T23:12:21
+7,Lily,1971-03-29T14:58:02
+8,Jonathan,1988-02-26T02:55:24
+9,Adrian,1994-02-09T13:14:23
+10,Nora,1976-03-01T16:51:39
+""".lstrip()
+
+models__sql = """
+{{ config(
+    materialized='incremental',
+    partition_by={"fields": "some_date", "data_types": "timestamp"},
+    partitions=["TIMESTAMP'2024-10-10 00:00:00'", "TIMESTAMP'2025-01-01 00:00:00'"],
+    incremental_strategy='bq_insert_overwrite'
+) }}
+select * from {{ source('raw', 'seed') }}
+{% if is_incremental() %}
+    where {{ dbt.datediff("some_date", "TIMESTAMP'2000-01-01 00:00:00'", 'day') }} > 0
+{% endif %}
+""".lstrip()
+
+schema_base_yml = """
+version: 2
+sources:
+  - name: raw
+    schema: "{{ target.schema }}"
+    tables:
+      - name: seed
+        identifier: "{{ var('seed_name', 'base') }}"
+"""
+
+
+class BaseTestIncrementalBqInsertOverwrite:
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "base.csv": seeds_base_csv,
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "model.sql": models__sql,
+            "schema.yml": schema_base_yml,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "name": "base",
+        }
+
+    def test_base(self, project):
+        # seed command
+        results = run_dbt(["seed"])
+        # seed result length
+        print(results)
+        # run command
+        results = run_dbt()
+        # run result length
+        print(results)
+        project.run_sql(
+            "insert into base values(11,'Nora',TIMESTAMP'2024-12-30 00:00:00')", fetch="all"
+        )
+        results = run_dbt()
+        print(results)
+
+
+class TestIncrementalBqInsertOverwrite(BaseTestIncrementalBqInsertOverwrite):
+    pass
diff --git a/tests/functional/maxcompute/test_raw.py b/tests/functional/maxcompute/test_raw.py
new file mode 100644
index 0000000..1a07ec1
--- /dev/null
+++ b/tests/functional/maxcompute/test_raw.py
@@ -0,0 +1,26 @@
+import pytest
+from dbt.tests.util import (
+    run_dbt,
+)
+
+models__sql = """
+{{ config(
+    materialized='raw'
+) }}
+create table test(c1 bigint) lifecycle 1;
+""".lstrip()
+
+
+class BaseTestRaw:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"model.sql": models__sql}
+
+    def test_base(self, project):
+        results = run_dbt()
+        # run result length
+        print(results)
+
+
+class TestRaw(BaseTestRaw):
+    pass
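Example usage (an illustrative sketch, not part of the patch itself): a model configured with the new 'bq_insert_overwrite' strategy, modeled on the fixture in test_incremental_maxcompute.py above. The model name, source, and column names are hypothetical; only the config keys and value shapes follow the test.

-- models/orders_by_day.sql (hypothetical model name)
{{ config(
    materialized='incremental',
    incremental_strategy='bq_insert_overwrite',
    partition_by={"fields": "order_date", "data_types": "timestamp"},
    partitions=["TIMESTAMP'2024-12-30 00:00:00'", "TIMESTAMP'2024-12-31 00:00:00'"]
) }}
select id, order_date from {{ source('raw', 'orders') }}
{% if is_incremental() %}
    -- only rebuild the listed partitions on incremental runs
    where order_date >= TIMESTAMP'2024-12-30 00:00:00'
{% endif %}

Per mc_insert_overwrite_sql above, listing `partitions` selects the static variant; omitting it falls back to the dynamic variant, which overwrites every partition present in the new batch. The 'bq_microbatch' strategy reuses the same insert-overwrite path for each batch.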