Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Views V2 #954

Open
wants to merge 15 commits into
base: 1.10.latest
Choose a base branch
from
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

- Support databricks OAuth M2M auth type. Updated OAuth readme doc with instructions.([827](https://github.com/databricks/dbt-databricks/pull/827))
- Introduced use_materialization_v2 flag for gating materialization revamps. ([844](https://github.com/databricks/dbt-databricks/pull/844))
- Introduce Tables V2, including safe_table_create which will not change the production table unless new data can safely be ingested ([927](https://github.com/databricks/dbt-databricks/pull/927))
- Introduce Tables V2, including use_safer_relation_operations which will not change the production table unless new data can safely be ingested ([927](https://github.com/databricks/dbt-databricks/pull/927))

### Under the Hood

Expand Down
96 changes: 64 additions & 32 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from dbt.adapters.databricks.relation_configs.table_format import TableFormat
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig
from dbt.adapters.databricks.relation_configs.view import ViewConfig
from dbt.adapters.databricks.utils import get_first_row, handle_missing_objects, redact_credentials
from dbt.adapters.relation_configs import RelationResults
from dbt.adapters.spark.impl import (
Expand Down Expand Up @@ -138,7 +139,9 @@ class DatabricksConfig(AdapterConfig):
target_alias: Optional[str] = None
source_alias: Optional[str] = None
merge_with_schema_evolution: Optional[bool] = None
safe_table_create: Optional[bool] = None
use_safer_relation_operations: Optional[bool] = None
incremental_apply_config_changes: Optional[bool] = None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was previously missing

view_update_via_alter: Optional[bool] = None


def get_identifier_list_string(table_names: set[str]) -> str:
Expand Down Expand Up @@ -724,7 +727,7 @@ def get_persist_doc_columns(
if name in columns:
config_column = columns[name]
if isinstance(config_column, dict):
comment = columns[name].get("description", "")
comment = columns[name].get("description")
elif hasattr(config_column, "description"):
comment = config_column.description
else:
Expand All @@ -736,31 +739,6 @@ def get_persist_doc_columns(

return return_columns

@available.parse(lambda *a, **k: {})
def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase:
if relation.type == DatabricksRelationType.MaterializedView:
return MaterializedViewAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.StreamingTable:
return StreamingTableAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.Table:
return IncrementalTableAPI.get_from_relation(self, relation)
else:
raise NotImplementedError(f"Relation type {relation.type} is not supported.")

@available.parse(lambda *a, **k: {})
def get_config_from_model(self, model: RelationConfig) -> DatabricksRelationConfigBase:
assert model.config, "Config was missing from relation"
if model.config.materialized == "materialized_view":
return MaterializedViewAPI.get_from_relation_config(model)
elif model.config.materialized == "streaming_table":
return StreamingTableAPI.get_from_relation_config(model)
elif model.config.materialized == "incremental":
return IncrementalTableAPI.get_from_relation_config(model)
else:
raise NotImplementedError(
f"Materialization {model.config.materialized} is not supported."
)

@available
def generate_unique_temporary_table_suffix(self, suffix_initial: str = "__dbt_tmp") -> str:
return f"{suffix_initial}_{str(uuid4())}"
Expand Down Expand Up @@ -790,6 +768,35 @@ def parse_columns_and_constraints(

return enriched_columns, parsed_constraints

@available.parse(lambda *a, **k: {})
def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase:
if relation.type == DatabricksRelationType.MaterializedView:
return MaterializedViewAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.StreamingTable:
return StreamingTableAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.Table:
return IncrementalTableAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.View:
return ViewAPI.get_from_relation(self, relation)
else:
raise NotImplementedError(f"Relation type {relation.type} is not supported.")

@available.parse(lambda *a, **k: {})
def get_config_from_model(self, model: RelationConfig) -> DatabricksRelationConfigBase:
assert model.config, "Config was missing from relation"
if model.config.materialized == "materialized_view":
return MaterializedViewAPI.get_from_relation_config(model)
elif model.config.materialized == "streaming_table":
return StreamingTableAPI.get_from_relation_config(model)
elif model.config.materialized == "incremental":
return IncrementalTableAPI.get_from_relation_config(model)
elif model.config.materialized == "view":
return ViewAPI.get_from_relation_config(model)
else:
raise NotImplementedError(
f"Materialization {model.config.materialized} is not supported."
)


@dataclass(frozen=True)
class RelationAPIBase(ABC, Generic[DatabricksRelationConfig]):
Expand Down Expand Up @@ -868,14 +875,12 @@ def _describe_relation(
)

kwargs = {"relation": relation}
results["information_schema.views"] = cls._get_information_schema_views(adapter, kwargs)
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
return results

@staticmethod
def _get_information_schema_views(adapter: DatabricksAdapter, kwargs: dict[str, Any]) -> "Row":
return get_first_row(adapter.execute_macro("get_view_description", kwargs=kwargs))


class StreamingTableAPI(DeltaLiveTableAPIBase[StreamingTableConfig]):
relation_type = DatabricksRelationType.StreamingTable
Expand Down Expand Up @@ -918,3 +923,30 @@ def _describe_relation(
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
return results


class ViewAPI(RelationAPIBase[ViewConfig]):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use something similar to MV/ST and Incremental to check for changes so that we can update views via ALTER.

relation_type = DatabricksRelationType.View

@classmethod
def config_type(cls) -> type[ViewConfig]:
return ViewConfig

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
) -> RelationResults:
results = {}
kwargs = {"relation": relation}

results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

kwargs = {"table_name": relation}
results["describe_extended"] = adapter.execute_macro(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs
)
return results
4 changes: 4 additions & 0 deletions dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def is_materialized_view(self) -> bool:
def is_streaming_table(self) -> bool:
return self.type == DatabricksRelationType.StreamingTable

@property
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was missing from a previous PR

def is_dlt(self) -> bool:
return self.is_materialized_view or self.is_streaming_table

@property
def is_delta(self) -> bool:
assert self.metadata is not None
Expand Down
13 changes: 11 additions & 2 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,23 @@ def from_results(cls, results: RelationResults) -> Self:

return cls(config=config_dict)

@abstractmethod
def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]:
"""Get the changeset that must be applied to the existing relation to make it match the
current state of the dbt project. If no changes are required, this method should return
None.
"""
changes: dict[str, DatabricksComponentConfig] = {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining a base since the incremental and view versions of this method are very similar


raise NotImplementedError("Must be implemented by subclass")
for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if diff:
changes[key] = diff

if len(changes) > 0:
return DatabricksRelationChangeSet(changes=changes, requires_full_refresh=False)
return None


DatabricksRelationConfig = TypeVar("DatabricksRelationConfig", bound=DatabricksRelationConfigBase)
Expand Down
20 changes: 0 additions & 20 deletions dbt/adapters/databricks/relation_configs/incremental.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
from typing import Optional

from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksRelationChangeSet,
DatabricksRelationConfigBase,
)
from dbt.adapters.databricks.relation_configs.tags import TagsProcessor
Expand All @@ -11,19 +7,3 @@

class IncrementalTableConfig(DatabricksRelationConfigBase):
config_components = [TagsProcessor, TblPropertiesProcessor]

def get_changeset(
self, existing: "IncrementalTableConfig"
) -> Optional[DatabricksRelationChangeSet]:
changes: dict[str, DatabricksComponentConfig] = {}

for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if diff:
changes[key] = diff

if len(changes) > 0:
return DatabricksRelationChangeSet(changes=changes, requires_full_refresh=False)
return None
22 changes: 22 additions & 0 deletions dbt/adapters/databricks/relation_configs/view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Optional

from typing_extensions import Self

from dbt.adapters.databricks.relation_configs.base import (
DatabricksRelationChangeSet,
DatabricksRelationConfigBase,
)
from dbt.adapters.databricks.relation_configs.comment import CommentProcessor
from dbt.adapters.databricks.relation_configs.query import QueryProcessor
from dbt.adapters.databricks.relation_configs.tags import TagsProcessor
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesProcessor


class ViewConfig(DatabricksRelationConfigBase):
config_components = [TagsProcessor, TblPropertiesProcessor, QueryProcessor, CommentProcessor]

def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]:
changeset = super().get_changeset(existing)
if changeset and "comment" in changeset.changes:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason there is no mechanism in Databricks to alter the comment on a view ;(. Therefore, even if you want me to update the view via alter, if you update the description, we'll need to replace the view.

changeset.requires_full_refresh = True
return changeset
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

{% macro databricks__get_columns_in_query(select_sql) %}
{{ log("Getting column information via empty query")}}
{% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
{% call statement('get_columns_in_query', fetch_result=True) -%}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto_begin doesn't do anything in Databricks

{{ get_empty_subquery_sql(select_sql) }}
{% endcall %}
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{% set comment = column['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
{% set comment_query %}
alter table {{ relation.render()|lower }} change column {{ api.Column.get_name(column) }} comment '{{ escaped_comment }}';
COMMENT ON COLUMN {{ relation.render()|lower }}.{{ api.Column.get_name(column) }} IS '{{ escaped_comment }}';
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This form works with tables and views

{% endset %}
{% do run_query(comment_query) %}
{% endfor %}
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/databricks/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% macro make_staging_relation(base_relation, suffix='__dbt_stg') %}
{% macro make_staging_relation(base_relation, suffix='__dbt_stg', type='table') %}
{% set stg_identifier = base_relation.identifier ~ suffix %}
{% set stg_relation = api.Relation.create(database=base_relation.database, schema=base_relation.schema, identifier=stg_identifier, type='table') %}
{% set stg_relation = api.Relation.create(database=base_relation.database, schema=base_relation.schema, identifier=stg_identifier, type=type) %}
{% do return(stg_relation) %}
{% endmacro %}

Expand Down
14 changes: 14 additions & 0 deletions dbt/include/databricks/macros/etc/statement.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,17 @@

{%- endif -%}
{%- endmacro %}

{% macro execute_multiple_statements(statements) %}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring out of MV/ST because now we have a third place that we need this pattern.

{%- if statements is string %}
{% call statement(name="main") %}
{{ statements }}
{% endcall %}
{%- else %}
{%- for sql in statements %}
{% call statement(name="main") %}
{{ sql }}
{% endcall %}
{% endfor %}
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{% if adapter.behavior.use_materialization_v2 %}
{{ log("USING V2 MATERIALIZATION") }}
{#-- Set vars --#}
{% set safe_create = config.get('safe_table_create', True) | as_bool %}
{% set safe_create = config.get('use_safer_relation_operations', True) | as_bool %}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed this flag since now I want to use it for views too.

{% set should_replace = existing_relation.is_dlt or existing_relation.is_view or full_refresh %}
{% set is_replaceable = existing_relation.can_be_replaced and is_delta and config.get("location_root") %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was able to make this macro more generic too.


{% if configuration_changes is none %}
{% set build_sql = refresh_materialized_view(target_relation) %}
Expand Down Expand Up @@ -75,18 +75,7 @@

{% set grant_config = config.get('grants') %}

{%- if build_sql is string %}
{% call statement(name="main") %}
{{ build_sql }}
{% endcall %}
{%- else %}
{%- for sql in build_sql %}
{% call statement(name="main") %}
{{ sql }}
{% endcall %}
{% endfor %}
{% endif %}

{{ execute_multiple_statements(build_sql) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_streaming_table_configuration_changes(existing_relation, config) %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}
{% if configuration_changes is none %}
{% set build_sql = refresh_streaming_table(target_relation, sql) %}

Expand Down Expand Up @@ -74,18 +74,7 @@

{% set grant_config = config.get('grants') %}

{%- if build_sql is string %}
{% call statement(name="main") %}
{{ build_sql }}
{% endcall %}
{%- else %}
{%- for sql in build_sql %}
{% call statement(name="main") %}
{{ sql }}
{% endcall %}
{% endfor %}
{% endif %}

{{ execute_multiple_statements(build_sql) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{%- set grant_config = config.get('grants') -%}
{%- set tblproperties = config.get('tblproperties') -%}
{%- set tags = config.get('databricks_tags') -%}
{%- set safe_create = config.get('safe_table_create', True) %}
{%- set safe_create = config.get('use_safer_relation_operations', True) %}
{% set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) %}
{% set target_relation = this.incorporate(type='table') %}

Expand Down
Loading