Skip to content

Commit

Permalink
feat(maxcompute): 添加克隆表功能并优化表创建逻辑
Browse files Browse the repository at this point in the history
- 新增克隆表功能,支持在 MaxCompute 中创建表的副本
- 优化表创建逻辑,移除不必要的 if not exists 判断
- 添加更新表注释和列注释的功能
- 新增测试用例以验证克隆表和注释功能
  • Loading branch information
dingxin-tech committed Dec 3, 2024
1 parent 186525a commit e085e98
Show file tree
Hide file tree
Showing 27 changed files with 1,186 additions and 28 deletions.
15 changes: 0 additions & 15 deletions .idea/dbt-maxcompute.iml

This file was deleted.

2 changes: 1 addition & 1 deletion dbt/adapters/maxcompute/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0-alpha.9"
version = "1.8.0-dev"
13 changes: 8 additions & 5 deletions dbt/adapters/maxcompute/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Any

from dbt_common.exceptions import DbtConfigError, DbtRuntimeError
from dbt.adapters.contracts.connection import Credentials, AdapterResponse
Expand Down Expand Up @@ -77,10 +76,6 @@ def get_response(cls, cursor):
logger.debug("Current instance id is " + cursor._instance.id)
return AdapterResponse(_message="OK")

def set_query_header(self, query_header_context: Dict[str, Any]) -> None:
# no query header will better
pass

@contextmanager
def exception_handler(self, sql: str):
try:
Expand All @@ -100,6 +95,14 @@ def exception_handler(self, sql: str):
def cancel(self, connection):
connection.handle.cancel()

def begin(self):
    """Log that a transaction start was requested; nothing else is done here."""
    logger.debug("Trigger beginning transaction, actually do nothing...")

# FIXME: commit is sometimes invoked more often than begin.
# The original note blames "the micro" (presumably a macro -- TODO confirm);
# it can be reproduced with the dbt_show tests.
def commit(self):
    """Log that a commit was requested; nothing else is done here."""
    logger.debug("Committing transaction, actually do nothing...")

def add_begin_query(self):
    """Intentionally do nothing."""

Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def load_dataframe(
pd_dataframe = pd.read_csv(
file_path, delimiter=field_delimiter, parse_dates=timestamp_columns
)
logger.debug(f"Load csv to table {database}.{schema}.{table_name}")
# make sure target table exist
for i in range(10):
try:
Expand Down
7 changes: 4 additions & 3 deletions dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def remove_comments(input_string):
result = re.sub(r"/\*[^+].*?\*/", "", input_string, flags=re.DOTALL)
return result

operation = remove_comments(operation)
# operation = remove_comments(operation)
parameters = param_normalization(parameters)
operation = replace_sql_placeholders(operation, parameters)

# retry three times, each time wait for 10 seconds
retry_times = 3
# retry ten times, each time wait for 10 seconds
retry_times = 10
for i in range(retry_times):
try:
super().execute(operation)
Expand All @@ -78,6 +78,7 @@ def remove_comments(input_string):
or e.code == "ODPS-0110061"
or e.code == "ODPS-0130131"
or e.code == "ODPS-0420111"
or e.code == "ODPS-0130071"
):
if i == retry_times - 1:
raise e
Expand Down
16 changes: 16 additions & 0 deletions dbt/include/maxcompute/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{#
  Emit one "alter table ... change column ... comment ..." statement for every
  key of column_dict that is also the name of a column currently reported by
  adapter.get_columns_in_relation(relation). Keys that do not match an existing
  column are skipped.
#}
{% macro maxcompute__alter_column_comment(relation, column_dict) %}
  {% set relation_column_names = adapter.get_columns_in_relation(relation) | map(attribute="name") | list %}
  {% for column_name in column_dict if (column_name in relation_column_names) %}
    {% set quoted_description = quote_and_escape(column_dict[column_name]['description']) %}
    alter table {{ relation.render() }} change column {{ column_name }} comment {{ quoted_description }};
  {% endfor %}
{% endmacro %}

{#
  Set the comment of a table relation. For any relation that is not a table a
  compiler error is raised instead of emitting SQL.
#}
{% macro maxcompute__alter_relation_comment(relation, relation_comment) -%}
  {% if not relation.is_table -%}
    {{ exceptions.raise_compiler_error("MaxCompute not support set comment to view") }}
  {% else -%}
    alter table {{ relation.render() }} set comment {{ quote_and_escape(relation_comment) }};
  {% endif -%}
{% endmacro %}
8 changes: 8 additions & 0 deletions dbt/include/maxcompute/macros/materializations/clone.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{# Always returns True. #}
{% macro maxcompute__can_clone_table() %}
  {% do return(True) %}
{% endmacro %}


{#
  Create (or overwrite) this_relation as a clone of defer_relation.

  this_relation  - the relation to create in the current target
  defer_relation - the existing relation to copy from

  MaxCompute syntax is: clone table SOURCE to DESTINATION [if exists overwrite]
  so the deferred relation must come first. With the arguments the other way
  round the statement would try to overwrite the deferred relation with a
  relation that may not exist yet.
#}
{% macro maxcompute__create_or_replace_clone(this_relation, defer_relation) %}
  clone table {{ defer_relation.render() }} to {{ this_relation.render() }} if exists overwrite;
{% endmacro %}
10 changes: 10 additions & 0 deletions dbt/include/maxcompute/macros/materializations/hooks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{#
  Run every hook whose transaction attribute equals inside_transaction.
  Each hook's sql is rendered and trimmed; hooks that render to an empty
  string are skipped, the others are executed through a statement call.
#}
{% macro run_hooks(hooks, inside_transaction=True) %}
  {% set selected_hooks = hooks | selectattr('transaction', 'equalto', inside_transaction) %}
  {% for hook in selected_hooks %}
    {% set hook_sql = render(hook.get('sql')) | trim %}
    {% if hook_sql %}
      {% call statement(auto_begin=inside_transaction) %}
        {{ hook_sql }}
      {% endcall %}
    {% endif %}
  {% endfor %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
{%- endmacro %}



{% macro build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set temp_relation = make_temp_relation(target_relation) %}

Expand Down
9 changes: 7 additions & 2 deletions dbt/include/maxcompute/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%}

{% call statement('create_table', auto_begin=False) -%}
create table if not exists {{ relation.render() }} (
create table {{ relation.render() }} (
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced and (not temporary) %}
{{ get_assert_columns_equivalent(sql) }}
Expand Down Expand Up @@ -57,12 +57,17 @@
{{ c.name }} {{ c.dtype }}
{% if primary_keys and c.name in primary_keys -%}not null{%- endif %}
{% if model_columns and c.name in model_columns -%}
{{ "COMMENT" }} '{{ model_columns[c.name].description }}'
{{ "COMMENT" }} {{ quote_and_escape(model_columns[c.name].description) }}
{%- endif %}
{{ "," if not loop.last }}
{% endfor %}
{%- endmacro %}

{#
  Render input_string as a single-quoted SQL string literal.

  Backslashes are doubled first and single quotes are backslash-escaped
  afterwards. Doing it in this order keeps a backslash that is already in the
  text (for example a trailing one, or one directly before a quote) from
  swallowing the closing quote or cancelling the escape added here.
#}
{% macro quote_and_escape(input_string) %}
  {% set escaped_string = input_string | replace("\\", "\\\\") | replace("'", "\\'") %}
  '{{ escaped_string }}'
{% endmacro %}

-- Compared to get_table_columns_and_constraints, only the surrounding brackets are deleted
{% macro get_table_columns_and_constraints_without_brackets() -%}
{# loop through user_provided_columns to create DDL with data types and constraints #}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/maxcompute/macros/relations/view/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}
create or replace view {{ relation.render() }}
create view {{ relation.render() }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
Expand Down
16 changes: 16 additions & 0 deletions tests/functional/adapter/data/seed_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Recreate the on_model_hook table from scratch, drop first and then create.
-- The schema name is a placeholder, presumably substituted by the test harness (TODO confirm).
drop table if exists {schema}.on_model_hook;

create table {schema}.on_model_hook (
test_state string, -- start|end
target_dbname string,
target_host string,
target_name string,
target_schema string,
target_type string,
target_user string,
target_pass string,
target_threads int,
run_started_at string,
invocation_id string,
thread_id string
);
16 changes: 16 additions & 0 deletions tests/functional/adapter/data/seed_run.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Recreate the on_run_hook table from scratch, drop first and then create.
-- The schema name is a placeholder, presumably substituted by the test harness (TODO confirm).
drop table if exists {schema}.on_run_hook;

create table {schema}.on_run_hook (
test_state string, -- start|end
target_dbname string,
target_host string,
target_name string,
target_schema string,
target_type string,
target_user string,
target_pass string,
target_threads int,
run_started_at string,
invocation_id string,
thread_id string
);
25 changes: 25 additions & 0 deletions tests/functional/adapter/test_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.tests.adapter.catalog.relation_types import CatalogRelationTypes


class TestCatalogAdapter(CatalogRelationTypes):
    """Run the shared catalog relation-type checks with one table and one view model."""

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        """Provide the model files used by the catalog test."""
        from dbt.tests.adapter.catalog import files

        model_files = {
            "my_table.sql": files.MY_TABLE,
            "my_view.sql": files.MY_VIEW,
        }
        yield model_files

    @pytest.mark.parametrize(
        "node_name,relation_type",
        [
            ("seed.test.my_seed", "TABLE"),
            ("model.test.my_table", "TABLE"),
            ("model.test.my_view", "VIEW"),
        ],
    )
    def test_relation_types_populate_correctly(
        self, docs: CatalogArtifact, node_name: str, relation_type: str
    ):
        """Delegate to the base-class check with this adapter's expected relation types."""
        super().test_relation_types_populate_correctly(docs, node_name, relation_type)
27 changes: 27 additions & 0 deletions tests/functional/adapter/test_dbt_clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pytest
from dbt.tests.adapter.dbt_clone.test_dbt_clone import (
BaseClonePossible,
BaseCloneSameTargetAndState,
)


class TestMaxComputeClonePossible(BaseClonePossible):
    """Run the shared clone tests and drop the schemas they used afterwards."""

    @pytest.fixture(autouse=True)
    def clean_up(self, project):
        """After each test, drop the "<test_schema>_seeds" schema and then the test schema."""
        yield
        adapter = project.adapter
        with adapter.connection_named("__test"):
            # Same order as before: seeds schema first, then the main test schema.
            for schema_name in (f"{project.test_schema}_seeds", project.test_schema):
                schema_relation = adapter.Relation.create(
                    database=project.database, schema=schema_name
                )
                adapter.drop_schema(schema_relation)


class TestCloneSameTargetAndState(BaseCloneSameTargetAndState):
    """Run the inherited BaseCloneSameTargetAndState tests without changes."""
17 changes: 17 additions & 0 deletions tests/functional/adapter/test_dbt_show.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dbt.tests.adapter.dbt_show.test_dbt_show import (
BaseShowSqlHeader,
BaseShowLimit,
BaseShowDoesNotHandleDoubleLimit,
)


# NOTE(review): the class name mentions Postgres although this file belongs to
# the MaxCompute adapter tests; the name is kept so test ids stay the same.
class TestPostgresShowSqlHeader(BaseShowSqlHeader):
    """Run the inherited BaseShowSqlHeader tests without changes."""


# NOTE(review): the class name mentions Postgres although this file belongs to
# the MaxCompute adapter tests; the name is kept so test ids stay the same.
class TestPostgresShowLimit(BaseShowLimit):
    """Run the inherited BaseShowLimit tests without changes."""


class TestShowDoesNotHandleDoubleLimit(BaseShowDoesNotHandleDoubleLimit):
    """Run the inherited BaseShowDoesNotHandleDoubleLimit tests without changes."""
9 changes: 9 additions & 0 deletions tests/functional/adapter/test_empty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dbt.tests.adapter.empty.test_empty import BaseTestEmpty, BaseTestEmptyInlineSourceRef


class TestEmpty(BaseTestEmpty):
    """Run the inherited BaseTestEmpty tests without changes."""


class TestEmptyInlineSourceRef(BaseTestEmptyInlineSourceRef):
    """Run the inherited BaseTestEmptyInlineSourceRef tests without changes."""
74 changes: 74 additions & 0 deletions tests/functional/adapter/test_ephemeral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from dbt.tests.adapter.ephemeral.test_ephemeral import (
BaseEphemeral,
BaseEphemeralMulti,
BaseEphemeralNested,
BaseEphemeralErrorHandling,
)
import os
import re
from dbt.tests.util import check_relations_equal, run_dbt


# The expected SQL is built from the project's configured database instead of a
# hard-coded project name, so the test is not tied to one MaxCompute project.
class TestEphemeralMulti(BaseEphemeralMulti):
    """Ephemeral-model test whose expected SQL uses backtick-quoted relation names."""

    def test_ephemeral_multi(self, project):
        """Seed, run, compare relations, then check the SQL written for double_dependent."""
        run_dbt(["seed"])
        results = run_dbt(["run"])
        assert len(results) == 3

        check_relations_equal(project.adapter, ["seed", "dependent"])
        check_relations_equal(project.adapter, ["seed", "double_dependent"])
        check_relations_equal(project.adapter, ["seed", "super_dependent"])

        compiled_path = "./target/run/test/models/double_dependent.sql"
        assert os.path.exists(compiled_path)
        with open(compiled_path, "r") as fp:
            sql_file = fp.read()

        expected_sql = (
            f"create view `{project.database}`.`test_test_ephemeral`.`double_dependent__dbt_tmp` as ("
            "with __dbt__cte__base as ("
            "select * from test_test_ephemeral.seed"
            "), __dbt__cte__base_copy as ("
            "select * from __dbt__cte__base"
            ")-- base_copy just pulls from base. Make sure the listed"
            "-- graph of CTEs all share the same dbt_cte__base cte"
            "select * from __dbt__cte__base where gender = 'Male'"
            "union all"
            "select * from __dbt__cte__base_copy where gender = 'Female'"
            ");"
        )

        # Digit runs are removed from the generated SQL (presumably because the
        # schema name carries a run-specific number -- TODO confirm). The same
        # is done to the expected text so a database name containing digits
        # still matches. Whitespace is ignored on both sides.
        sql_file = "".join(re.sub(r"\d+", "", sql_file).split())
        expected_sql = "".join(re.sub(r"\d+", "", expected_sql).split())
        assert sql_file == expected_sql


class TestEphemeralNested(BaseEphemeralNested):
    """Nested ephemeral-model test whose expected SQL uses backtick-quoted relation names."""

    def test_ephemeral_nested(self, project):
        """Run the models and check the SQL written for root_view.

        The database part of the expected SQL comes from ``project.database``
        rather than a hard-coded project name, so the test is not tied to one
        MaxCompute project.
        """
        results = run_dbt(["run"])
        assert len(results) == 2

        compiled_path = "./target/run/test/models/root_view.sql"
        assert os.path.exists(compiled_path)
        with open(compiled_path, "r") as fp:
            sql_file = fp.read()

        database = project.database
        expected_sql = (
            f"create view `{database}`.`test_test_ephemeral`.`root_view__dbt_tmp` as ("
            "with __dbt__cte__ephemeral_level_two as ("
            f"select * from `{database}`.`test_test_ephemeral`.`source_table`"
            "), __dbt__cte__ephemeral as ("
            "select * from __dbt__cte__ephemeral_level_two"
            ")select * from __dbt__cte__ephemeral"
            ");"
        )

        # Digit runs are removed from the generated SQL (presumably because the
        # schema name carries a run-specific number -- TODO confirm). The same
        # is done to the expected text so a database name containing digits
        # still matches. Whitespace is ignored on both sides.
        sql_file = "".join(re.sub(r"\d+", "", sql_file).split())
        expected_sql = "".join(re.sub(r"\d+", "", expected_sql).split())
        assert sql_file == expected_sql


class TestEphemeralErrorHandling(BaseEphemeralErrorHandling):
    """Run the inherited BaseEphemeralErrorHandling tests without changes."""
9 changes: 9 additions & 0 deletions tests/functional/adapter/test_grants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import pytest
from dbt.tests.adapter.grants.test_seed_grants import BaseSeedGrants


@pytest.mark.skip(
    reason="Please use webconsole/pyodps for permission operations. Dbt will not perform any modification operations."
)
class TestSeedGrants(BaseSeedGrants):
    """Inherited seed-grant tests; the whole class is skipped (see the skip reason)."""
Loading

0 comments on commit e085e98

Please sign in to comment.