Skip to content

Commit

Permalink
feat: support materialized view
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxin-tech committed Dec 24, 2024
1 parent 9daaee2 commit 3440faf
Show file tree
Hide file tree
Showing 20 changed files with 369 additions and 100 deletions.
4 changes: 3 additions & 1 deletion dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,10 @@ def _get_one_catalog_by_relations(
if not odps_table:
continue

if odps_table.is_virtual_view or odps_table.is_materialized_view:
if odps_table.is_virtual_view:
table_type = "VIEW"
elif odps_table.is_materialized_view:
table_type = "MATERIALIZED_VIEW"
else:
table_type = "TABLE"
table_comment = odps_table.comment
Expand Down
19 changes: 16 additions & 3 deletions dbt/adapters/maxcompute/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from typing import FrozenSet, Optional, TypeVar

from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.contracts.relation import RelationType, Path, Policy
from dbt.adapters.contracts.relation import RelationType, Path, Policy, RelationConfig
from odps.models import Table

from dbt.adapters.maxcompute.relation_configs._materialized_view import MaxComputeMaterializedViewConfig

Self = TypeVar("Self", bound="MaxComputeRelation")


Expand Down Expand Up @@ -40,6 +42,7 @@ def without_quote(self):
{
RelationType.View,
RelationType.Table,
RelationType.MaterializedView,
}
)
)
Expand All @@ -62,15 +65,25 @@ def from_odps_table(cls, table: Table):
schema = table.get_schema()
schema = schema.name if schema else "default"

is_view = table.is_virtual_view or table.is_materialized_view
table_type = RelationType.Table
if table.is_virtual_view:
table_type = RelationType.View
if table.is_materialized_view:
table_type = RelationType.MaterializedView

return cls.create(
database=table.project.name,
schema=schema,
identifier=table.name,
type=RelationType.View if is_view else RelationType.Table,
type=table_type,
)

@classmethod
def materialized_view_from_relation_config(
cls, relation_config: RelationConfig
) -> MaxComputeMaterializedViewConfig:
return MaxComputeMaterializedViewConfig.from_relation_config(relation_config)


@dataclass(frozen=True, eq=False, repr=False)
class MaxComputeInformationSchema(InformationSchema):
Expand Down
4 changes: 3 additions & 1 deletion dbt/adapters/maxcompute/relation_configs/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
)
from dbt.adapters.contracts.relation import ComponentName, RelationConfig

from dbt.adapters.maxcompute.utils import quote_ref

if TYPE_CHECKING:
# Indirectly imported via agate_helper, which is lazy loaded further downfile.
# Used by mypy for earlier type hints.
Expand Down Expand Up @@ -54,7 +56,7 @@ def parse_mc_table(cls, table: MaxComputeTable) -> Dict:
def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
if cls.include_policy().get_part(component) and value:
if cls.quote_policy().get_part(component):
return f'"{value}"'
return quote_ref(value)
return value.lower()
return None

Expand Down
99 changes: 99 additions & 0 deletions dbt/adapters/maxcompute/relation_configs/_materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, List

from dbt.adapters.contracts.relation import (
RelationConfig,
ComponentName,
)
from dbt.adapters.maxcompute.relation_configs._base import MaxComputeBaseRelationConfig
from dbt.adapters.maxcompute.relation_configs._partition import (
PartitionConfig,
)
from dbt.adapters.maxcompute.utils import quote_string, quote_ref


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class MaxComputeMaterializedViewConfig(MaxComputeBaseRelationConfig):
name: str
project: str
schema: str
lifecycle: Optional[int] = None
build_deferred: bool = False
columns: Optional[List[str]] = None
column_comment: Optional[Dict[str, str]] = None
disable_rewrite: bool = False
table_comment: Optional[str] = None
partition_by: Optional[PartitionConfig] = None
tblProperties: Optional[Dict[str, str]] = None

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> "MaxComputeMaterializedViewConfig":
# required
kwargs_dict: Dict[str, Any] = {
"name": cls._render_part(ComponentName.Identifier, config_dict["name"]),
"schema": cls._render_part(ComponentName.Schema, config_dict["schema"]),
"project": cls._render_part(ComponentName.Database, config_dict["project"]),
}
for key, value in config_dict.items():
if key in ["name", "schema", "project"]:
pass
kwargs_dict[key] = value

if partition := config_dict.get("partition_by"):
kwargs_dict.update({"partition_by": PartitionConfig.parse(partition)})

materialized_view: "MaxComputeMaterializedViewConfig" = super().from_dict(kwargs_dict)
return materialized_view

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
config_dict = {
"name": relation_config.identifier,
"schema": relation_config.schema,
"project": relation_config.database,
}
items = ["lifecycle", "build_deferred", "columns", "column_comment", "disable_rewrite", "table_comment",
"partition_by", "tblProperties"]

if relation_config:
for item in items:
if item in relation_config.config:
config_dict.update({item: relation_config.config[item]})
return config_dict

def get_coordinate(self) -> str:
if self.schema is None:
return f"{self.name}"
if self.project is None:
return f"{self.schema}.{self.name}"
return f"{self.project}.{self.schema}.{self.name}"

def create_table_sql(self) -> str:
sql = f"CREATE MATERIALIZED VIEW IF NOT EXISTS {self.get_coordinate()}\n"
if self.lifecycle and self.lifecycle > 0:
sql += f"LIFECYCLE {self.lifecycle}\n"
if self.build_deferred:
sql += "BUILD DEFERRED\n"
if self.columns and len(self.columns) > 0:
sql += "("
for column in self.columns:
if self.column_comment and column in self.column_comment:
sql += f"{quote_ref(column)} COMMENT {quote_string(self.column_comment[column])}"
else:
sql += f"{quote_ref(column)}"
sql += ", "
sql = sql[:-2]
sql += ")\n"
if self.disable_rewrite:
sql += " DISABLE REWRITE\n"
if self.table_comment:
sql += f"COMMENT {quote_string(self.table_comment)}\n"
if self.partition_by and len(self.partition_by.fields) > 0:
sql += f"PARTITIONED BY({self.partition_by.render(False)})\n"
if self.tblProperties and len(self.tblProperties) > 0:
sql += "TBLPROPERTIES( "
for k, v in self.tblProperties.items():
sql += f"\"{k}\"=\"{v}\", "
sql = sql[:-2]
sql += ")\n"
return sql
15 changes: 14 additions & 1 deletion dbt/adapters/maxcompute/relation_configs/_partition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from typing import Optional, List
from typing import Optional, List, Dict, Any

import dbt_common.exceptions
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.dataclass_schema import dbtClassMixin


Expand Down Expand Up @@ -55,6 +56,18 @@ def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
f' Expected a dictionary with "fields" and "data_types" keys'
)

@classmethod
def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]:
"""
Parse model node into a raw config for `PartitionConfig.parse`
- Note:
This doesn't currently collect `time_ingestion_partitioning` and `copy_partitions`
because this was built for materialized views, which do not support those settings.
"""
config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by")
return config_dict

def post_validate(self):
if 0 < len(self.data_types) != len(self.fields):
raise dbt_common.exceptions.DbtValidationError(
Expand Down
10 changes: 10 additions & 0 deletions dbt/adapters/maxcompute/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
from odps.errors import ODPSError, NoSuchObject


def quote_string(value: str) -> str:
value.replace('\'', "\\'")
return f"'{value}'"


def quote_ref(value: str) -> str:
value.replace('`', "``")
return f"`{value}`"


def is_schema_not_found(e: ODPSError) -> bool:
if isinstance(e, NoSuchObject):
return True
Expand Down
1 change: 0 additions & 1 deletion dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def remove_comments(input_string):
or e.code == "ODPS-0110061"
or e.code == "ODPS-0130131"
or e.code == "ODPS-0420111"
or e.code == "ODPS-0130071"
):
if i == retry_times - 1:
raise e
Expand Down
4 changes: 3 additions & 1 deletion dbt/include/maxcompute/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
{% if from_relation.is_table -%}
ALTER TABLE {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% else -%}
{% elif from_relation.is_view -%}
ALTER VIEW {{ from_relation.render() }}
RENAME TO {{ to_relation.identifier }};
{% else -%}
{{ get_rename_materialized_view_sql_2(from_relation, to_relation) }}
{% endif -%}
{% endmacro %}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,13 @@
{% macro get_alter_materialized_view_as_sql(
{% macro maxcompute__get_alter_materialized_view_as_sql(
relation,
configuration_changes,
new_config,
sql,
existing_relation,
backup_relation,
intermediate_relation
) %}
{{- log('Applying ALTER to: ' ~ relation) -}}
{{- adapter.dispatch('get_alter_materialized_view_as_sql', 'dbt')(
relation,
configuration_changes,
sql,
existing_relation,
backup_relation,
intermediate_relation
) -}}
{{ get_replace_sql(existing_relation, relation, sql) }}
{% endmacro %}


{% macro default__get_alter_materialized_view_as_sql(
relation,
configuration_changes,
sql,
existing_relation,
backup_relation,
intermediate_relation
) %}
{{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}


{% macro get_materialized_view_configuration_changes(existing_relation, new_config) %}
/* {#
It's recommended that configuration changes be formatted as follows:
{"<change_category>": [{"action": "<name>", "context": ...}]}
For example:
{
"indexes": [
{"action": "drop", "context": "index_abc"},
{"action": "create", "context": {"columns": ["column_1", "column_2"], "type": "hash", "unique": True}},
],
}
Either way, `get_materialized_view_configuration_changes` needs to align with `get_alter_materialized_view_as_sql`.
#} */
{{- log('Determining configuration changes on: ' ~ existing_relation) -}}
{%- do return(adapter.dispatch('get_materialized_view_configuration_changes', 'dbt')(existing_relation, new_config)) -%}
{% endmacro %}


{% macro default__get_materialized_view_configuration_changes(existing_relation, new_config) %}
{{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% macro maxcompute__get_materialized_view_configuration_changes(existing_relation, new_config) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{% macro maxcompute__get_create_materialized_view_as_sql(relation, sql) %}

create materialized view if not exists {{ relation }}
as {{ sql }};

{%- set materialized_view = adapter.Relation.materialized_view_from_relation_config(config.model) -%}
{{ materialized_view.create_table_sql() }}
as ({{ sql }});
{% endmacro %}
18 changes: 5 additions & 13 deletions dbt/include/maxcompute/macros/relations/materialized_view/drop.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
{# /*
This was already implemented. Instead of creating a new macro that aligns with the standard,
this was reused and the default was maintained. This gets called by `drop_relation`, which
actually executes the drop, and `get_drop_sql`, which returns the template.
*/ #}

{% macro drop_materialized_view(relation) -%}
{{- adapter.dispatch('drop_materialized_view', 'dbt')(relation) -}}
{%- endmacro %}


{% macro default__drop_materialized_view(relation) -%}
drop materialized view if exists {{ relation.render() }} cascade
{% macro maxcompute__drop_materialized_view(relation) -%}
{% call statement(name="main") %}
{{- log("replace materialized view will drop it first and then recreate.") -}}
drop materialized view if exists {{ relation.render() }}
{% endcall %}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
{% macro refresh_materialized_view(relation) %}
{{- log('Applying REFRESH to: ' ~ relation) -}}
{{- adapter.dispatch('refresh_materialized_view', 'dbt')(relation) -}}
{% endmacro %}


{% macro default__refresh_materialized_view(relation) %}
{{ exceptions.raise_compiler_error("`refresh_materialized_view` has not been implemented for this adapter.") }}
{% macro maxcompute__refresh_materialized_view(relation) %}
ALTER MATERIALIZED VIEW {{ relation.render() }} REBUILD;
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
{% macro get_rename_materialized_view_sql(relation, new_name) %}
{{- adapter.dispatch('get_rename_materialized_view_sql', 'dbt')(relation, new_name) -}}
{% endmacro %}


{% macro default__get_rename_materialized_view_sql(relation, new_name) %}
{% macro maxcompute__get_rename_materialized_view_sql(relation, new_name) %}
{{ exceptions.raise_compiler_error(
"`get_rename_materialized_view_sql` has not been implemented for this adapter."
"maxcompute materialized view not support rename operation."
) }}
{% endmacro %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
{% macro get_replace_materialized_view_sql(relation, sql) %}
{{- adapter.dispatch('get_replace_materialized_view_sql', 'dbt')(relation, sql) -}}
{% endmacro %}


{% macro default__get_replace_materialized_view_sql(relation, sql) %}
{{ exceptions.raise_compiler_error(
"`get_replace_materialized_view_sql` has not been implemented for this adapter."
) }}
{% macro maxcompute__get_replace_materialized_view_sql(relation, sql) %}
{{ drop_materialized_view(relation) }}
{{ get_create_materialized_view_as_sql(relation, sql) }}
{% endmacro %}
5 changes: 5 additions & 0 deletions dbt/include/maxcompute/macros/relations/table/drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% macro maxcompute__drop_table(relation) %}
{% call statement(name="main") %}
drop table if exists {{ relation }}
{% endcall %}
{% endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/maxcompute/macros/relations/table/rename.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{%- macro maxcompute__get_rename_table_sql(relation, new_name) -%}
alter table {{ relation }} rename to {{ new_name }}
{%- endmacro -%}
5 changes: 5 additions & 0 deletions dbt/include/maxcompute/macros/relations/view/drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% macro maxcompute__drop_view(relation) %}
{% call statement(name="main") %}
drop view if exists {{ relation }}
{% endcall %}
{% endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/maxcompute/macros/relations/view/rename.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{%- macro maxcompute__get_rename_view_sql(relation, new_name) -%}
alter view {{ relation }} rename to {{ new_name }}
{%- endmacro -%}
Loading

0 comments on commit 3440faf

Please sign in to comment.