Skip to content

Commit

Permalink
fix: 修复 bq_insert_overwrite 模式的一些问题
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxin-tech committed Jan 15, 2025
1 parent 6e4fcb7 commit cf2edbe
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 49 deletions.
12 changes: 12 additions & 0 deletions dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.protocol import AdapterConfig
from dbt.adapters.sql import SQLAdapter
from dbt.context.providers import RuntimeConfigObject
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import AttrDict
Expand Down Expand Up @@ -554,3 +555,14 @@ def mc_render_raw_columns_constraints(
rendered_column_constraints.append(" ".join(rendered_column_constraint))

return rendered_column_constraints

@available
def run_raw_sql(self, sql: str, configs: RuntimeConfigObject) -> None:
    """Execute ``sql`` directly through the ODPS client.

    Args:
        sql: The SQL text to execute.
        configs: Optional config object. When it is not ``None``, its
            ``"schema"`` entry is passed as the default schema and its
            ``"sql_hints"`` entry (if truthy) is passed as the hints.
    """
    schema = None
    merged_hints = {}

    if configs is not None:
        schema = configs.get("schema")
        extra_hints = configs.get("sql_hints")
        if extra_hints:
            merged_hints.update(extra_hints)

    client = self.get_odps_client()
    client.execute_sql(sql=sql, hints=merged_hints, default_schema=schema)
21 changes: 21 additions & 0 deletions dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,27 @@ def remove_comments(input_string):
# operation = remove_comments(operation)
parameters = param_normalization(parameters)
operation = replace_sql_placeholders(operation, parameters)
def parse_settings(sql):
    """Collect the leading ``SET key=value;`` statements of a SQL script.

    The text is scanned from the start, one ``;``-terminated statement at a
    time, and scanning stops at the first statement that is not a ``SET``
    assignment. Text after the last ``;`` is never inspected, so a trailing
    ``SET`` without a terminating semicolon is not collected.

    Args:
        sql: SQL text that may begin with ``SET`` statements.

    Returns:
        A dict mapping each stripped setting name to its stripped value. If
        the same name is set more than once, the last value wins.
    """
    properties = {}
    index = 0

    while True:
        end = sql.find(';', index)
        if end == -1:
            break
        s = sql[index:end]
        if not re.match(r'(?i)^\s*SET\s+.*=.*?\s*$', s):
            # Settings are only recognised as a prefix of the script.
            break
        # Drop the SET keyword; the regex guarantees only whitespace
        # precedes it, so the first 'set' found is the keyword itself.
        i = s.lower().find('set')
        # Split on the FIRST '=' only so values that themselves contain '='
        # (e.g. ``SET k=a=b``) are kept intact instead of being truncated.
        key, _, value = s[i + 3:].partition('=')
        properties[key.strip()] = value.strip()
        index = end + 1

    return properties

# retry ten times, each time wait for 10 seconds
retry_times = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,62 +26,46 @@
{% macro mc_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{{ mc_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists) }}
{% else %} {# dynamic #}
{{ mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
{% endif %}
{% endmacro %}

{% macro mc_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% set predicate -%}
{{ partition_by.render(False) }} in ({{ partitions | join (', ') }})
{%- endset %}

{%- set source_sql -%}
(
{% if tmp_relation_exists -%}
select * from {{ tmp_relation }}
{%- else -%}
{{sql}}
{%- endif %}
)
{%- endset -%}

{%- call statement('main') -%}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{%- endcall -%}

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}
{% endmacro %}

{% macro mc_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) %}
{% set predicate -%}
{{partition_by.render(False)}} in (select distinct {{partition_by.render(False)}} from {{ tmp_relation }})
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}
{% if not tmp_relation_exists %}
{%- call statement('create_tmp_relation') -%}
{{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by) }}
{%- endcall -%}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}
-- 3. run the merge statement
{%- call statement('main') -%}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% if partitions is not none and partitions != [] %} {# static #}
{{ mc_static_insert_overwrite_merge_sql(target_relation, tmp_relation, partition_by, partitions) }}
{% else %} {# dynamic #}
{{ mc_dynamic_insert_overwrite_sql(target_relation, tmp_relation, partition_by) }}
{% endif %}
{%- endcall -%}
-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
{% endmacro %}

{#
  Renders the static-partition overwrite of `target` from `source`.

  Steps, in order:
    1. Reads `sql_header` from the model config and emits it only when it is
       not none and `include_sql_header` is truthy.
    2. Issues a separate statement named 'drop_static_partition' that deletes
       the rows of `target` whose partition expression
       (`partition_by.render(False)`) is in the `partitions` list.
    3. Emits an INSERT OVERWRITE ... PARTITION(...) that selects the rows of
       `source` whose partition expression is in the same `partitions` list.

  NOTE(review): `include_sql_header` is referenced below but is neither a
  parameter of this macro nor assigned inside it - confirm where it is
  expected to come from; if it resolves to an undefined/falsy value the
  sql_header is never emitted.
#}
{% macro mc_static_insert_overwrite_merge_sql(target, source, partition_by, partitions) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none and include_sql_header }}

{%- call statement('drop_static_partition') -%}
DELETE FROM {{ target }}
WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }})
{%- endcall -%}

INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }})
(
SELECT *
FROM {{ source }}
WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }})
)
{% endmacro %}

{#
  Renders the dynamic-partition overwrite of `target` from `source`.

  Reads `sql_header` from the model config and emits it only when it is not
  none and `include_sql_header` is truthy, then emits an
  INSERT OVERWRITE ... PARTITION(...) on the partition expression
  (`partition_by.render(False)`) that selects every row of `source`.

  NOTE(review): `include_sql_header` is referenced below but is neither a
  parameter of this macro nor assigned inside it - confirm where it is
  expected to come from; if it resolves to an undefined/falsy value the
  sql_header is never emitted.
#}
{% macro mc_dynamic_insert_overwrite_sql(target, source, partition_by) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none and include_sql_header }}
INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }})
(
SELECT *
FROM {{ source }}
)
{% endmacro %}
2 changes: 1 addition & 1 deletion dbt/include/maxcompute/macros/materializations/raw.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% materialization raw, adapter='maxcompute' -%}
{{ adapter.run_raw_sql(sql, config) }}
{% call statement("main") %}
{{ sql }}
{% endcall %}
{{ return({'relations': []}) }}
{%- endmaterialization %}

0 comments on commit cf2edbe

Please sign in to comment.