diff --git a/dbt/adapters/maxcompute/__version__.py b/dbt/adapters/maxcompute/__version__.py index 62a1011..72ebff9 100644 --- a/dbt/adapters/maxcompute/__version__.py +++ b/dbt/adapters/maxcompute/__version__.py @@ -1 +1 @@ -version = "1.8.0-alpha8" +version = "1.8.0-dev" diff --git a/dbt/adapters/maxcompute/column.py b/dbt/adapters/maxcompute/column.py index 9425c3d..782ea24 100644 --- a/dbt/adapters/maxcompute/column.py +++ b/dbt/adapters/maxcompute/column.py @@ -14,7 +14,11 @@ class MaxComputeColumn(Column): comment: str = "" TYPE_LABELS = { - "TEXT": "string", + "TEXT": "STRING", + "INTEGER": "INT", + "BOOL": "BOOLEAN", + "NUMERIC": "DECIMAL", + "REAL": "FLOAT", } @property @@ -26,19 +30,48 @@ def literal(self, value): @classmethod def numeric_type(cls, dtype: str, precision: Any, scale: Any) -> str: - # MaxCompute makes life much harder if precision + scale are specified - # even if they're fed in here, just return the data type by itself - return dtype + return "DECIMAL({}, {})".format(precision, scale) def is_string(self) -> bool: - return self.dtype.lower() in [ - "string" "text", + lower = self.dtype.lower() + if lower.startswith("char") or lower.startswith("varchar"): + return True + return lower in [ + "string", + "text", "character varying", "character", - "char" "varchar", + "char", + "varchar", ] - def string_type(cls, size: int) -> str: + def is_integer(self) -> bool: + return self.dtype.lower() in [ + # real types + "tinyint", + "smallint", + "integer", + "bigint", + "smallserial", + "serial", + "bigserial", + # aliases + "int", + "int2", + "int4", + "int8", + "serial2", + "serial4", + "serial8", + ] + + def is_numeric(self) -> bool: + lower = self.dtype.lower() + if lower.startswith("decimal") or lower.startswith("numeric"): + return True + return lower in ["numeric", "decimal"] + + def string_type(cls, size: int = 0) -> str: return "string" def can_expand_to(self: Self, other_column: Self) -> bool: diff --git a/dbt/adapters/maxcompute/context.py b/dbt/adapters/maxcompute/context.py index c5d8c93..9452cb8 100644 --- a/dbt/adapters/maxcompute/context.py +++ b/dbt/adapters/maxcompute/context.py @@ -4,4 +4,5 @@ "odps.sql.submit.mode": "script", "odps.sql.allow.cartesian": "true", "odps.sql.timezone": "GMT", + "odps.sql.allow.schema.evolution": "true", } diff --git a/dbt/adapters/maxcompute/impl.py b/dbt/adapters/maxcompute/impl.py index ecd93c5..5a7b657 100644 --- a/dbt/adapters/maxcompute/impl.py +++ b/dbt/adapters/maxcompute/impl.py @@ -23,6 +23,7 @@ from dbt_common.contracts.constraints import ConstraintType from dbt_common.exceptions import DbtRuntimeError from dbt_common.utils import AttrDict +from odps import ODPS from odps.errors import ODPSError, NoSuchObject from dbt.adapters.maxcompute import MaxComputeConnectionManager @@ -61,8 +62,8 @@ class MaxComputeAdapter(SQLAdapter): CONSTRAINT_SUPPORT = { ConstraintType.check: ConstraintSupport.NOT_SUPPORTED, ConstraintType.not_null: ConstraintSupport.ENFORCED, - ConstraintType.unique: ConstraintSupport.NOT_ENFORCED, - ConstraintType.primary_key: ConstraintSupport.NOT_ENFORCED, + ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED, + ConstraintType.primary_key: ConstraintSupport.NOT_SUPPORTED, ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED, } @@ -77,7 +78,7 @@ def __init__(self, config, mp_context: SpawnContext) -> None: super().__init__(config, mp_context) self.connections: MaxComputeConnectionManager = self.connections - def get_odps_client(self): + def get_odps_client(self) -> ODPS: conn = 
self.connections.get_thread_connection() return conn.handle.odps @@ -109,6 +110,16 @@ def support_namespace_schema(self, project: str): ### # Implementations of abstract methods ### + def get_relation( + self, database: str, schema: str, identifier: str + ) -> Optional[MaxComputeRelation]: + odpsTable = self.get_odps_client().get_table(identifier, database, schema) + try: + odpsTable.reload() + except NoSuchObject: + return None + return MaxComputeRelation.from_odps_table(odpsTable) + @classmethod def date_function(cls) -> str: return "current_timestamp()" @@ -121,6 +132,8 @@ def drop_relation(self, relation: MaxComputeRelation) -> None: is_cached = self._schema_is_cached(relation.database, relation.schema) if is_cached: self.cache_dropped(relation) + if relation.table is None: + return self.get_odps_client().delete_table( relation.identifier, relation.project, True, relation.schema ) @@ -177,7 +190,8 @@ def execute_macro( kwargs=kwargs, needs_conn=needs_conn, ) - self.connections.execute(str(sql)) + if sql is not None and str(sql).strip() != "None": + self.connections.execute(str(sql)) return sql def create_schema(self, relation: MaxComputeRelation) -> None: @@ -196,7 +210,7 @@ def create_schema(self, relation: MaxComputeRelation) -> None: raise e def drop_schema(self, relation: MaxComputeRelation) -> None: - logger.debug(f"drop_schema: '{relation.project}.{relation.schema}'") + logger.debug(f"drop_schema: '{relation.database}.{relation.schema}'") # Although the odps client has a check schema exist method, it will have a considerable delay, # so that it is impossible to judge how many seconds it should wait. @@ -302,18 +316,21 @@ def _get_one_catalog_by_relations( sql_rows = [] for relation in relations: - odps_table = self.get_odps_table_by_relation(relation) + odps_table = self.get_odps_table_by_relation(relation, 10) table_database = relation.project table_schema = relation.schema table_name = relation.table - if odps_table or odps_table.is_materialized_view: + if not odps_table: + continue + + if odps_table.is_virtual_view or odps_table.is_materialized_view: table_type = "VIEW" else: table_type = "TABLE" table_comment = odps_table.comment table_owner = odps_table.owner - column_index = 0 + column_index = 1 for column in odps_table.table_schema.simple_columns: column_name = column.name column_type = column.type.name @@ -348,7 +365,8 @@ def convert_text_type(cls, agate_table: "agate.Table", col_idx: int) -> str: @classmethod def convert_number_type(cls, agate_table: "agate.Table", col_idx: int) -> str: - return "decimal" + decimals = agate_table.aggregate(agate.MaxPrecision(col_idx)) + return "decimal" if decimals else "bigint" @classmethod def convert_integer_type(cls, agate_table: "agate.Table", col_idx: int) -> str: @@ -394,9 +412,16 @@ def string_add_sql( raise DbtRuntimeError(f'Got an unexpected location value of "{location}"') def validate_sql(self, sql: str) -> AdapterResponse: - res = self.connections.execute(sql) + validate_sql = "explain " + sql + res = self.connections.execute(validate_sql) return res[0] + def valid_incremental_strategies(self): + """The set of standard builtin strategies which this adapter supports out-of-the-box. + Not used to validate custom strategies defined by end users. 
+ """ + return ["append", "merge", "delete+insert"] + @available.parse_none def load_dataframe( self, @@ -415,17 +440,22 @@ def load_dataframe( if isinstance(column_type, agate.data_types.date_time.DateTime): timestamp_columns.append(agate_table.column_names[i]) - print(timestamp_columns) - pd_dataframe = pd.read_csv( file_path, delimiter=field_delimiter, parse_dates=timestamp_columns ) - - self.get_odps_client().write_table( - table_name, - pd_dataframe, - project=database, - schema=schema, - create_table=False, - create_partition=False, - ) + # make sure target table exist + for i in range(10): + try: + self.get_odps_client().write_table( + table_name, + pd_dataframe, + project=database, + schema=schema, + create_table=False, + create_partition=False, + ) + break + except NoSuchObject: + logger.info(f"Table {database}.{schema}.{table_name} does not exist, retrying...") + time.sleep(10) + continue diff --git a/dbt/adapters/maxcompute/wrapper.py b/dbt/adapters/maxcompute/wrapper.py index 48c5b88..88d7632 100644 --- a/dbt/adapters/maxcompute/wrapper.py +++ b/dbt/adapters/maxcompute/wrapper.py @@ -19,6 +19,9 @@ def cursor(self, *args, **kwargs): **kwargs, ) + def cancel(self): + self.close() + logger = AdapterLogger("MaxCompute") diff --git a/dbt/include/maxcompute/macros/adapters.sql b/dbt/include/maxcompute/macros/adapters.sql index d87352d..980bc00 100644 --- a/dbt/include/maxcompute/macros/adapters.sql +++ b/dbt/include/maxcompute/macros/adapters.sql @@ -14,76 +14,15 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter ALTER TABLE {{ from_relation.render() }} RENAME TO {{ to_relation.identifier }}; {% else -%} - ALTER VIEW {{ from_relation.database }}.{{ from_relation.schema }}.{{ from_relation.identifier }} + ALTER VIEW {{ from_relation.render() }} RENAME TO {{ to_relation.identifier }}; {% endif -%} {% endmacro %} -{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%} - ALTER TABLE {{ relation.render() }} - CHANGE {{ column_name }} {{ column_name }} {{ new_column_type }}; -{% endmacro %} - {% macro maxcompute__copy_grants() -%} {{ return(True) }} {% endmacro %} -/* {# override dbt/include/global_project/macros/relations/table/create.sql #} */ -{% macro maxcompute__create_table_as(temporary, relation, sql) -%} - {% set is_transactional = config.get('transactional') -%} - - {%- if is_transactional -%} - {{ create_transactional_table_as(temporary, relation, sql) }} - - {%- else -%} - CREATE TABLE IF NOT EXISTS {{ relation.render() }} - {% if temporary %} - LIFECYCLE 1 - {% endif %} - AS ( - {{ sql }} - ) - ; - {%- endif %} -{% endmacro %} - - -/* {# override dbt/include/global_project/macros/relations/view/create.sql #} */ -{% macro maxcompute__create_view_as(relation, sql) -%} - CREATE OR REPLACE VIEW {{ relation.render() }} AS ({{ sql }}); -{% endmacro %} - -{% macro create_transactional_table_as(temporary, relation, sql) -%} - {% call statement('create_table', auto_begin=False) -%} - create table {{ relation.render() }} - {{ get_schema_from_query(sql) }} - tblproperties("transactional"="true") - {% if temporary %} - LIFECYCLE 1 - {% endif %} - ; - {% endcall %} - insert into {{ relation.render() }} - ( - {{ sql }} - ); -{% endmacro %} - -{% macro get_schema_from_query(sql) -%} -( - {% set model_columns = model.columns %} - {% for c in get_column_schema_from_query(sql) -%} - {{ c.name }} {{ c.dtype }} - {% if model_columns and c.name in model_columns -%} - {{ "COMMENT" }} '{{ model_columns[c.name].description }}' - {%- endif %} - {{ 
"," if not loop.last or raw_model_constraints }} - - {% endfor %} -) -{%- endmacro %} - - {% macro maxcompute__current_timestamp() -%} current_timestamp() {%- endmacro %} diff --git a/dbt/include/maxcompute/macros/adapters/columns.sql b/dbt/include/maxcompute/macros/adapters/columns.sql new file mode 100644 index 0000000..df0acd0 --- /dev/null +++ b/dbt/include/maxcompute/macros/adapters/columns.sql @@ -0,0 +1,25 @@ +{% macro maxcompute__alter_column_type(relation, column_name, new_column_type) -%} + alter table {{ relation.render() }} change column {{ adapter.quote(column_name) }} {{ adapter.quote(column_name) }} {{ new_column_type }}; +{% endmacro %} + + +{% macro maxcompute__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} + {% if add_columns is not none and add_columns|length > 0%} + {% set sql -%} + alter {{ relation.type }} {{ relation.render() }} add columns + {% for column in add_columns %} + {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} + {% endfor %}; + {%- endset -%} + {% do run_query(sql) %} + {% endif %} + {% if remove_columns is not none and remove_columns|length > 0%} + {% set sql -%} + alter {{ relation.type }} {{ relation.render() }} drop columns + {% for column in remove_columns %} + {{ column.name }} {{ ',' if not loop.last }} + {% endfor %}; + {%- endset -%} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} diff --git a/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql new file mode 100644 index 0000000..af0a6d7 --- /dev/null +++ b/dbt/include/maxcompute/macros/materializations/incremental/incremental.sql @@ -0,0 +1,106 @@ + +{% materialization incremental, adapter='maxcompute' -%} + + -- relations + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') -%} + {%- set temp_relation = make_temp_relation(target_relation)-%} + {%- set intermediate_relation = make_intermediate_relation(target_relation)-%} + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + + -- configs + {%- set unique_key = config.get('unique_key') -%} + {%- set unique_key_list = unique_key.split(',') if unique_key else [] -%} + {%- set transaction_table = unique_key_list|length > 0 -%} + {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + + -- the temp_ and backup_ relations should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation. 
This has to happen before + -- BEGIN, in a separate transaction + {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + -- grab current tables grants config for comparision later on + {% set grant_config = config.get('grants') %} + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + + {% if existing_relation is none %} + {% set build_sql = create_table_as_internal(False, target_relation, sql, transaction_table) %} + {% elif full_refresh_mode %} + {% set build_sql = create_table_as_internal(False, intermediate_relation, sql, transaction_table) %} + {% set need_swap = true %} + {% else %} + {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %} + {% set contract_config = config.get('contract') %} + {% if not contract_config or not contract_config.enforced %} + {% do adapter.expand_target_column_types( + from_relation=temp_relation, + to_relation=target_relation) %} + {% endif %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} + + {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} + {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} + {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} + + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {% if need_swap %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% do adapter.rename_relation(intermediate_relation, target_relation) %} + {% do to_drop.append(backup_relation) %} + {% endif %} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} + + +{% macro get_quoted_list(column_names) %} + {% set quoted = [] %} + {% for col in column_names -%} + {%- do quoted.append(adapter.quote(col)) -%} + {%- endfor %} + {{ return(quoted) }} +{% endmacro %} diff --git 
a/dbt/include/maxcompute/macros/materializations/incremental/merge.sql b/dbt/include/maxcompute/macros/materializations/incremental/merge.sql new file mode 100644 index 0000000..64133d6 --- /dev/null +++ b/dbt/include/maxcompute/macros/materializations/incremental/merge.sql @@ -0,0 +1,126 @@ +{% macro maxcompute__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%} + {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} + {%- set dest_cols_names = get_quoted_list(dest_columns | map(attribute="name")) -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set merge_update_columns = config.get('merge_update_columns') -%} + {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} + {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in unique_key %} + {% set this_key_match %} + DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} + {% endset %} + {% do predicates.append(this_key_match) %} + {% endfor %} + {% else %} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} + {% endif %} + {% else %} + {% do predicates.append('FALSE') %} + {% endif %} + + {{ sql_header if sql_header is not none }} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on {{"(" ~ predicates | join(") and (") ~ ")"}} + + {% if unique_key %} + when matched then update set + {% for column_name in update_columns -%} + DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} + {%- if not loop.last %}, {%- endif %} + {%- endfor %} + {% endif %} + + when not matched then insert + ({{ dest_cols_csv }}) + values ( + {% for column in dest_cols_names %} + DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}} + {% endfor %}); + +{% endmacro %} + + +{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%} + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not string %} + delete from {{target }} + using {{ source }} + where ( + {% for key in unique_key %} + {{ source }}.{{ key }} = {{ target }}.{{ key }} + {{ "and " if not loop.last}} + {% endfor %} + {% if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {% endif %} + ); + {% else %} + delete from {{ target }} + where ( + {{ unique_key }}) in ( + select ({{ unique_key }}) + from {{ source }} + ) + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%}; + + {% endif %} + {% endif %} + + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) + +{%- endmacro %} + + +{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%} + {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }} +{%- endmacro %} + +{% macro 
maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%} + {#-- The only time include_sql_header is True: --#} + {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#} + {#-- We should consider including the sql header at the materialization level instead --#} + + {%- set predicates = [] if predicates is none else [] + predicates -%} + {%- set dest_cols_names = get_quoted_list(dest_columns | map(attribute="name")) -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none and include_sql_header }} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on FALSE + + when not matched by source + {% if predicates %} and {{ predicates | join(' and ') }} {% endif %} + then delete + + when not matched then insert + ({{ dest_cols_csv }}) + values ( + {% for column in dest_cols_names %} + DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}} + {% endfor %}); +{% endmacro %} diff --git a/dbt/include/maxcompute/macros/materializations/snapshots/snapshot.sql b/dbt/include/maxcompute/macros/materializations/snapshots/snapshot.sql index 714f46a..16b6ffc 100644 --- a/dbt/include/maxcompute/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/maxcompute/macros/materializations/snapshots/snapshot.sql @@ -21,7 +21,7 @@ {% set select = snapshot_staging_table(strategy, sql, target_relation) %} {% call statement('build_snapshot_staging_relation') %} - {{ create_transactional_table_as(True, temp_relation, select) }} + {{ create_table_as_internal(True, temp_relation, select, True) }} {% endcall %} {% do return(temp_relation) %} @@ -109,7 +109,7 @@ {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} {% set build_or_select_sql = build_sql %} - {% set final_sql = create_transactional_table_as(False, target_relation, build_sql) %} + {% set final_sql = create_table_as_internal(False, target_relation, build_sql, True) %} {% else %} diff --git a/dbt/include/maxcompute/macros/relations/table/create.sql b/dbt/include/maxcompute/macros/relations/table/create.sql new file mode 100644 index 0000000..6b16611 --- /dev/null +++ b/dbt/include/maxcompute/macros/relations/table/create.sql @@ -0,0 +1,77 @@ +{% macro maxcompute__create_table_as(temporary, relation, sql) -%} + {%- set is_transactional = config.get('transactional') or config.get('delta') -%} + {%- set primary_keys = config.get('primary_keys') -%} + {%- set delta_table_bucket_num = config.get('delta_table_bucket_num', 16)-%} + {{ create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys, delta_table_bucket_num) }} +{%- endmacro %} + + +{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {{ sql_header if sql_header is not none }} + {%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%} + + {% call statement('create_table', auto_begin=False) -%} + create table if not exists {{ relation.render() }} ( + {% set contract_config = config.get('contract') %} + {% if contract_config.enforced and (not temporary) %} + {{ get_assert_columns_equivalent(sql) }} + {{ get_table_columns_and_constraints_without_brackets() }} + {%- set sql = get_select_subquery(sql) %} + {%- else -%} + {{ 
get_table_columns(sql, primary_keys) }} + {%- endif -%} + {% if is_delta -%} + ,primary key( + {%- for pk in primary_keys -%} + {{ pk }}{{ "," if not loop.last }} + {%- endfor -%}) + {%- endif -%} + ) + {%- if is_transactional -%} + {%- if is_delta -%} + tblproperties("transactional"="true", "write.bucket.num"="{{ delta_table_bucket_num }}") + {%- else -%} + tblproperties("transactional"="true") + {%- endif -%} + {%- endif -%} + {%- if temporary %} + LIFECYCLE 1 + {%- endif %} + ; + {%- endcall -%} + + insert into {{ relation.render() }} ( + {% for c in get_column_schema_from_query(sql) -%} + `{{ c.name }}`{{ "," if not loop.last }} + {% endfor %} + )( + {{ sql }} + ); +{%- endmacro %} + + +{% macro get_table_columns(sql, primary_keys=none) -%} + {% set model_columns = model.columns %} + {% for c in get_column_schema_from_query(sql) -%} + {{ c.name }} {{ c.dtype }} + {% if primary_keys and c.name in primary_keys -%}not null{%- endif %} + {% if model_columns and c.name in model_columns -%} + {{ "COMMENT" }} '{{ model_columns[c.name].description }}' + {%- endif %} + {{ "," if not loop.last }} + {% endfor %} +{%- endmacro %} + +-- Compared to get_table_columns_and_constraints, only the surrounding brackets are deleted +{% macro get_table_columns_and_constraints_without_brackets() -%} + {# loop through user_provided_columns to create DDL with data types and constraints #} + {%- set raw_column_constraints = adapter.render_raw_columns_constraints(raw_columns=model['columns']) -%} + {%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%} + {% for c in raw_column_constraints -%} + {{ c }}{{ "," if not loop.last or raw_model_constraints }} + {% endfor %} + {% for c in raw_model_constraints -%} + {{ c }}{{ "," if not loop.last }} + {% endfor -%} +{%- endmacro %} diff --git a/dbt/include/maxcompute/macros/relations/view/create.sql b/dbt/include/maxcompute/macros/relations/view/create.sql new file mode 100644 index 0000000..a4ef081 --- /dev/null +++ b/dbt/include/maxcompute/macros/relations/view/create.sql @@ -0,0 +1,13 @@ +{% macro default__create_view_as(relation, sql) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none }} + create or replace view {{ relation.render() }} + {% set contract_config = config.get('contract') %} + {% if contract_config.enforced %} + {{ get_assert_columns_equivalent(sql) }} + {%- endif %} + as ( + {{ sql }} + ); +{%- endmacro %} diff --git a/dbt/include/maxcompute/macros/utils/array_construct.sql b/dbt/include/maxcompute/macros/utils/array_construct.sql index 01daa7e..9553b76 100644 --- a/dbt/include/maxcompute/macros/utils/array_construct.sql +++ b/dbt/include/maxcompute/macros/utils/array_construct.sql @@ -1,4 +1,5 @@ {% macro maxcompute__array_construct(inputs, data_type) -%} + {% set data_type = data_type.lower() %} {%- if inputs|length > 0 -%} {%- if data_type == 'string' -%} array({{ '\"' + inputs|join('\", \"') + '\"' }}) diff --git a/dbt/include/maxcompute/macros/utils/date_trunc.sql b/dbt/include/maxcompute/macros/utils/date_trunc.sql index 1bdc443..4f8daa2 100644 --- a/dbt/include/maxcompute/macros/utils/date_trunc.sql +++ b/dbt/include/maxcompute/macros/utils/date_trunc.sql @@ -1,5 +1,6 @@ -- https://help.aliyun.com/zh/maxcompute/user-guide/datetrunc {% macro maxcompute__date_trunc(datepart, date) -%} + {% set datepart = datepart.lower() %} {%- if datepart in ['day', 'month', 'year', 'hour'] %} datetrunc({{date}}, '{{datepart}}') {%- elif datepart in 
['minute', 'second'] -%} diff --git a/dbt/include/maxcompute/macros/utils/dateadd.sql b/dbt/include/maxcompute/macros/utils/dateadd.sql index 32a987a..561103a 100644 --- a/dbt/include/maxcompute/macros/utils/dateadd.sql +++ b/dbt/include/maxcompute/macros/utils/dateadd.sql @@ -1,4 +1,5 @@ {% macro maxcompute__dateadd(datepart, interval, from_date_or_timestamp) %} + {% set datepart = datepart.lower() %} {%- if datepart in ['day', 'month', 'year', 'hour'] %} dateadd({{ from_date_or_timestamp }}, {{ interval }}, '{{ datepart }}') {%- elif datepart == 'quarter' -%} diff --git a/dbt/include/maxcompute/macros/utils/datediff.sql b/dbt/include/maxcompute/macros/utils/datediff.sql index 3bf3391..aca0709 100644 --- a/dbt/include/maxcompute/macros/utils/datediff.sql +++ b/dbt/include/maxcompute/macros/utils/datediff.sql @@ -1,6 +1,7 @@ {% macro maxcompute__datediff(first_date, second_date, datepart) %} + {% set datepart = datepart.lower() %} {%- if datepart == 'day' -%} diff --git a/dbt/include/maxcompute/macros/utils/last_day.sql b/dbt/include/maxcompute/macros/utils/last_day.sql index 8e76e75..9215c44 100644 --- a/dbt/include/maxcompute/macros/utils/last_day.sql +++ b/dbt/include/maxcompute/macros/utils/last_day.sql @@ -1,4 +1,5 @@ {%- macro maxcompute_last_day(date, datepart) -%} + {% set datepart = datepart.lower() %} {%- if datepart == 'quarter' -%} {{ exceptions.raise_compiler_error("macro last_day not support for datepart ~ '" ~ datepart ~ "'") }} {%- else -%} diff --git a/tests/functional/adapter/test_aliases.py b/tests/functional/adapter/test_aliases.py index 75bf872..af02503 100644 --- a/tests/functional/adapter/test_aliases.py +++ b/tests/functional/adapter/test_aliases.py @@ -1,5 +1,5 @@ import pytest - +from dbt.tests.adapter.aliases import fixtures from dbt.tests.adapter.aliases.test_aliases import ( BaseAliases, BaseAliasErrors, @@ -7,22 +7,61 @@ BaseSameAliasDifferentDatabases, ) +# macros # +MACROS__CAST_SQL = """ +{% macro string_literal(s) -%} + {{ adapter.dispatch('string_literal', macro_namespace='test')(s) }} +{%- endmacro %} + +{% macro default__string_literal(s) %} + '{{ s }}' +{% endmacro %} + +""" + -@pytest.mark.skip(reason="Not yet test") class TestAliasesMaxCompute(BaseAliases): + @pytest.fixture(scope="class") + def macros(self): + return { + "cast.sql": MACROS__CAST_SQL, + "expect_value.sql": fixtures.MACROS__EXPECT_VALUE_SQL, + } + pass -@pytest.mark.skip(reason="Not yet test") class TestAliasErrorsMaxCompute(BaseAliasErrors): + @pytest.fixture(scope="class") + def macros(self): + return { + "cast.sql": MACROS__CAST_SQL, + "expect_value.sql": fixtures.MACROS__EXPECT_VALUE_SQL, + } + pass -@pytest.mark.skip(reason="Not yet test") class TestSameAliasDifferentSchemasMaxCompute(BaseSameAliasDifferentSchemas): + @pytest.fixture(scope="class") + def macros(self): + return { + "cast.sql": MACROS__CAST_SQL, + "expect_value.sql": fixtures.MACROS__EXPECT_VALUE_SQL, + } + pass -@pytest.mark.skip(reason="Not yet test") +@pytest.mark.skip( + reason="The unstable case is not a problem with dbt-adapter, needs to be solved by server." 
+) class TestSameAliasDifferentDatabasesMaxCompute(BaseSameAliasDifferentDatabases): + @pytest.fixture(scope="class") + def macros(self): + return { + "cast.sql": MACROS__CAST_SQL, + "expect_value.sql": fixtures.MACROS__EXPECT_VALUE_SQL, + } + pass diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index 378c626..f0b70fa 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -14,6 +14,7 @@ from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod # additional basic tests +from dbt.tests.adapter.basic.test_docs_generate import BaseDocsGenerate, BaseDocsGenReferences from dbt.tests.adapter.basic.test_table_materialization import BaseTableMaterialization from dbt.tests.adapter.basic.test_validate_connection import BaseValidateConnection @@ -98,6 +99,47 @@ class TestBaseAdapterMethodMaxCompute(BaseAdapterMethod): """ +@pytest.mark.skip(reason="Upstream test expects model docs to have no comment; the cause is not yet understood.") +class TestDocsGenerateMaxCompute(BaseDocsGenerate): + @pytest.fixture(scope="class") + def expected_catalog(self, project, profile_user): + from dbt.tests.adapter.basic import expected_catalog + + return expected_catalog.base_expected_catalog( + project, + role=profile_user, + id_type="bigint", + text_type="string", + time_type="timestamp", + view_type="VIEW", + table_type="TABLE", + model_stats=expected_catalog.no_stats(), + ) + + pass + + +@pytest.mark.skip(reason="Upstream test expects model docs to have no comment; the cause is not yet understood.") +class TestDocsGenReferencesMaxCompute(BaseDocsGenReferences): + @pytest.fixture(scope="class") + def expected_catalog(self, project, profile_user): + from dbt.tests.adapter.basic import expected_catalog + + return expected_catalog.expected_references_catalog( + project, + role=profile_user, + id_type="bigint", + text_type="string", + time_type="timestamp", + bigint_type="bigint", + view_type="VIEW", + table_type="TABLE", + model_stats=expected_catalog.no_stats(), + ) + + pass + + class TestTableMaterializationMaxCompute(BaseTableMaterialization): pass diff --git a/tests/functional/adapter/test_caching.py b/tests/functional/adapter/test_caching.py new file mode 100644 index 0000000..056c9be --- /dev/null +++ b/tests/functional/adapter/test_caching.py @@ -0,0 +1,22 @@ +from dbt.tests.adapter.caching.test_caching import ( + BaseCachingLowercaseModel, + BaseCachingUppercaseModel, + BaseNoPopulateCache, + BaseCachingSelectedSchemaOnly, +) + + +class TestCachingLowerCaseModel(BaseCachingLowercaseModel): + pass + + +class TestCachingUppercaseModel(BaseCachingUppercaseModel): + pass + + +class TestCachingSelectedSchemaOnly(BaseCachingSelectedSchemaOnly): + pass + + +class TestNoPopulateCache(BaseNoPopulateCache): + pass diff --git a/tests/functional/adapter/test_case_insensitivity.py b/tests/functional/adapter/test_case_insensitivity.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/functional/adapter/test_column_types.py b/tests/functional/adapter/test_column_types.py new file mode 100644 index 0000000..baa715f --- /dev/null +++ b/tests/functional/adapter/test_column_types.py @@ -0,0 +1,44 @@ +import pytest +from dbt.tests.adapter.column_types.test_column_types import BaseColumnTypes + +model_sql = """ +select + CAST(1 AS smallint) as smallint_col, + CAST(2 AS int) as int_col, + CAST(3 AS bigint) as bigint_col, + CAST(4.0 AS float) as real_col, + CAST(5.0 AS double) as double_col, + CAST(6.0 AS decimal) as numeric_col, + CAST('7' AS STRING) as text_col, + CAST('8'
AS varchar(20)) as varchar_col +""" + +schema_yml = """ +version: 2 +models: + - name: model + data_tests: + - is_type: + column_map: + smallint_col: ['integer', 'number'] + int_col: ['integer', 'number'] + bigint_col: ['integer', 'number'] + real_col: ['float', 'number'] + double_col: ['float', 'number'] + numeric_col: ['numeric', 'number'] + text_col: ['string', 'not number'] + varchar_col: ['string', 'not number'] +""" + + +class BaseMaxComputeColumnTypes(BaseColumnTypes): + @pytest.fixture(scope="class") + def models(self): + return {"model.sql": model_sql, "schema.yml": schema_yml} + + def test_run_and_test(self, project): + self.run_and_test() + + +class TestMaxComputeColumnTypes(BaseMaxComputeColumnTypes): + pass diff --git a/tests/functional/adapter/test_concurrency.py b/tests/functional/adapter/test_concurrency.py new file mode 100644 index 0000000..c713a60 --- /dev/null +++ b/tests/functional/adapter/test_concurrency.py @@ -0,0 +1,5 @@ +from dbt.tests.adapter.concurrency.test_concurrency import BaseConcurrency + + +class TestConcurrencyMaxCompute(BaseConcurrency): + pass diff --git a/tests/functional/adapter/test_constraints.py b/tests/functional/adapter/test_constraints.py new file mode 100644 index 0000000..5cef84b --- /dev/null +++ b/tests/functional/adapter/test_constraints.py @@ -0,0 +1,145 @@ +import re + +import pytest +from dbt.tests.adapter.constraints.test_constraints import ( + BaseTableConstraintsColumnsEqual, + BaseViewConstraintsColumnsEqual, + BaseIncrementalConstraintsColumnsEqual, + BaseConstraintsRuntimeDdlEnforcement, + BaseConstraintsRollback, + BaseIncrementalConstraintsRuntimeDdlEnforcement, + BaseIncrementalConstraintsRollback, + BaseTableContractSqlHeader, + BaseIncrementalContractSqlHeader, + BaseModelConstraintsRuntimeEnforcement, + BaseConstraintQuotedColumn, + BaseIncrementalForeignKeyConstraint, +) + + +class TestTableConstraintsColumnsEqual(BaseTableConstraintsColumnsEqual): + @pytest.fixture + def string_type(self): + return "string" + + @pytest.fixture + def int_type(self): + return "int" + + @pytest.fixture + def data_types(self, schema_int_type, int_type, string_type): + # sql_column_value, schema_data_type, error_data_type + return [ + ["1", schema_int_type, int_type], + ["'1'", string_type, string_type], + ["true", "bool", "bool"], + ["timestamp'2013-11-03 00:00:00.07'", "timestamp", "timestamp"], + ["1BD", "decimal(1,0)", "decimal(1,0)"], + ] + + pass + + +class TestViewConstraintsColumnsEqual(BaseViewConstraintsColumnsEqual): + @pytest.fixture + def string_type(self): + return "string" + + @pytest.fixture + def int_type(self): + return "int" + + @pytest.fixture + def data_types(self, schema_int_type, int_type, string_type): + # sql_column_value, schema_data_type, error_data_type + return [ + ["1", schema_int_type, int_type], + ["'1'", string_type, string_type], + ["true", "bool", "bool"], + ["timestamp'2013-11-03 00:00:00.07'", "timestamp", "timestamp"], + ["1BD", "decimal(1,0)", "decimal(1,0)"], + ] + + pass + + +class TestIncrementalConstraintsColumnsEqual(BaseIncrementalConstraintsColumnsEqual): + @pytest.fixture + def string_type(self): + return "string" + + @pytest.fixture + def int_type(self): + return "int" + + @pytest.fixture + def data_types(self, schema_int_type, int_type, string_type): + # sql_column_value, schema_data_type, error_data_type + return [ + ["1", schema_int_type, int_type], + ["'1'", string_type, string_type], + ["true", "bool", "bool"], + ["timestamp'2013-11-03 00:00:00.07'", "timestamp", "timestamp"], + ["1BD", 
"decimal(1,0)", "decimal(1,0)"], + ] + + pass + + +@pytest.mark.skip(reason="See comments.") +class TestTableConstraintsRuntimeDdlEnforcement(BaseConstraintsRuntimeDdlEnforcement): + """ + This test will pass by modify the expected_sql. + + However, dbt-maxcompute is not-support for all the tested constraints. + This test is meaningless. + """ + + pass + + +class TestTableConstraintsRollback(BaseConstraintsRollback): + @pytest.fixture(scope="class") + def expected_error_messages(self): + return ["Can't insert a null value into not-null column"] + + pass + + +@pytest.mark.skip(reason="Like TestTableConstraintsRuntimeDdlEnforcement") +class TestIncrementalConstraintsRuntimeDdlEnforcement( + BaseIncrementalConstraintsRuntimeDdlEnforcement +): + pass + + +class TestIncrementalConstraintsRollback(BaseIncrementalConstraintsRollback): + @pytest.fixture(scope="class") + def expected_error_messages(self): + return ["Can't insert a null value into not-null column"] + + pass + + +@pytest.mark.skip(reason="Support, but set timezone is not support.") +class TestTableContractSqlHeader(BaseTableContractSqlHeader): + pass + + +@pytest.mark.skip(reason="Support, but set timezone is not support.") +class TestIncrementalContractSqlHeader(BaseIncrementalContractSqlHeader): + pass + + +@pytest.mark.skip(reason="Like TestTableConstraintsRuntimeDdlEnforcement") +class TestModelConstraintsRuntimeEnforcement(BaseModelConstraintsRuntimeEnforcement): + pass + + +@pytest.mark.skip(reason="Like TestTableConstraintsRuntimeDdlEnforcement") +class TestConstraintQuotedColumn(BaseConstraintQuotedColumn): + pass + + +class TestIncrementalForeignKeyConstraint(BaseIncrementalForeignKeyConstraint): + pass diff --git a/tests/functional/adapter/test_dbt_debug.py b/tests/functional/adapter/test_dbt_debug.py new file mode 100644 index 0000000..9906158 --- /dev/null +++ b/tests/functional/adapter/test_dbt_debug.py @@ -0,0 +1,12 @@ +from dbt.tests.adapter.dbt_debug.test_dbt_debug import ( + BaseDebugPostgres, + BaseDebugInvalidProjectPostgres, +) + + +class TestDebugMaxCompute(BaseDebugPostgres): + pass + + +class TestDebugInvalidProjectPostgres(BaseDebugInvalidProjectPostgres): + pass diff --git a/tests/functional/adapter/test_incremental.py b/tests/functional/adapter/test_incremental.py new file mode 100644 index 0000000..8a4a59b --- /dev/null +++ b/tests/functional/adapter/test_incremental.py @@ -0,0 +1,161 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey +from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import ( + BaseMergeExcludeColumns, +) +from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates +from dbt.tests.adapter.incremental.test_incremental_on_schema_change import ( + BaseIncrementalOnSchemaChange, +) +from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch + + +class TestMergeExcludeColumnsMaxCompute(BaseMergeExcludeColumns): + pass + + +@pytest.mark.skip(reason="MaxCompute Api not support freeze time.") +class TestMicrobatchMaxCompute(BaseMicrobatch): + pass + + +class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange): + pass + + +class TestIncrementalPredicatesDeleteInsert(BaseIncrementalPredicates): + pass + + +class TestPredicatesDeleteInsert(BaseIncrementalPredicates): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"+predicates": ["id != 2"], "+incremental_strategy": "delete+insert"}} + + 
+seeds__add_new_rows_sql = """ +-- Insert statement which when applied to seed.csv sees incremental model +-- grow in size while not (necessarily) diverging from the seed itself. + +-- insert two new rows, both of which should be in incremental model +-- with any unique columns +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('WA','King','Seattle',DATE'2022-02-01'); + +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('CA','Los Angeles','Los Angeles',DATE'2022-02-01'); + +""" +seeds__duplicate_insert_sql = """ +-- Insert statement which when applied to seed.csv triggers the inplace +-- overwrite strategy of incremental models. Seed and incremental model +-- diverge. + +-- insert new row, which should not be in incremental model +-- with primary or first three columns unique +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('CT','Hartford','Hartford',DATE'2022-02-14'); + +""" +models__expected__one_str__overwrite_sql = """ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + cast('2022-02-14' as date) as last_visit_date +union all +select 'MA','Suffolk','Boston',DATE'2020-02-12' +union all +select 'NJ','Mercer','Trenton',DATE'2022-01-01' +union all +select 'NY','Kings','Brooklyn',DATE'2021-04-02' +union all +select 'NY','New York','Manhattan',DATE'2021-04-01' +union all +select 'PA','Philadelphia','Philadelphia',DATE'2021-05-21' + +""" + +models__expected__unique_key_list__inplace_overwrite_sql = """ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + cast('2022-02-14' as date) as last_visit_date +union all +select 'MA','Suffolk','Boston',DATE'2020-02-12' +union all +select 'NJ','Mercer','Trenton',DATE'2022-01-01' +union all +select 'NY','Kings','Brooklyn',DATE'2021-04-02' +union all +select 'NY','New York','Manhattan',DATE'2021-04-01' +union all +select 'PA','Philadelphia','Philadelphia',DATE'2021-05-21' + +""" + + +@pytest.mark.skip( + reason="The function is ok, but it cannot run successfully due to some reasons on the server side." 
+) +class TestIncrementalUniqueKey(BaseIncrementalUniqueKey): + + @pytest.fixture(scope="class") + def models(self): + from dbt.tests.adapter.incremental.test_incremental_unique_id import ( + models__empty_str_unique_key_sql, + models__str_unique_key_sql, + models__duplicated_unary_unique_key_list_sql, + models__not_found_unique_key_list_sql, + models__not_found_unique_key_sql, + models__empty_unique_key_list_sql, + models__no_unique_key_sql, + models__trinary_unique_key_list_sql, + models__nontyped_trinary_unique_key_list_sql, + models__unary_unique_key_list_sql, + ) + + return { + "trinary_unique_key_list.sql": models__trinary_unique_key_list_sql, + "nontyped_trinary_unique_key_list.sql": models__nontyped_trinary_unique_key_list_sql, + "unary_unique_key_list.sql": models__unary_unique_key_list_sql, + "not_found_unique_key.sql": models__not_found_unique_key_sql, + "empty_unique_key_list.sql": models__empty_unique_key_list_sql, + "no_unique_key.sql": models__no_unique_key_sql, + "empty_str_unique_key.sql": models__empty_str_unique_key_sql, + "str_unique_key.sql": models__str_unique_key_sql, + "duplicated_unary_unique_key_list.sql": models__duplicated_unary_unique_key_list_sql, + "not_found_unique_key_list.sql": models__not_found_unique_key_list_sql, + "expected": { + "one_str__overwrite.sql": models__expected__one_str__overwrite_sql, + "unique_key_list__inplace_overwrite.sql": models__expected__unique_key_list__inplace_overwrite_sql, + }, + } + + @pytest.fixture(scope="class") + def seeds(self): + from dbt.tests.adapter.incremental.test_incremental_unique_id import seeds__seed_csv + + return { + "duplicate_insert.sql": seeds__duplicate_insert_sql, + "seed.csv": seeds__seed_csv, + "add_new_rows.sql": seeds__add_new_rows_sql, + } + + pass diff --git a/tests/functional/adapter/test_relations.py b/tests/functional/adapter/test_relations.py new file mode 100644 index 0000000..213eff5 --- /dev/null +++ b/tests/functional/adapter/test_relations.py @@ -0,0 +1,10 @@ +from dbt.tests.adapter.relations.test_changing_relation_type import BaseChangeRelationTypeValidator +from dbt.tests.adapter.relations.test_dropping_schema_named import BaseDropSchemaNamed + + +class TestChangeRelationTypes(BaseChangeRelationTypeValidator): + pass + + +class TestDropSchemaNamed(BaseDropSchemaNamed): + pass diff --git a/tests/functional/adapter/test_type.py b/tests/functional/adapter/test_type.py new file mode 100644 index 0000000..2d9acfd --- /dev/null +++ b/tests/functional/adapter/test_type.py @@ -0,0 +1,92 @@ +import pytest +from dbt.tests.adapter.utils.data_types.test_type_int import BaseTypeInt +from dbt.tests.adapter.utils.data_types.test_type_bigint import BaseTypeBigInt +from dbt.tests.adapter.utils.data_types.test_type_boolean import BaseTypeBoolean +from dbt.tests.adapter.utils.data_types.test_type_float import BaseTypeFloat +from dbt.tests.adapter.utils.data_types.test_type_numeric import BaseTypeNumeric +from dbt.tests.adapter.utils.data_types.test_type_string import BaseTypeString +from dbt.tests.adapter.utils.data_types.test_type_timestamp import BaseTypeTimestamp + + +class TestTypeBigIntMaxCompute(BaseTypeBigInt): + pass + + +class TestTypeBoolean(BaseTypeBoolean): + pass + + +schema_seed_float__yml = """ +seeds: + - name: expected + config: + column_types: { + float_col: 'float' + } +""" + + +class TestTypeFloat(BaseTypeFloat): + @pytest.fixture(scope="class") + def models(self): + from dbt.tests.adapter.utils.data_types.test_type_float import models__actual_sql + + return { + "actual.sql": 
self.interpolate_macro_namespace(models__actual_sql, "type_float"), + "schema.yml": schema_seed_float__yml, + } + + pass + + +schema_seed_int__yml = """ +seeds: + - name: expected + config: + column_types: { + int_col: 'int' + } +""" + + +class TestTypeInt(BaseTypeInt): + @pytest.fixture(scope="class") + def models(self): + from dbt.tests.adapter.utils.data_types.test_type_int import models__actual_sql + + return { + "actual.sql": self.interpolate_macro_namespace(models__actual_sql, "type_int"), + "schema.yml": schema_seed_int__yml, + } + + pass + + +schema_seed_numeric__yml = """ +seeds: + - name: expected + config: + column_types: + numeric_col: 'decimal(28,6)' +""" + + +class TestTypeNumeric(BaseTypeNumeric): + def numeric_fixture_type(self): + return "decimal(28,6)" + + @pytest.fixture(scope="class") + def seeds(self): + from dbt.tests.adapter.utils.data_types.test_type_numeric import seeds__expected_csv + + return {"expected.csv": seeds__expected_csv, "schema.yml": schema_seed_numeric__yml} + + pass + + +class TestTypeString(BaseTypeString): + pass + + +class TestTypeTimestamp(BaseTypeTimestamp): + pass diff --git a/tests/functional/adapter/test_unit_testings.py b/tests/functional/adapter/test_unit_testings.py new file mode 100644 index 0000000..e3d751b --- /dev/null +++ b/tests/functional/adapter/test_unit_testings.py @@ -0,0 +1,28 @@ +import pytest +from dbt.tests.adapter.unit_testing.test_case_insensitivity import BaseUnitTestCaseInsensivity +from dbt.tests.adapter.unit_testing.test_invalid_input import BaseUnitTestInvalidInput +from dbt.tests.adapter.unit_testing.test_types import BaseUnitTestingTypes + + +class TestMaxComputeUnitTestCaseInsensitivity(BaseUnitTestCaseInsensivity): + pass + + +class TestMaxComputeUnitTestInvalidInput(BaseUnitTestInvalidInput): + pass + + +class TestMaxComputeUnitTestingTypes(BaseUnitTestingTypes): + @pytest.fixture + def data_types(self): + # sql_value, yaml_value + return [ + ["1", "1"], + ["'1'", "1"], + ["true", "true"], + ["DATE '2020-01-02'", "2020-01-02"], + ["TIMESTAMP'2013-11-03 00:00:00.0'", "2013-11-03 00:00:00-0"], + ["1BD", "1"], + ] + + pass diff --git a/tests/functional/maxcompute/test_delta_table.py b/tests/functional/maxcompute/test_delta_table.py new file mode 100644 index 0000000..46ca2a0 --- /dev/null +++ b/tests/functional/maxcompute/test_delta_table.py @@ -0,0 +1,75 @@ +import pytest +from dbt.tests.util import ( + run_dbt, +) + +seeds_base_csv = """ +id,name,some_date +1,Easton,1981-05-20T06:46:51 +2,Lillian,1978-09-03T18:10:33 +3,Jeremiah,1982-03-11T03:59:51 +4,Nolan,1976-05-06T20:21:35 +5,Hannah,1982-06-23T05:41:26 +6,Eleanor,1991-08-10T23:12:21 +7,Lily,1971-03-29T14:58:02 +8,Jonathan,1988-02-26T02:55:24 +9,Adrian,1994-02-09T13:14:23 +10,Nora,1976-03-01T16:51:39 +""".lstrip() + +models__sql = """ +{{ config( + materialized='table', + transactional=true, + primary_keys=['id'] +) }} +select * from {{ source('raw', 'seed') }} + +""".lstrip() + +schema_base_yml = """ +version: 2 +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: seed + identifier: "{{ var('seed_name', 'base') }}" +""" + + +class BaseTestDeltaTable: + + @pytest.fixture(scope="class") + def seeds(self): + return { + "base.csv": seeds_base_csv, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "model.sql": models__sql, + "schema.yml": schema_base_yml, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "base", + } + + def test_base(self, project): + # seed command + results = 
run_dbt(["seed"]) + # seed result length + print(results) + + # run command + results = run_dbt() + # run result length + print(results) + + +class TestDeltaTable(BaseTestDeltaTable): + pass