From cf2edbeb56c8d664498fa9109e5e561cd8a622e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BC=8E=E6=98=95?= Date: Wed, 15 Jan 2025 17:16:02 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20bq=5Finsert=5Foverw?= =?UTF-8?q?rite=20=E6=A8=A1=E5=BC=8F=E7=9A=84=E4=B8=80=E4=BA=9B=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbt/adapters/maxcompute/impl.py | 12 +++ dbt/adapters/maxcompute/wrapper.py | 21 +++++ .../insert_overwrite.sql | 80 ++++++++----------- .../macros/materializations/raw.sql | 2 +- 4 files changed, 66 insertions(+), 49 deletions(-) diff --git a/dbt/adapters/maxcompute/impl.py b/dbt/adapters/maxcompute/impl.py index 6624a9c..9301f2a 100644 --- a/dbt/adapters/maxcompute/impl.py +++ b/dbt/adapters/maxcompute/impl.py @@ -20,6 +20,7 @@ from dbt.adapters.contracts.relation import RelationType from dbt.adapters.protocol import AdapterConfig from dbt.adapters.sql import SQLAdapter +from dbt.context.providers import RuntimeConfigObject from dbt_common.contracts.constraints import ConstraintType from dbt_common.exceptions import DbtRuntimeError from dbt_common.utils import AttrDict @@ -554,3 +555,14 @@ def mc_render_raw_columns_constraints( rendered_column_constraints.append(" ".join(rendered_column_constraint)) return rendered_column_constraints + + @available + def run_raw_sql(self, sql: str, configs: RuntimeConfigObject) -> None: + hints = {} + default_schema = None + if configs is not None: + default_schema = configs.get("schema") + sql_hints = configs.get("sql_hints") + if sql_hints: + hints.update(sql_hints) + self.get_odps_client().execute_sql(sql=sql, hints=hints, default_schema=default_schema) \ No newline at end of file diff --git a/dbt/adapters/maxcompute/wrapper.py b/dbt/adapters/maxcompute/wrapper.py index 299addc..6635473 100644 --- a/dbt/adapters/maxcompute/wrapper.py +++ b/dbt/adapters/maxcompute/wrapper.py @@ -62,6 +62,27 @@ def remove_comments(input_string): # operation = remove_comments(operation) parameters = param_normalization(parameters) operation = replace_sql_placeholders(operation, parameters) + def parse_settings(sql): + properties = {} + index = 0 + + while True: + end = sql.find(';', index) + if end == -1: + break + s = sql[index:end] + if re.match(r'(?i)^\s*SET\s+.*=.*?\s*$', s): + # handle one setting + i = s.lower().find('set') + pair_string = s[i + 3:] + pair = pair_string.split('=') + properties[pair[0].strip()] = pair[1].strip() + index = end + 1 + else: + # break if there is no settings before + break + + return properties # retry ten times, each time wait for 10 seconds retry_times = 10 diff --git a/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql index aae7739..269e7ca 100644 --- a/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql +++ b/dbt/include/maxcompute/macros/materializations/incremental/bq_incremental_strategy/insert_overwrite.sql @@ -26,62 +26,46 @@ {% macro mc_insert_overwrite_sql( tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists ) %} - {% if partitions is not none and partitions != [] %} {# static #} - {{ mc_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists) }} - {% else %} {# dynamic #} - {{ mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }} - {% endif %} -{% endmacro %} - -{% macro mc_static_insert_overwrite_sql( - tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists -) %} - - {% set predicate -%} - {{ partition_by.render(False) }} in ({{ partitions | join (', ') }}) - {%- endset %} - - {%- set source_sql -%} - ( - {% if tmp_relation_exists -%} - select * from {{ tmp_relation }} - {%- else -%} - {{sql}} - {%- endif %} - ) - {%- endset -%} - - {%- call statement('main') -%} - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - {%- endcall -%} - - {%- if tmp_relation_exists -%} - -- 2. clean up the temp table - drop table if exists {{ tmp_relation }}; - {%- endif -%} -{% endmacro %} - -{% macro mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) %} - {% set predicate -%} - {{partition_by.render(False)}} in (select distinct {{partition_by.render(False)}} from {{ tmp_relation }}) - {%- endset %} - - {%- set source_sql -%} - ( - select * from {{ tmp_relation }} - ) - {%- endset -%} {% if not tmp_relation_exists %} {%- call statement('create_tmp_relation') -%} {{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }} {%- endcall -%} - {% else %} - -- 1. temp table already exists, we used it to check for schema changes {% endif %} -- 3. run the merge statement {%- call statement('main') -%} - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% if partitions is not none and partitions != [] %} {# static #} + {{ mc_static_insert_overwrite_merge_sql(target_relation, tmp_relation, partition_by, partitions) }} + {% else %} {# dynamic #} + {{ mc_dynamic_insert_overwrite_sql(target_relation, tmp_relation, partition_by) }} + {% endif %} {%- endcall -%} -- 4. clean up the temp table drop table if exists {{ tmp_relation }} {% endmacro %} + +{% macro mc_static_insert_overwrite_merge_sql(target, source, partition_by, partitions) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {{ sql_header if sql_header is not none and include_sql_header }} + + {%- call statement('drop_static_partition') -%} + DELETE FROM {{ target }} + WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }}) + {%- endcall -%} + + INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }}) + ( + SELECT * + FROM {{ source }} + WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }}) + ) +{% endmacro %} + +{% macro mc_dynamic_insert_overwrite_sql(target, source, partition_by) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {{ sql_header if sql_header is not none and include_sql_header }} + INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }}) + ( + SELECT * + FROM {{ source }} + ) +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/maxcompute/macros/materializations/raw.sql b/dbt/include/maxcompute/macros/materializations/raw.sql index 301b682..9606889 100644 --- a/dbt/include/maxcompute/macros/materializations/raw.sql +++ b/dbt/include/maxcompute/macros/materializations/raw.sql @@ -1,6 +1,6 @@ {% materialization raw, adapter='maxcompute' -%} + {{ adapter.run_raw_sql(sql, config) }} {% call statement("main") %} - {{ sql }} {% endcall %} {{ return({'relations': []}) }} {%- endmaterialization %}