Skip to content

Commit

Permalink
Feature: 增强类型系统,更新约束支持矩阵,增量策略新增多种选项,修复多项逻辑,并通过官方提供的 58 个测试。 (#2)
Browse files Browse the repository at this point in the history
* refactor(maxcompute): 更新版本号并优化表类型判断逻辑

- 将版本号从 1.8.0-alpha8 修改为 1.8.0-dev
- 优化表类型判断逻辑,增加对虚拟视图的处理
- 添加 MaxCompute 特定的文档生成测试用例

* feat(maxcompute): 支持增量同步和数据类型转换

* feat(maxcompute): 优化数据类型映射并添加列类型测试

- 在 MaxComputeColumn 类中添加对更多数据类型的映射,包括 REAL 和各种整数类型
- 实现 is_integer 和 is_numeric 方法,以支持更多数值类型
- 在 tests/functional/adapter 目录下添加 test_column_types.py 文件,实现对 MaxCompute 列类型的测试

* feat(maxcompute): 实现 get_relation 方法并添加测试用例

- 在 MaxComputeAdapter 类中实现 get_relation 方法,用于获取关系对象
- 添加测试用例文件 test_concurrency.py 和 test_relations.py,验证 MaxCompute 适配器功能
- 优化 drop_schema 日志输出,使用 database 替代 project
- 修复 execute_sql 方法,仅执行非空 SQL 语句

* test:为 MaxCompute 添加 alias 测试

为 MaxCompute 实现的别名测试添加了必要的 SQL 宏定义。这些宏包括:

- string_literal:用于处理字符串字面量的通用宏
- expect_value:用于断言查询结果的宏

这些宏在测试中将帮助处理特定的数据库适配器行为和期望值比较。此外,还为每个测试类添加了 macros 作为 pytest 的 class scope fixture。

* test:为 MaxCompute 添加 dbt-debug 测试

* test:为 MaxCompute 添加 cache 测试

* test:为 MaxCompute 添加 TestIncrementalOnSchemaChange 测试

* feat(maxcompute): 实现 delta表支持

- 新增 create_table_as_internal 宏,支持创建带主键的事务表(transactional table)
- 实现 incremental 物化(materialization),包括 merge 和 delete+insert 策略
- 添加 delta 表相关的测试用例
- 优化快照功能,使用新的 create_table_as_internal 宏
- 调整约束测试,使其适用于 maxcompute

* checkstyle

* fix(maxcompute): 修复数据类型和日期处理函数的大小写问题

- 在 array_construct 宏中将 data_type 转为小写
- 在 date_trunc、dateadd、datediff 和 last_day 宏中将 datepart 转为小写
- 修复文档生成测试中的数据类型期望值
  • Loading branch information
dingxin-tech authored Nov 28, 2024
1 parent fe33701 commit 09b1afa
Show file tree
Hide file tree
Showing 30 changed files with 1,132 additions and 99 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/maxcompute/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0-alpha8"
version = "1.8.0-dev"
49 changes: 41 additions & 8 deletions dbt/adapters/maxcompute/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ class MaxComputeColumn(Column):
comment: str = ""

TYPE_LABELS = {
"TEXT": "string",
"TEXT": "STRING",
"INTEGER": "INT",
"BOOL": "BOOLEAN",
"NUMERIC": "DECIMAL",
"REAL": "FLOAT",
}

@property
Expand All @@ -26,19 +30,48 @@ def literal(self, value):

@classmethod
def numeric_type(cls, dtype: str, precision: Any, scale: Any) -> str:
# MaxCompute makes life much harder if precision + scale are specified
# even if they're fed in here, just return the data type by itself
return dtype
return "DECIMAL({}, {})".format(precision, scale)

def is_string(self) -> bool:
return self.dtype.lower() in [
"string" "text",
lower = self.dtype.lower()
if lower.startswith("char") or lower.startswith("varchar"):
return True
return lower in [
"string",
"text",
"character varying",
"character",
"char" "varchar",
"char",
"varchar",
]

def string_type(cls, size: int) -> str:
def is_integer(self) -> bool:
    """Report whether this column's declared type is an integer type.

    The comparison is case-insensitive and requires the whole type name to
    equal one of the names listed below; a parameterised spelling such as
    ``int(11)`` does not match.
    """
    integer_type_names = (
        # canonical names
        "tinyint",
        "smallint",
        "integer",
        "bigint",
        "smallserial",
        "serial",
        "bigserial",
        # short aliases
        "int",
        "int2",
        "int4",
        "int8",
        "serial2",
        "serial4",
        "serial8",
    )
    normalized = self.dtype.lower()
    return normalized in integer_type_names

def is_numeric(self) -> bool:
    """Report whether this column's declared type is DECIMAL or NUMERIC.

    Matching is case-insensitive and prefix-based, so parameterised forms
    such as ``decimal(38, 18)`` are accepted alongside the bare names.
    """
    # A bare "decimal"/"numeric" is its own prefix, so the prefix test alone
    # covers both the plain and the parameterised spellings.
    return self.dtype.lower().startswith(("decimal", "numeric"))

def string_type(cls, size: int = 0) -> str:
    """Return the type name used for string columns.

    ``size`` is accepted but not used: the result is always ``"string"``.
    """
    type_name = "string"
    return type_name

def can_expand_to(self: Self, other_column: Self) -> bool:
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/maxcompute/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
"odps.sql.submit.mode": "script",
"odps.sql.allow.cartesian": "true",
"odps.sql.timezone": "GMT",
"odps.sql.allow.schema.evolution": "true",
}
72 changes: 51 additions & 21 deletions dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import AttrDict
from odps import ODPS
from odps.errors import ODPSError, NoSuchObject

from dbt.adapters.maxcompute import MaxComputeConnectionManager
Expand Down Expand Up @@ -61,8 +62,8 @@ class MaxComputeAdapter(SQLAdapter):
CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
ConstraintType.unique: ConstraintSupport.NOT_ENFORCED,
ConstraintType.primary_key: ConstraintSupport.NOT_ENFORCED,
ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.primary_key: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
}

Expand All @@ -77,7 +78,7 @@ def __init__(self, config, mp_context: SpawnContext) -> None:
super().__init__(config, mp_context)
self.connections: MaxComputeConnectionManager = self.connections

def get_odps_client(self):
def get_odps_client(self) -> ODPS:
conn = self.connections.get_thread_connection()
return conn.handle.odps

Expand Down Expand Up @@ -109,6 +110,16 @@ def support_namespace_schema(self, project: str):
###
# Implementations of abstract methods
###
def get_relation(
    self, database: str, schema: str, identifier: str
) -> Optional[MaxComputeRelation]:
    """Look up a table through the ODPS client and wrap it as a relation.

    Args:
        database: Passed to ``get_table`` as its second positional argument.
        schema: Passed to ``get_table`` as its third positional argument.
        identifier: Table name, passed to ``get_table`` first.

    Returns:
        ``MaxComputeRelation.from_odps_table(...)`` for the table, or
        ``None`` when ``reload()`` raises ``NoSuchObject``.
    """
    table = self.get_odps_client().get_table(identifier, database, schema)
    # The handle is obtained outside the try; only reload() is guarded, and
    # NoSuchObject from it is treated as "relation does not exist".
    try:
        table.reload()
    except NoSuchObject:
        return None
    return MaxComputeRelation.from_odps_table(table)

@classmethod
def date_function(cls) -> str:
return "current_timestamp()"
Expand All @@ -121,6 +132,8 @@ def drop_relation(self, relation: MaxComputeRelation) -> None:
is_cached = self._schema_is_cached(relation.database, relation.schema)
if is_cached:
self.cache_dropped(relation)
if relation.table is None:
return
self.get_odps_client().delete_table(
relation.identifier, relation.project, True, relation.schema
)
Expand Down Expand Up @@ -177,7 +190,8 @@ def execute_macro(
kwargs=kwargs,
needs_conn=needs_conn,
)
self.connections.execute(str(sql))
if sql is not None and str(sql).strip() != "None":
self.connections.execute(str(sql))
return sql

def create_schema(self, relation: MaxComputeRelation) -> None:
Expand All @@ -196,7 +210,7 @@ def create_schema(self, relation: MaxComputeRelation) -> None:
raise e

def drop_schema(self, relation: MaxComputeRelation) -> None:
logger.debug(f"drop_schema: '{relation.project}.{relation.schema}'")
logger.debug(f"drop_schema: '{relation.database}.{relation.schema}'")

# Although the odps client has a check schema exist method, it will have a considerable delay,
# so that it is impossible to judge how many seconds it should wait.
Expand Down Expand Up @@ -302,18 +316,21 @@ def _get_one_catalog_by_relations(
sql_rows = []

for relation in relations:
odps_table = self.get_odps_table_by_relation(relation)
odps_table = self.get_odps_table_by_relation(relation, 10)
table_database = relation.project
table_schema = relation.schema
table_name = relation.table

if odps_table or odps_table.is_materialized_view:
if not odps_table:
continue

if odps_table.is_virtual_view or odps_table.is_materialized_view:
table_type = "VIEW"
else:
table_type = "TABLE"
table_comment = odps_table.comment
table_owner = odps_table.owner
column_index = 0
column_index = 1
for column in odps_table.table_schema.simple_columns:
column_name = column.name
column_type = column.type.name
Expand Down Expand Up @@ -348,7 +365,8 @@ def convert_text_type(cls, agate_table: "agate.Table", col_idx: int) -> str:

@classmethod
def convert_number_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
return "decimal"
decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
return "decimal" if decimals else "bigint"

@classmethod
def convert_integer_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
Expand Down Expand Up @@ -394,9 +412,16 @@ def string_add_sql(
raise DbtRuntimeError(f'Got an unexpected location value of "{location}"')

def validate_sql(self, sql: str) -> AdapterResponse:
res = self.connections.execute(sql)
validate_sql = "explain " + sql
res = self.connections.execute(validate_sql)
return res[0]

def valid_incremental_strategies(self):
    """Name the built-in incremental strategies this adapter supports.

    Custom strategies defined by end users are not validated against this
    list.

    Returns:
        A new list containing ``append``, ``merge`` and ``delete+insert``,
        in that order.
    """
    builtin_strategies = ("append", "merge", "delete+insert")
    return list(builtin_strategies)

@available.parse_none
def load_dataframe(
self,
Expand All @@ -415,17 +440,22 @@ def load_dataframe(
if isinstance(column_type, agate.data_types.date_time.DateTime):
timestamp_columns.append(agate_table.column_names[i])

print(timestamp_columns)

pd_dataframe = pd.read_csv(
file_path, delimiter=field_delimiter, parse_dates=timestamp_columns
)

self.get_odps_client().write_table(
table_name,
pd_dataframe,
project=database,
schema=schema,
create_table=False,
create_partition=False,
)
# make sure target table exist
for i in range(10):
try:
self.get_odps_client().write_table(
table_name,
pd_dataframe,
project=database,
schema=schema,
create_table=False,
create_partition=False,
)
break
except NoSuchObject:
logger.info(f"Table {database}.{schema}.{table_name} does not exist, retrying...")
time.sleep(10)
continue
3 changes: 3 additions & 0 deletions dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def cursor(self, *args, **kwargs):
**kwargs,
)

def cancel(self):
    """Cancel by closing: delegates to ``self.close()`` and returns ``None``."""
    self.close()
    return None


logger = AdapterLogger("MaxCompute")

Expand Down
63 changes: 1 addition & 62 deletions dbt/include/maxcompute/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,15 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
ALTER TABLE {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% else -%}
ALTER VIEW {{ from_relation.database }}.{{ from_relation.schema }}.{{ from_relation.identifier }}
ALTER VIEW {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% endif -%}
{% endmacro %}

{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%}
ALTER TABLE {{ relation.render() }}
CHANGE {{ column_name }} {{ column_name }} {{ new_column_type }};
{% endmacro %}

{% macro maxcompute__copy_grants() -%}
{{ return(True) }}
{% endmacro %}

/* {# override dbt/include/global_project/macros/relations/table/create.sql #} */
{% macro maxcompute__create_table_as(temporary, relation, sql) -%}
{% set is_transactional = config.get('transactional') -%}

{%- if is_transactional -%}
{{ create_transactional_table_as(temporary, relation, sql) }}

{%- else -%}
CREATE TABLE IF NOT EXISTS {{ relation.render() }}
{% if temporary %}
LIFECYCLE 1
{% endif %}
AS (
{{ sql }}
)
;
{%- endif %}
{% endmacro %}


/* {# override dbt/include/global_project/macros/relations/view/create.sql #} */
{% macro maxcompute__create_view_as(relation, sql) -%}
CREATE OR REPLACE VIEW {{ relation.render() }} AS ({{ sql }});
{% endmacro %}

{% macro create_transactional_table_as(temporary, relation, sql) -%}
{% call statement('create_table', auto_begin=False) -%}
create table {{ relation.render() }}
{{ get_schema_from_query(sql) }}
tblproperties("transactional"="true")
{% if temporary %}
LIFECYCLE 1
{% endif %}
;
{% endcall %}
insert into {{ relation.render() }}
(
{{ sql }}
);
{% endmacro %}

{% macro get_schema_from_query(sql) -%}
(
{% set model_columns = model.columns %}
{% for c in get_column_schema_from_query(sql) -%}
{{ c.name }} {{ c.dtype }}
{% if model_columns and c.name in model_columns -%}
{{ "COMMENT" }} '{{ model_columns[c.name].description }}'
{%- endif %}
{{ "," if not loop.last or raw_model_constraints }}

{% endfor %}
)
{%- endmacro %}


{% macro maxcompute__current_timestamp() -%}
current_timestamp()
{%- endmacro %}
25 changes: 25 additions & 0 deletions dbt/include/maxcompute/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{# Renders an `alter table ... change column` statement for `relation`. The column name is emitted twice (old name, then new name), both quoted through adapter.quote, so the name is kept and only the type becomes `new_column_type`. #}
{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%}
alter table {{ relation.render() }} change column {{ adapter.quote(column_name) }} {{ adapter.quote(column_name) }} {{ new_column_type }};
{% endmacro %}


{#
  Adds and/or drops columns on `relation`, issuing up to two statements through run_query.
  - If `add_columns` is not none and non-empty: `alter <relation.type> <relation> add columns`
    followed by a comma-separated list of `<column.name> <column.data_type>`.
  - If `remove_columns` is not none and non-empty: `alter <relation.type> <relation> drop columns`
    followed by a comma-separated list of column names.
  Column names are emitted as-is (not passed through adapter.quote).
  NOTE(review): the `add columns` list is not wrapped in parentheses; confirm MaxCompute accepts this form.
#}
{% macro maxcompute__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns is not none and add_columns|length > 0%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render() }} add columns
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %};
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% if remove_columns is not none and remove_columns|length > 0%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render() }} drop columns
{% for column in remove_columns %}
{{ column.name }} {{ ',' if not loop.last }}
{% endfor %};
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
Loading

0 comments on commit 09b1afa

Please sign in to comment.