Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: 增强类型系统,更新约束支持矩阵,增量策略新增多种选项,修复多项逻辑,并通过官方提供的 58 个测试。 #2

Merged
merged 11 commits into from
Nov 28, 2024
Merged
2 changes: 1 addition & 1 deletion dbt/adapters/maxcompute/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0-alpha8"
version = "1.8.0-dev"
49 changes: 41 additions & 8 deletions dbt/adapters/maxcompute/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ class MaxComputeColumn(Column):
comment: str = ""

TYPE_LABELS = {
"TEXT": "string",
"TEXT": "STRING",
"INTEGER": "INT",
"BOOL": "BOOLEAN",
"NUMERIC": "DECIMAL",
"REAL": "FLOAT",
}

@property
Expand All @@ -26,19 +30,48 @@ def literal(self, value):

@classmethod
def numeric_type(cls, dtype: str, precision: Any, scale: Any) -> str:
# MaxCompute makes life much harder if precision + scale are specified
# even if they're fed in here, just return the data type by itself
return dtype
return "DECIMAL({}, {})".format(precision, scale)

def is_string(self) -> bool:
return self.dtype.lower() in [
"string" "text",
lower = self.dtype.lower()
if lower.startswith("char") or lower.startswith("varchar"):
return True
return lower in [
"string",
"text",
"character varying",
"character",
"char" "varchar",
"char",
"varchar",
]

def string_type(cls, size: int) -> str:
def is_integer(self) -> bool:
    """Report whether this column's dtype names an integer type.

    The dtype is lower-cased and must match one of the recognised type
    names or aliases exactly; parameterised or otherwise decorated type
    strings do not match.
    """
    integer_type_names = (
        # real types
        "tinyint",
        "smallint",
        "integer",
        "bigint",
        "smallserial",
        "serial",
        "bigserial",
        # aliases
        "int",
        "int2",
        "int4",
        "int8",
        "serial2",
        "serial4",
        "serial8",
    )
    normalized = self.dtype.lower()
    return normalized in integer_type_names

def is_numeric(self) -> bool:
    """Report whether this column's dtype is a decimal/numeric type.

    The match is case-insensitive and prefix-based, so parameterised forms
    such as ``decimal(38, 18)`` are treated as numeric alongside the bare
    ``decimal`` and ``numeric`` names.
    """
    # A single prefix test already covers the bare names, so the separate
    # exact-membership check that used to follow could never add a match.
    return self.dtype.lower().startswith(("decimal", "numeric"))

@classmethod
def string_type(cls, size: int = 0) -> str:
    """Return the SQL type name used for string columns.

    Declared as a classmethod so that calls made on the class bind ``size``
    to the argument rather than to ``cls``; calls made on an instance keep
    working unchanged.

    Args:
        size: Requested string length. Accepted for interface
            compatibility but ignored, since the returned type is unsized.

    Returns:
        The literal type name ``"string"``.
    """
    return "string"

def can_expand_to(self: Self, other_column: Self) -> bool:
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/maxcompute/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
"odps.sql.submit.mode": "script",
"odps.sql.allow.cartesian": "true",
"odps.sql.timezone": "GMT",
"odps.sql.allow.schema.evolution": "true",
}
72 changes: 51 additions & 21 deletions dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import AttrDict
from odps import ODPS
from odps.errors import ODPSError, NoSuchObject

from dbt.adapters.maxcompute import MaxComputeConnectionManager
Expand Down Expand Up @@ -61,8 +62,8 @@ class MaxComputeAdapter(SQLAdapter):
CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
ConstraintType.unique: ConstraintSupport.NOT_ENFORCED,
ConstraintType.primary_key: ConstraintSupport.NOT_ENFORCED,
ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.primary_key: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
}

Expand All @@ -77,7 +78,7 @@ def __init__(self, config, mp_context: SpawnContext) -> None:
super().__init__(config, mp_context)
self.connections: MaxComputeConnectionManager = self.connections

def get_odps_client(self):
def get_odps_client(self) -> ODPS:
    """Return the ``odps`` attribute of the current thread's connection handle."""
    # The connection is looked up per thread through the connection manager.
    conn = self.connections.get_thread_connection()
    return conn.handle.odps

Expand Down Expand Up @@ -109,6 +110,16 @@ def support_namespace_schema(self, project: str):
###
# Implementations of abstract methods
###
def get_relation(
    self, database: str, schema: str, identifier: str
) -> Optional[MaxComputeRelation]:
    """Look up a table through the ODPS client and wrap it as a relation.

    Args:
        database: Passed to the client as the second ``get_table`` argument.
        schema: Passed to the client as the third ``get_table`` argument.
        identifier: Table name, passed as the first ``get_table`` argument.

    Returns:
        A ``MaxComputeRelation`` built from the table handle, or ``None``
        when reloading the handle raises ``NoSuchObject``.
    """
    client = self.get_odps_client()
    odps_table = client.get_table(identifier, database, schema)
    try:
        # Reloading the handle is what signals a missing table here.
        odps_table.reload()
    except NoSuchObject:
        return None
    else:
        return MaxComputeRelation.from_odps_table(odps_table)

@classmethod
def date_function(cls) -> str:
    """Return the SQL expression text used for the current timestamp."""
    return "current_timestamp()"
Expand All @@ -121,6 +132,8 @@ def drop_relation(self, relation: MaxComputeRelation) -> None:
is_cached = self._schema_is_cached(relation.database, relation.schema)
if is_cached:
self.cache_dropped(relation)
if relation.table is None:
return
self.get_odps_client().delete_table(
relation.identifier, relation.project, True, relation.schema
)
Expand Down Expand Up @@ -177,7 +190,8 @@ def execute_macro(
kwargs=kwargs,
needs_conn=needs_conn,
)
self.connections.execute(str(sql))
if sql is not None and str(sql).strip() != "None":
self.connections.execute(str(sql))
return sql

def create_schema(self, relation: MaxComputeRelation) -> None:
Expand All @@ -196,7 +210,7 @@ def create_schema(self, relation: MaxComputeRelation) -> None:
raise e

def drop_schema(self, relation: MaxComputeRelation) -> None:
logger.debug(f"drop_schema: '{relation.project}.{relation.schema}'")
logger.debug(f"drop_schema: '{relation.database}.{relation.schema}'")

# Although the odps client has a check schema exist method, it will have a considerable delay,
# so that it is impossible to judge how many seconds it should wait.
Expand Down Expand Up @@ -302,18 +316,21 @@ def _get_one_catalog_by_relations(
sql_rows = []

for relation in relations:
odps_table = self.get_odps_table_by_relation(relation)
odps_table = self.get_odps_table_by_relation(relation, 10)
table_database = relation.project
table_schema = relation.schema
table_name = relation.table

if odps_table or odps_table.is_materialized_view:
if not odps_table:
continue

if odps_table.is_virtual_view or odps_table.is_materialized_view:
table_type = "VIEW"
else:
table_type = "TABLE"
table_comment = odps_table.comment
table_owner = odps_table.owner
column_index = 0
column_index = 1
for column in odps_table.table_schema.simple_columns:
column_name = column.name
column_type = column.type.name
Expand Down Expand Up @@ -348,7 +365,8 @@ def convert_text_type(cls, agate_table: "agate.Table", col_idx: int) -> str:

@classmethod
def convert_number_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
return "decimal"
decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
return "decimal" if decimals else "bigint"

@classmethod
def convert_integer_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
Expand Down Expand Up @@ -394,9 +412,16 @@ def string_add_sql(
raise DbtRuntimeError(f'Got an unexpected location value of "{location}"')

def validate_sql(self, sql: str) -> AdapterResponse:
res = self.connections.execute(sql)
validate_sql = "explain " + sql
res = self.connections.execute(validate_sql)
return res[0]

def valid_incremental_strategies(self):
    """List the standard built-in incremental strategies this adapter supports.

    The list is not used to validate custom strategies defined by end users.
    """
    builtin_strategies = ["append", "merge", "delete+insert"]
    return builtin_strategies

@available.parse_none
def load_dataframe(
self,
Expand All @@ -415,17 +440,22 @@ def load_dataframe(
if isinstance(column_type, agate.data_types.date_time.DateTime):
timestamp_columns.append(agate_table.column_names[i])

print(timestamp_columns)

pd_dataframe = pd.read_csv(
file_path, delimiter=field_delimiter, parse_dates=timestamp_columns
)

self.get_odps_client().write_table(
table_name,
pd_dataframe,
project=database,
schema=schema,
create_table=False,
create_partition=False,
)
# make sure target table exist
for i in range(10):
try:
self.get_odps_client().write_table(
table_name,
pd_dataframe,
project=database,
schema=schema,
create_table=False,
create_partition=False,
)
break
except NoSuchObject:
logger.info(f"Table {database}.{schema}.{table_name} does not exist, retrying...")
time.sleep(10)
continue
3 changes: 3 additions & 0 deletions dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def cursor(self, *args, **kwargs):
**kwargs,
)

def cancel(self):
    """Cancel by closing this object; no separate cancellation step is performed."""
    self.close()


logger = AdapterLogger("MaxCompute")

Expand Down
63 changes: 1 addition & 62 deletions dbt/include/maxcompute/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,15 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
ALTER TABLE {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% else -%}
ALTER VIEW {{ from_relation.database }}.{{ from_relation.schema }}.{{ from_relation.identifier }}
ALTER VIEW {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% endif -%}
{% endmacro %}

{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%}
ALTER TABLE {{ relation.render() }}
CHANGE {{ column_name }} {{ column_name }} {{ new_column_type }};
{% endmacro %}

{% macro maxcompute__copy_grants() -%}
{{ return(True) }}
{% endmacro %}

/* {# override dbt/include/global_project/macros/relations/table/create.sql #} */
{% macro maxcompute__create_table_as(temporary, relation, sql) -%}
{% set is_transactional = config.get('transactional') -%}

{%- if is_transactional -%}
{{ create_transactional_table_as(temporary, relation, sql) }}

{%- else -%}
CREATE TABLE IF NOT EXISTS {{ relation.render() }}
{% if temporary %}
LIFECYCLE 1
{% endif %}
AS (
{{ sql }}
)
;
{%- endif %}
{% endmacro %}


/* {# override dbt/include/global_project/macros/relations/view/create.sql #} */
{% macro maxcompute__create_view_as(relation, sql) -%}
CREATE OR REPLACE VIEW {{ relation.render() }} AS ({{ sql }});
{% endmacro %}

{% macro create_transactional_table_as(temporary, relation, sql) -%}
{% call statement('create_table', auto_begin=False) -%}
create table {{ relation.render() }}
{{ get_schema_from_query(sql) }}
tblproperties("transactional"="true")
{% if temporary %}
LIFECYCLE 1
{% endif %}
;
{% endcall %}
insert into {{ relation.render() }}
(
{{ sql }}
);
{% endmacro %}

{% macro get_schema_from_query(sql) -%}
(
{% set model_columns = model.columns %}
{% for c in get_column_schema_from_query(sql) -%}
{{ c.name }} {{ c.dtype }}
{% if model_columns and c.name in model_columns -%}
{{ "COMMENT" }} '{{ model_columns[c.name].description }}'
{%- endif %}
{{ "," if not loop.last or raw_model_constraints }}

{% endfor %}
)
{%- endmacro %}


{% macro maxcompute__current_timestamp() -%}
current_timestamp()
{%- endmacro %}
25 changes: 25 additions & 0 deletions dbt/include/maxcompute/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{# Render an `alter table ... change column` statement for `relation` that keeps the column's name (emitted twice, each time through adapter.quote) and sets its type to `new_column_type`. #}
{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%}
alter table {{ relation.render() }} change column {{ adapter.quote(column_name) }} {{ adapter.quote(column_name) }} {{ new_column_type }};
{% endmacro %}


{#
  Add and/or drop columns on `relation`.
  - When `add_columns` is not none and non-empty, builds one
    `alter <relation.type> <relation> add columns <name> <data_type>, ...;`
    statement and executes it with run_query.
  - When `remove_columns` is not none and non-empty, builds one
    `alter <relation.type> <relation> drop columns <name>, ...;` statement and
    executes it with run_query.
  The add statement, if any, runs before the drop statement.
  Column names are emitted as-is (column.name), without adapter.quote.
  NOTE(review): confirm the rendered `add columns` form matches the target
  engine's grammar for multi-column adds.
#}
{% macro maxcompute__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns is not none and add_columns|length > 0%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render() }} add columns
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %};
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% if remove_columns is not none and remove_columns|length > 0%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render() }} drop columns
{% for column in remove_columns %}
{{ column.name }} {{ ',' if not loop.last }}
{% endfor %};
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
Loading