From 9d4c2cf5aa341710d29cb317be199b5690d382bc Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 18 Jul 2024 20:55:48 -0500 Subject: [PATCH 01/27] Add default source nodes rendering --- cosmos/airflow/graph.py | 10 ++++ cosmos/core/airflow.py | 2 +- cosmos/dbt/graph.py | 46 ++++++++++++++++++- cosmos/operators/local.py | 9 ++++ cosmos/operators/virtualenv.py | 8 ++++ .../jaffle_shop/models/staging/sources.yml | 26 +++++++++++ .../models/staging/stg_customers.sql | 2 +- .../jaffle_shop/models/staging/stg_orders.sql | 2 +- .../models/staging/stg_payments.sql | 2 +- tests/dbt/test_graph.py | 14 +++--- tests/operators/test_local.py | 2 +- 11 files changed, 110 insertions(+), 13 deletions(-) create mode 100644 dev/dags/dbt/jaffle_shop/models/staging/sources.yml diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index ebae7f32f7..bbf1392ce9 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -142,6 +142,7 @@ def create_task_metadata( DbtResourceType.SNAPSHOT: "DbtSnapshot", DbtResourceType.SEED: "DbtSeed", DbtResourceType.TEST: "DbtTest", + DbtResourceType.SOURCE: "DbtSource", } args = {**args, **{"models": node.resource_name}} @@ -151,6 +152,15 @@ def create_task_metadata( task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" + elif node.resource_type == DbtResourceType.SOURCE: + task_id = f"{node.name}_source" + args["select"] = f"source:{node.resource_name}" + args["models"] = None + if use_task_group is True: + task_id = node.resource_type.value + if node.has_freshness is False: + # render sources without freshness as empty operators + return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator") else: task_id = f"{node.name}_{node.resource_type.value}" if use_task_group is True: diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index d4a9624832..7355f777bc 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -29,7 +29,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None task_id=task.id, dag=dag, task_group=task_group, - extra_context=task.extra_context, + **({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}), **task.arguments, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 62b6d88bf0..b9fc98db43 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -48,7 +48,7 @@ class CosmosLoadDbtException(Exception): @dataclass class DbtNode: """ - Metadata related to a dbt node (e.g. model, seed, snapshot). + Metadata related to a dbt node (e.g. model, seed, snapshot, source). """ unique_id: str @@ -57,6 +57,7 @@ class DbtNode: file_path: Path tags: list[str] = field(default_factory=lambda: []) config: dict[str, Any] = field(default_factory=lambda: {}) + has_freshness: bool = False has_test: bool = False @property @@ -95,6 +96,30 @@ def context_dict(self) -> dict[str, Any]: } +def is_freshness_effective(freshness: dict[str, Any]) -> bool: + """Function to find if a source has null freshness. Scenarios where freshness + looks like: + "freshness": { + "warn_after": { + "count": null, + "period": null + }, + "error_after": { + "count": null, + "period": null + }, + "filter": null + } + should be considered as null, this function ensures that.""" + for _, value in freshness.items(): + if isinstance(value, dict): + if any(subvalue is not None for subvalue in value.values()): + return True + elif value is not None: + return True + return False + + def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -138,6 +163,11 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, file_path=project_path / node_dict["original_file_path"], tags=node_dict.get("tags", []), config=node_dict.get("config", {}), + has_freshness=( + is_freshness_effective(node_dict.get("freshness", False)) + if node_dict["resource_type"] == "source" + else False + ), ) nodes[node.unique_id] = node logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) @@ -339,7 +369,14 @@ def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] ) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [dbt_cmd, "ls", "--output", "json"] + ls_command = [ + dbt_cmd, + "ls", + "--output", + "json", + "--output-keys", + "name alias unique_id resource_type depends_on original_file_path tags config freshness", + ] ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) @@ -605,6 +642,11 @@ def load_from_dbt_manifest(self) -> None: file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], + has_freshness=( + node_dict["freshness"] is not None + if node_dict["resource_type"] == "source" and "freshness" in node_dict + else False + ), ) nodes[node.unique_id] = node diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4acaa94536..dd72d59805 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -816,3 +816,12 @@ def __init__(self, **kwargs: str) -> None: raise DeprecationWarning( "The DbtDepsOperator has been deprecated. " "Please use the `install_deps` flag in dbt_args instead." ) + + +class DbtSourceLocalOperator(DbtLocalBaseOperator): + """ + Executes a dbt source freshness command. + """ + + ui_color = "#34CCEB" + base_cmd = ["source", "freshness"] diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index cbe8c67e96..22cc76f7c7 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -18,6 +18,7 @@ DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, + DbtSourceLocalOperator, DbtTestLocalOperator, ) @@ -162,3 +163,10 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator) Executes `dbt docs generate` command within a Python Virtual Environment, that is created before running the dbt command and deleted just after. """ + + +class DbtSourceVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSourceLocalOperator): + """ + Executes `dbt source freshness` command within a Python Virtual Environment, that is created before running the dbt + command and deleted just after. + """ diff --git a/dev/dags/dbt/jaffle_shop/models/staging/sources.yml b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml new file mode 100644 index 0000000000..b45fbaba3d --- /dev/null +++ b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml @@ -0,0 +1,26 @@ + +version: 2 + +sources: + - name: postgres_db + database: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + tables: + - name: raw_customers + columns: + - name: id + tests: + - unique + - not_null + - name: raw_payments + - name: raw_orders + columns: + - name: id + tests: + - unique + - not_null + freshness: + warn_after: + count: 3650 + period: day + loaded_at_field: CAST(order_date AS TIMESTAMP) diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql index cad0472695..a408d7868f 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql @@ -4,7 +4,7 @@ with source as ( Normally we would select from the table here, but we are using seeds to load our data in this project #} - select * from {{ ref('raw_customers') }} + select * from {{ source('postgres_db', 'raw_customers') }} ), diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql index a654dcb947..12e1b247ed 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql @@ -4,7 +4,7 @@ with source as ( Normally we would select from the table here, but we are using seeds to load our data in this project #} - select * from {{ ref('raw_orders') }} + select * from {{ source('postgres_db', 'raw_orders') }} ), diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql index f718596ad0..9171ac8034 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql @@ -4,7 +4,7 @@ with source as ( Normally we would select from the table here, but we are using seeds to load our data in this project #} - select * from {{ ref('raw_payments') }} + select * from {{ source('postgres_db', 'raw_payments') }} ), diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 05aad822ad..3d79affe1c 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -453,13 +453,15 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): dbt_graph.load_via_dbt_ls() assert dbt_graph.nodes == dbt_graph.filtered_nodes # This test is dependent upon dbt >= 1.5.4 - assert len(dbt_graph.nodes) == 7 + assert len(dbt_graph.nodes) == 9 expected_keys = [ "model.jaffle_shop.customers", "model.jaffle_shop.stg_customers", "seed.jaffle_shop.raw_customers", "test.jaffle_shop.not_null_customers_customer_id.5c9bf9911d", "test.jaffle_shop.not_null_stg_customers_customer_id.e2cfb1f9aa", + "test.jaffle_shop.source_not_null_postgres_db_raw_customers_id.de3e9fff76", + "test.jaffle_shop.source_unique_postgres_db_raw_customers_id.6e5ad1d707", "test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1", "test.jaffle_shop.unique_stg_customers_customer_id.c7614daada", ] @@ -492,7 +494,7 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): dbt_graph.load_via_dbt_ls() assert dbt_graph.nodes == dbt_graph.filtered_nodes - assert len(dbt_graph.nodes) == 28 + assert len(dbt_graph.nodes) == 35 def test_load_via_custom_without_project_path(): @@ -1276,11 +1278,11 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile dbt_graph.load_via_dbt_ls() filtered_nodes = dbt_graph.filtered_nodes.keys() - assert len(filtered_nodes) == 4 + assert len(filtered_nodes) == 6 assert "model.jaffle_shop.stg_customers" in filtered_nodes - assert "seed.jaffle_shop.raw_customers" in filtered_nodes - # Two tests should be filtered - assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 2 + assert "source.jaffle_shop.postgres_db.raw_customers" in filtered_nodes + # Four tests should be filtered + assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 4 @pytest.mark.parametrize( diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index ed46caf887..97dd9b817a 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -444,7 +444,7 @@ def test_run_operator_dataset_inlets_and_outlets(caplog): run_test_dag(dag) - assert run_operator.inlets == [] + assert run_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.raw_customers", extra=None)] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.outlets == [] From d925d4005f33bf8fa49f79eff5a3cbb07e4e2bee Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 18 Jul 2024 21:22:00 -0500 Subject: [PATCH 02/27] Update ResourceType reference with constant --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b9fc98db43..71b81a012e 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -165,7 +165,7 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, config=node_dict.get("config", {}), has_freshness=( is_freshness_effective(node_dict.get("freshness", False)) - if node_dict["resource_type"] == "source" + if node_dict["resource_type"] == DbtResourceType.SOURCE else False ), ) From 070b4d57fd48126bc4b3822555e3f871c61c9be4 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Sun, 21 Jul 2024 01:36:21 -0500 Subject: [PATCH 03/27] Add MixIn Operator + freshness template rendering --- cosmos/dbt/graph.py | 14 ++-- cosmos/operators/aws_eks.py | 7 ++ cosmos/operators/azure_container_instance.py | 7 ++ cosmos/operators/base.py | 9 +++ cosmos/operators/docker.py | 7 ++ cosmos/operators/kubernetes.py | 7 ++ cosmos/operators/local.py | 67 +++++++++++++++++--- cosmos/operators/virtualenv.py | 14 ++-- tests/dbt/test_graph.py | 2 +- 9 files changed, 110 insertions(+), 24 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 71b81a012e..724a8b7c97 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -13,7 +13,7 @@ from functools import cached_property from pathlib import Path from subprocess import PIPE, Popen -from typing import Any +from typing import Any, Optional from airflow.models import Variable @@ -96,7 +96,7 @@ def context_dict(self) -> dict[str, Any]: } -def is_freshness_effective(freshness: dict[str, Any]) -> bool: +def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool: """Function to find if a source has null freshness. Scenarios where freshness looks like: "freshness": { @@ -111,6 +111,8 @@ def is_freshness_effective(freshness: dict[str, Any]) -> bool: "filter": null } should be considered as null, this function ensures that.""" + if freshness is None: + return False for _, value in freshness.items(): if isinstance(value, dict): if any(subvalue is not None for subvalue in value.values()): @@ -164,8 +166,8 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, tags=node_dict.get("tags", []), config=node_dict.get("config", {}), has_freshness=( - is_freshness_effective(node_dict.get("freshness", False)) - if node_dict["resource_type"] == DbtResourceType.SOURCE + is_freshness_effective(node_dict.get("freshness")) + if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE else False ), ) @@ -643,8 +645,8 @@ def load_from_dbt_manifest(self) -> None: tags=node_dict["tags"], config=node_dict["config"], has_freshness=( - node_dict["freshness"] is not None - if node_dict["resource_type"] == "source" and "freshness" in node_dict + is_freshness_effective(node_dict.get("freshness")) + if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE else False ), ) diff --git a/cosmos/operators/aws_eks.py b/cosmos/operators/aws_eks.py index 1800283783..0d98b2c63f 100644 --- a/cosmos/operators/aws_eks.py +++ b/cosmos/operators/aws_eks.py @@ -14,6 +14,7 @@ DbtRunOperationKubernetesOperator, DbtSeedKubernetesOperator, DbtSnapshotKubernetesOperator, + DbtSourceKubernetesOperator, DbtTestKubernetesOperator, ) @@ -101,6 +102,12 @@ class DbtSnapshotAwsEksOperator(DbtAwsEksBaseOperator, DbtSnapshotKubernetesOper """ +class DbtSourceAzureContainerInstanceOperator(DbtAwsEksBaseOperator, DbtSourceKubernetesOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunAwsEksOperator(DbtAwsEksBaseOperator, DbtRunKubernetesOperator): """ Executes a dbt core run command. diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index d8427b2fbb..993d4315f0 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -13,6 +13,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -102,6 +103,12 @@ class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContai """ +class DbtSourceAzureContainerInstanceOperator(DbtSourceMixin, DbtAzureContainerInstanceBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core run command. diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index d0cbdd282a..37aae7a816 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -344,6 +344,15 @@ class DbtSnapshotMixin: ui_color = "#964B00" +class DbtSourceMixin: + """ + Executes a dbt source freshness command. + """ + + base_cmd = ["source", "freshness"] + ui_color = "#34CCEB" + + class DbtRunMixin: """ Mixin for dbt run command. diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 532de380e7..4abf9e9943 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -13,6 +13,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -94,6 +95,12 @@ class DbtSnapshotDockerOperator(DbtSnapshotMixin, DbtDockerBaseOperator): """ +class DbtSourceDockerOperator(DbtSourceMixin, DbtDockerBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunDockerOperator(DbtRunMixin, DbtDockerBaseOperator): """ Executes a dbt core run command. diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index f842191998..ef69cd561e 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -17,6 +17,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -125,6 +126,12 @@ class DbtSnapshotKubernetesOperator(DbtSnapshotMixin, DbtKubernetesBaseOperator) """ +class DbtSourceKubernetesOperator(DbtSourceMixin, DbtKubernetesBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator): """ Executes a dbt core run command. diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index dd72d59805..b8dcfbff39 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import tempfile import warnings @@ -62,6 +63,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -110,9 +112,10 @@ class DbtLocalBaseOperator(AbstractDbtBaseOperator): and does not inherit the current process environment. """ - template_fields: Sequence[str] = AbstractDbtBaseOperator.template_fields + ("compiled_sql",) # type: ignore[operator] + template_fields: Sequence[str] = AbstractDbtBaseOperator.template_fields + ("compiled_sql", "freshness") # type: ignore[operator] template_fields_renderers = { "compiled_sql": "sql", + "freshness": "json", } def __init__( @@ -128,6 +131,7 @@ def __init__( self.profile_config = profile_config self.callback = callback self.compiled_sql = "" + self.freshness = "" self.should_store_compiled_sql = should_store_compiled_sql self.openlineage_events_completes: list[RunEvent] = [] self.invocation_mode = invocation_mode @@ -243,6 +247,51 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context, session: Se else: logger.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") + @provide_session + def store_freshness_json(self, tmp_project_dir: str, context: Context, session: Session = NEW_SESSION) -> None: + """ + Takes the compiled sources.json file from the dbt source freshness and stores it in the freshness rendered template. + Gets called after every dbt run / source freshness. + """ + if not self.should_store_compiled_sql: + return + + sources_json_path = Path(os.path.join(tmp_project_dir, "target", "sources.json")) + + if sources_json_path.exists(): + sources_json_content = sources_json_path.read_text(encoding="utf-8").strip() + + sources_data = json.loads(sources_json_content) + + formatted_sources_json = json.dumps(sources_data, indent=4) + + self.freshness = formatted_sources_json + + else: + self.freshness = "" + + # need to refresh the rendered task field record in the db because Airflow only does this + # before executing the task, not after + from airflow.models.renderedtifields import RenderedTaskInstanceFields + + ti = context["ti"] + + if isinstance(ti, TaskInstance): # verifies ti is a TaskInstance in order to access and use the "task" field + if TYPE_CHECKING: + assert ti.task is not None + ti.task.template_fields = self.template_fields + rtif = RenderedTaskInstanceFields(ti, render_templates=False) + + # delete the old records + session.query(RenderedTaskInstanceFields).filter( + RenderedTaskInstanceFields.dag_id == self.dag_id, + RenderedTaskInstanceFields.task_id == self.task_id, + RenderedTaskInstanceFields.run_id == ti.run_id, + ).delete() + session.add(rtif) + else: + logger.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") + def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( @@ -355,6 +404,7 @@ def run_command( if partial_parse_file.exists(): cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) + self.store_freshness_json(tmp_project_dir, context) self.store_compiled_sql(tmp_project_dir, context) self.handle_exception(result) if self.callback: @@ -525,6 +575,12 @@ class DbtSnapshotLocalOperator(DbtSnapshotMixin, DbtLocalBaseOperator): """ +class DbtSourceLocalOperator(DbtSourceMixin, DbtLocalBaseOperator): + """ + Executes a dbt source freshness command. + """ + + class DbtRunLocalOperator(DbtRunMixin, DbtLocalBaseOperator): """ Executes a dbt core run command. @@ -816,12 +872,3 @@ def __init__(self, **kwargs: str) -> None: raise DeprecationWarning( "The DbtDepsOperator has been deprecated. " "Please use the `install_deps` flag in dbt_args instead." ) - - -class DbtSourceLocalOperator(DbtLocalBaseOperator): - """ - Executes a dbt source freshness command. - """ - - ui_color = "#34CCEB" - base_cmd = ["source", "freshness"] diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 22cc76f7c7..4b1a97e6ce 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -137,6 +137,13 @@ class DbtSnapshotVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSnapshotLocalO """ +class DbtSourceVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSourceLocalOperator): + """ + Executes `dbt source freshness` command within a Python Virtual Environment, that is created before running the dbt + command and deleted just after. + """ + + class DbtRunVirtualenvOperator(DbtVirtualenvBaseOperator, DbtRunLocalOperator): # type: ignore[misc] """ Executes a dbt core run command within a Python Virtual Environment, that is created before running the dbt command @@ -163,10 +170,3 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator) Executes `dbt docs generate` command within a Python Virtual Environment, that is created before running the dbt command and deleted just after. """ - - -class DbtSourceVirtualenvOperator(DbtVirtualenvBaseOperator, DbtSourceLocalOperator): - """ - Executes `dbt source freshness` command within a Python Virtual Environment, that is created before running the dbt - command and deleted just after. - """ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3d79affe1c..d499a2673d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1391,7 +1391,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir if sys.platform == "darwin": assert hash_dir == "cdc6f0bec00f4edc616f3aa755a34330" else: - assert hash_dir == "77d08d6da374330ac1b49438ff2873f7" + assert hash_dir == "cd9c5005d75c876bed5ce53ef13c15fc" @pytest.mark.integration From 021a83431a5ee42ce31187d65a6928a32202bc6c Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Sun, 21 Jul 2024 21:26:50 -0500 Subject: [PATCH 04/27] Add render_source_nodes to settings.py --- cosmos/airflow/graph.py | 9 +++++++++ cosmos/settings.py | 1 + docs/configuration/cosmos-conf.rst | 8 ++++++++ 3 files changed, 18 insertions(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index bbf1392ce9..8374275421 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -19,6 +19,7 @@ from cosmos.core.graph.entities import Task as TaskMetadata from cosmos.dbt.graph import DbtNode from cosmos.log import get_logger +from cosmos.settings import render_source_nodes logger = get_logger(__name__) @@ -153,6 +154,14 @@ def create_task_metadata( if use_task_group is True: task_id = "run" elif node.resource_type == DbtResourceType.SOURCE: + if str(render_source_nodes) != "True": + msg = ( + "Source node rendering is currently disabled. To enable it, set the environment variable " + "AIRFLOW__COSMOS__RENDER_SOURCE_NODES=True. Enabling source node rendering may enhance the " + "visual representation of your DAGs." + ) + logger.warning(msg) + return None task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" args["models"] = None diff --git a/cosmos/settings.py b/cosmos/settings.py index 62d4ee5bdf..3737b53a29 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -19,6 +19,7 @@ dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html") enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") +render_source_nodes = conf.get("cosmos", "render_source_nodes", fallback=True) try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 8dc90a5c18..5cee0638ba 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -86,6 +86,14 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` +.. _render_source_nodes: + +`render_source_nodes`_: + Enable individual source node rendering. Nodes without freshness checks would be rendered as EmptyOperator. + + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__RENDER_SOURCE_NODES`` + [openlineage] ~~~~~~~~~~~~~ From 56a77edc390a2d8fa769dc1116d0acaa6929ce11 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Sun, 21 Jul 2024 21:32:15 -0500 Subject: [PATCH 05/27] update jaffle shop source with tests --- dev/dags/dbt/jaffle_shop/models/staging/sources.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dev/dags/dbt/jaffle_shop/models/staging/sources.yml b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml index b45fbaba3d..a3139b5858 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/sources.yml +++ b/dev/dags/dbt/jaffle_shop/models/staging/sources.yml @@ -13,6 +13,11 @@ sources: - unique - not_null - name: raw_payments + columns: + - name: id + tests: + - unique + - not_null - name: raw_orders columns: - name: id From 6c208c18b3111992699ced79aa728a33cb62c474 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Wed, 24 Jul 2024 23:35:19 -0500 Subject: [PATCH 06/27] Add source node rendering param to render config --- cosmos/airflow/graph.py | 32 ++++++++++++----- cosmos/config.py | 3 ++ cosmos/constants.py | 10 ++++++ cosmos/settings.py | 1 - docs/configuration/cosmos-conf.rst | 9 ----- docs/configuration/index.rst | 1 + docs/configuration/source-nodes-rendering.rst | 34 +++++++++++++++++++ tests/dbt/test_graph.py | 2 +- 8 files changed, 73 insertions(+), 19 deletions(-) create mode 100644 docs/configuration/source-nodes-rendering.rst diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 8374275421..bd9f4189da 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -12,6 +12,7 @@ TESTABLE_DBT_RESOURCES, DbtResourceType, ExecutionMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -19,7 +20,6 @@ from cosmos.core.graph.entities import Task as TaskMetadata from cosmos.dbt.graph import DbtNode from cosmos.log import get_logger -from cosmos.settings import render_source_nodes logger = get_logger(__name__) @@ -125,7 +125,11 @@ def create_test_task_metadata( def create_task_metadata( - node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False + node: DbtNode, + execution_mode: ExecutionMode, + args: dict[str, Any], + use_task_group: bool = False, + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.ALL, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -154,20 +158,25 @@ def create_task_metadata( if use_task_group is True: task_id = "run" elif node.resource_type == DbtResourceType.SOURCE: - if str(render_source_nodes) != "True": + if source_rendering_behavior == SourceRenderingBehavior.NONE: msg = ( - "Source node rendering is currently disabled. To enable it, set the environment variable " - "AIRFLOW__COSMOS__RENDER_SOURCE_NODES=True. Enabling source node rendering may enhance the " - "visual representation of your DAGs." + "Source node rendering is currently disabled. To enable it, set the RenderConfig source_rendering_behavior = all " + "Enabling source node rendering may enhance the visual representation of your DAGs." ) logger.warning(msg) return None + elif ( + source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS + and node.has_freshness is False + and node.has_test is False + ): + return None task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" args["models"] = None if use_task_group is True: task_id = node.resource_type.value - if node.has_freshness is False: + if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: # render sources without freshness as empty operators return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator") else: @@ -200,6 +209,7 @@ def generate_task_or_group( execution_mode: ExecutionMode, task_args: dict[str, Any], test_behavior: TestBehavior, + source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, **kwargs: Any, @@ -213,7 +223,11 @@ def generate_task_or_group( ) task_meta = create_task_metadata( - node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group + node=node, + execution_mode=execution_mode, + args=task_args, + use_task_group=use_task_group, + source_rendering_behavior=source_rendering_behavior, ) # In most cases, we'll map one DBT node to one Airflow task @@ -275,6 +289,7 @@ def build_airflow_graph( """ node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior + source_rendering_behavior = render_config.source_rendering_behavior tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -293,6 +308,7 @@ def build_airflow_graph( execution_mode=execution_mode, task_args=task_args, test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, node=node, diff --git a/cosmos/config.py b/cosmos/config.py index e1e5d56f9a..6e1a8f4a12 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -17,6 +17,7 @@ ExecutionMode, InvocationMode, LoadMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -55,6 +56,7 @@ class RenderConfig: :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``. :param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4). + :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "ALL" (since Cosmos 1.6). """ emit_datasets: bool = True @@ -71,6 +73,7 @@ class RenderConfig: dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) enable_mock_profile: bool = True + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.ALL airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) def __post_init__(self, dbt_project_path: str | Path | None) -> None: diff --git a/cosmos/constants.py b/cosmos/constants.py index 956660e016..0010839d70 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -84,6 +84,16 @@ class TestIndirectSelection(Enum): EMPTY = "empty" +class SourceRenderingBehavior(Enum): + """ + Modes to configure the source rendering behavior. + """ + + NONE = "none" + ALL = "all" + WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness" + + class DbtResourceType(aenum.Enum): # type: ignore """ Type of dbt node. diff --git a/cosmos/settings.py b/cosmos/settings.py index 3737b53a29..62d4ee5bdf 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -19,7 +19,6 @@ dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html") enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") -render_source_nodes = conf.get("cosmos", "render_source_nodes", fallback=True) try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 5cee0638ba..1c9f7220c7 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -86,15 +86,6 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` -.. _render_source_nodes: - -`render_source_nodes`_: - Enable individual source node rendering. Nodes without freshness checks would be rendered as EmptyOperator. - - - Default: ``True`` - - Environment Variable: ``AIRFLOW__COSMOS__RENDER_SOURCE_NODES`` - - [openlineage] ~~~~~~~~~~~~~ diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index 90f1959385..f6e60f61b9 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -22,6 +22,7 @@ Cosmos offers a number of configuration options to customize its behavior. For m Testing Behavior Selecting & Excluding Partial Parsing + Source Nodes Rendering Operator Args Compiled SQL Logging diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst new file mode 100644 index 0000000000..59ea020dbf --- /dev/null +++ b/docs/configuration/source-nodes-rendering.rst @@ -0,0 +1,34 @@ +.. _source-nodes-rendering: + +Source Nodes Rendering +================ + +Cosmos by default renders every dbt source as one of the following: + +- ``EmptyOperator`` if the source does not have `tests `_ or `freshness checks `_. +- ``DbtSourceOperator`` which tests the source's freshness. +- ``DbtTestOperator`` which runs the source's tests. + +This setup aims to create a DAG that aligns with dbt documentation, rendering both sources and models for enhanced visual representation. +It also prevents model dependencies from running if their sources are not fresh, avoiding the execution of stale data or writing incomplete data to your warehouse. + +This can be overridden using the ``source_rendering_behavior`` field in the ``RenderConfig`` object. + +Cosmos supports the following source rendering behaviors: + +- ``all`` (default): Renders all sources in the dbt project, using the 3 different node types: ``EmptyOperator``, ``DbtSourceOperator`` and ``DbtTestOperator``. +- ``none``: sources won't be rendered using cosmos' default source node rendering. Note: If node converters are being used for sources, they will still function. +- ``with_tests_or_freshness``: renders only sources that have either tests or freshness checks. + +Example: + +.. code-block:: python + + from cosmos import DbtTaskGroup, RenderConfig + from cosmos.constants import SourceRenderingBehavior + + jaffle_shop = DbtTaskGroup( + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, + ) + ) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index d499a2673d..80e3329c77 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -494,7 +494,7 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): dbt_graph.load_via_dbt_ls() assert dbt_graph.nodes == dbt_graph.filtered_nodes - assert len(dbt_graph.nodes) == 35 + assert len(dbt_graph.nodes) == 37 def test_load_via_custom_without_project_path(): From 8ad040c36857e5a8bcaff22c0400130d38af590b Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 25 Jul 2024 23:40:03 -0500 Subject: [PATCH 07/27] Update existing tests with new source rendering --- cosmos/airflow/graph.py | 2 +- cosmos/operators/local.py | 22 ---------------------- tests/airflow/test_graph.py | 11 +++++++---- tests/dbt/test_graph.py | 2 +- tests/operators/test_local.py | 11 ++++++++--- 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index bd9f4189da..8c1bad4afa 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -173,7 +173,7 @@ def create_task_metadata( return None task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" - args["models"] = None + args.pop("models") if use_task_group is True: task_id = node.resource_type.value if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b8dcfbff39..19d397cdfd 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -270,28 +270,6 @@ def store_freshness_json(self, tmp_project_dir: str, context: Context, session: else: self.freshness = "" - # need to refresh the rendered task field record in the db because Airflow only does this - # before executing the task, not after - from airflow.models.renderedtifields import RenderedTaskInstanceFields - - ti = context["ti"] - - if isinstance(ti, TaskInstance): # verifies ti is a TaskInstance in order to access and use the "task" field - if TYPE_CHECKING: - assert ti.task is not None - ti.task.template_fields = self.template_fields - rtif = RenderedTaskInstanceFields(ti, render_templates=False) - - # delete the old records - session.query(RenderedTaskInstanceFields).filter( - RenderedTaskInstanceFields.dag_id == self.dag_id, - RenderedTaskInstanceFields.task_id == self.task_id, - RenderedTaskInstanceFields.run_id == ti.run_id, - ).delete() - session.add(rtif) - else: - logger.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") - def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index a238475c2c..42a6755f32 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -21,6 +21,7 @@ from cosmos.constants import ( DbtResourceType, ExecutionMode, + SourceRenderingBehavior, TestBehavior, TestIndirectSelection, ) @@ -163,6 +164,7 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource }, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, + source_rendering_behavior=SourceRenderingBehavior.ALL, ) assert isinstance(output, TaskGroup) assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"] @@ -303,12 +305,12 @@ def test_create_task_metadata_unsupported(caplog): ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, - "my_source_run", - "cosmos.operators.local.DbtRunLocalOperator", - {"models": "my_source"}, + "my_source_source", + "cosmos.operators.local.DbtSourceLocalOperator", + {"select": "source:my_source"}, { "dbt_node_config": { - "unique_id": "model.my_folder.my_source", + "unique_id": "source.my_folder.my_source", "resource_type": "source", "depends_on": [], "file_path": ".", @@ -358,6 +360,7 @@ def test_create_task_metadata_model( file_path=Path(""), tags=[], config={}, + has_freshness=True, ) metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 80e3329c77..e537d6532d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1391,7 +1391,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir if sys.platform == "darwin": assert hash_dir == "cdc6f0bec00f4edc616f3aa755a34330" else: - assert hash_dir == "cd9c5005d75c876bed5ce53ef13c15fc" + assert hash_dir == "2ea8e56adc790506a084db9195dc05de" @pytest.mark.integration diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 97dd9b817a..23f255b80e 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -36,6 +36,7 @@ DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, + DbtSourceLocalOperator, DbtTestLocalOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -770,15 +771,19 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo [ ( DbtSeedLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), ), ( DbtRunLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), ), ( DbtBuildLocalOperator, - ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness", "full_refresh"), + ), + ( + DbtSourceLocalOperator, + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "freshness"), ), ], ) From 395ecc64ea56a8c89dd41b8da3eff9b0610854c1 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 25 Jul 2024 23:57:32 -0500 Subject: [PATCH 08/27] Update project hash in tests --- tests/dbt/test_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index e537d6532d..9ad9aace4a 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1391,7 +1391,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir if sys.platform == "darwin": assert hash_dir == "cdc6f0bec00f4edc616f3aa755a34330" else: - assert hash_dir == "2ea8e56adc790506a084db9195dc05de" + assert hash_dir == "6c95281ae5225b38afe6d1b2886bac84" @pytest.mark.integration From 4fcb9a2ced64472b2bcfecb7f10a77a5819e5714 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Sun, 28 Jul 2024 21:02:19 -0500 Subject: [PATCH 09/27] update extra context after merge --- cosmos/core/airflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 38d5113ec9..acff5d012d 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,7 +35,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None dag=dag, task_group=task_group, owner=task_owner, - extra_context=task.extra_context, + **({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}), **task.arguments, ) From 760d3f8a39e8ca9626cba6010253155f28ce37c8 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Sun, 28 Jul 2024 21:11:17 -0500 Subject: [PATCH 10/27] update test hash after merge --- tests/dbt/test_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 08fcaa02e3..2fe2806487 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1426,7 +1426,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir if sys.platform == "darwin": assert hash_dir == "18b97e2bff2684161f71db817f1f50e2" else: - assert hash_dir == "6c662da10b64a8390c469c884af88321" + assert hash_dir == "7c214f32d2e428c215e791435065f617" @pytest.mark.integration From 91ae0f416cc002f2bda2cc07090bf5182f7fac75 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 1 Aug 2024 23:33:52 -0500 Subject: [PATCH 11/27] adding quotes in ls command + source test --- cosmos/airflow/graph.py | 9 +---- cosmos/dbt/graph.py | 4 +- tests/airflow/test_graph.py | 74 +++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 91f60739d5..c329954011 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -161,14 +161,7 @@ def create_task_metadata( if use_task_group is True: task_id = "run" elif node.resource_type == DbtResourceType.SOURCE: - if source_rendering_behavior == SourceRenderingBehavior.NONE: - msg = ( - "Source node rendering is currently disabled. To enable it, set the RenderConfig source_rendering_behavior = all " - "Enabling source node rendering may enhance the visual representation of your DAGs." - ) - logger.warning(msg) - return None - elif ( + if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS and node.has_freshness is False and node.has_test is False diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 059961428a..d3fe3b9653 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -121,8 +121,6 @@ def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool: if isinstance(value, dict): if any(subvalue is not None for subvalue in value.values()): return True - elif value is not None: - return True return False @@ -386,7 +384,7 @@ def run_dbt_ls( "--output", "json", "--output-keys", - "name alias unique_id resource_type depends_on original_file_path tags config freshness", + '"name alias unique_id resource_type depends_on original_file_path tags config freshness"', ] ls_args = self.dbt_ls_args diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index cf9e6c683f..31b1702b50 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -405,6 +405,80 @@ def test_create_task_metadata_model_use_task_group(caplog): assert metadata.id == "run" +@pytest.mark.parametrize( + "unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class", + [ + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + True, + SourceRenderingBehavior.ALL, + "my_source_source", + "cosmos.operators.local.DbtSourceLocalOperator", + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SourceRenderingBehavior.ALL, + "my_source_source", + "airflow.operators.empty.EmptyOperator", + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + True, + SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, + "my_source_source", + "cosmos.operators.local.DbtSourceLocalOperator", + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, + None, + None, + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SourceRenderingBehavior.NONE, + None, + None, + ), + ( + f"{DbtResourceType.SOURCE.value}.my_folder.my_source", + DbtResourceType.SOURCE, + False, + SourceRenderingBehavior.NONE, + None, + None, + ), + ], +) +def test_create_task_metadata_source_with_rendering_options( + unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class, caplog +): + child_node = DbtNode( + unique_id=unique_id, + resource_type=resource_type, + depends_on=[], + file_path=Path(""), + tags=[], + config={}, + has_freshness=has_freshness, + ) + + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, source_rendering_behavior=source_rendering_behavior, args={} + ) + if metadata: + assert metadata.id == expected_id + assert metadata.operator_class == expected_operator_class + + @pytest.mark.parametrize("use_task_group", (None, True, False)) def test_create_task_metadata_seed(caplog, use_task_group): sample_node = DbtNode( From d8983f8fca6b8509f51efd9959b5eb450c426b0b Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Fri, 2 Aug 2024 00:09:15 -0500 Subject: [PATCH 12/27] Update freshness param in source tests --- tests/airflow/test_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 31b1702b50..df64f63146 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -443,7 +443,7 @@ def test_create_task_metadata_model_use_task_group(caplog): ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, - False, + True, SourceRenderingBehavior.NONE, None, None, From 203c5346f304e37e4e6b05e1522606f2250d51cf Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Mon, 5 Aug 2024 21:05:18 -0500 Subject: [PATCH 13/27] update dbt ls command for 1.5.4 --- cosmos/dbt/graph.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index d3fe3b9653..7e66ee927d 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -384,7 +384,14 @@ def run_dbt_ls( "--output", "json", "--output-keys", - '"name alias unique_id resource_type depends_on original_file_path tags config freshness"', + "name", + "unique_id", + "resource_type", + "depends_on", + "original_file_path", + "tags", + "config", + "freshness", ] ls_args = self.dbt_ls_args From 093b6e5ba4b4107fa16ea6dc836ad2d50f8cab43 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Aug 2024 01:51:47 +0000 Subject: [PATCH 14/27] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/dbt/test_graph.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index b6c769c656..2a2598a669 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1429,7 +1429,6 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir assert hash_dir == "7c214f32d2e428c215e791435065f617" - @pytest.mark.integration def test_get_dbt_ls_cache_returns_empty_if_non_json_var(airflow_variable): graph = DbtGraph(project=ProjectConfig()) From 41546f0ce7e7e0082bc74adaa5ac379cb384ae45 Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 8 Aug 2024 21:07:11 -0500 Subject: [PATCH 15/27] Change default source rendering to None --- cosmos/config.py | 2 +- cosmos/dbt/graph.py | 34 ++-- .../models/staging/stg_customers.sql | 7 + .../jaffle_shop/models/staging/stg_orders.sql | 7 + .../models/staging/stg_payments.sql | 11 +- docs/configuration/source-nodes-rendering.rst | 24 +-- scripts/test/integration-dbt-1-5-4.sh | 1 + scripts/test/integration-expensive.sh | 1 + scripts/test/integration.sh | 2 + tests/airflow/test_graph.py | 30 ++-- tests/dbt/test_graph.py | 166 ++++++++++++++---- 11 files changed, 201 insertions(+), 84 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 4f2e216ba1..d10f17cc78 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -77,7 +77,7 @@ class RenderConfig: dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) enable_mock_profile: bool = True - source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.ALL + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) def __post_init__(self, dbt_project_path: str | Path | None) -> None: diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 7e66ee927d..5a2e663977 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -28,6 +28,7 @@ DbtResourceType, ExecutionMode, LoadMode, + SourceRenderingBehavior, ) from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file @@ -378,21 +379,24 @@ def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] ) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [ - dbt_cmd, - "ls", - "--output", - "json", - "--output-keys", - "name", - "unique_id", - "resource_type", - "depends_on", - "original_file_path", - "tags", - "config", - "freshness", - ] + if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE: + ls_command = [ + dbt_cmd, + "ls", + "--output", + "json", + "--output-keys", + "name", + "unique_id", + "resource_type", + "depends_on", + "original_file_path", + "tags", + "config", + "freshness", + ] + else: + ls_command = [dbt_cmd, "ls", "--output", "json"] ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql index a408d7868f..71b6c7c0a9 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_customers.sql @@ -8,6 +8,13 @@ with source as ( ), +force_seed_dep as ( + {#- + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none + #} + select * from {{ ref('raw_customers') }} +), + renamed as ( select diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql index 12e1b247ed..b6c13a33f2 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_orders.sql @@ -8,6 +8,13 @@ with source as ( ), +force_seed_dep as ( + {#- + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none + #} + select * from {{ ref('raw_customers') }} +), + renamed as ( select diff --git a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql index 9171ac8034..3ff1fbece3 100644 --- a/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql +++ b/dev/dags/dbt/jaffle_shop/models/staging/stg_payments.sql @@ -1,13 +1,16 @@ with source as ( - {#- - Normally we would select from the table here, but we are using seeds to load - our data in this project - #} select * from {{ source('postgres_db', 'raw_payments') }} ), +force_seed_dep as ( + {#- + This CTE is used to ensure tests wait for seeds to run if source_node_rendering = none + #} + select * from {{ ref('raw_customers') }} +), + renamed as ( select diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst index 59ea020dbf..ad99bb2fab 100644 --- a/docs/configuration/source-nodes-rendering.rst +++ b/docs/configuration/source-nodes-rendering.rst @@ -3,22 +3,24 @@ Source Nodes Rendering ================ -Cosmos by default renders every dbt source as one of the following: +.. note:: + This feature is only available for dbt-core >= 1.5. -- ``EmptyOperator`` if the source does not have `tests `_ or `freshness checks `_. -- ``DbtSourceOperator`` which tests the source's freshness. -- ``DbtTestOperator`` which runs the source's tests. +By default, Cosmos does not render dbt sources automatically. Instead, you need to configure the rendering of sources explicitly. +You can control this behavior using the ``source_rendering_behavior`` field in the ``RenderConfig`` object. This is how it works: -This setup aims to create a DAG that aligns with dbt documentation, rendering both sources and models for enhanced visual representation. -It also prevents model dependencies from running if their sources are not fresh, avoiding the execution of stale data or writing incomplete data to your warehouse. +- **all**: + When set to ``all``, Cosmos renders all sources in the dbt project. It uses three different node types for this: + - ``EmptyOperator``: For sources that do not have tests or freshness checks. + - ``DbtSourceOperator``: For sources that have freshness checks. + - ``DbtTestOperator``: For sources that have tests. -This can be overridden using the ``source_rendering_behavior`` field in the ``RenderConfig`` object. + This approach aims to create a comprehensive DAG that aligns with dbt documentation, allowing for the rendering of both sources and models for a more detailed visual representation. + It also ensures that model dependencies do not run if their sources are not fresh, thus preventing the execution of stale or incomplete data. -Cosmos supports the following source rendering behaviors: +- **none** (default): When set to ``none``, Cosmos does not automatically render any sources. Note that if node converters are being used for sources, they will still function as intended. -- ``all`` (default): Renders all sources in the dbt project, using the 3 different node types: ``EmptyOperator``, ``DbtSourceOperator`` and ``DbtTestOperator``. -- ``none``: sources won't be rendered using cosmos' default source node rendering. Note: If node converters are being used for sources, they will still function. -- ``with_tests_or_freshness``: renders only sources that have either tests or freshness checks. +- **with_tests_or_freshness**: When set to ``with_tests_or_freshness``, Cosmos only renders sources that have either tests or freshness checks. Example: diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index 0875330820..284f605170 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -1,5 +1,6 @@ pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 +export SOURCE_RENDERING_BEHAVIOR=all rm -rf airflow.*; \ airflow db init; \ pytest -vv \ diff --git a/scripts/test/integration-expensive.sh b/scripts/test/integration-expensive.sh index 24bace86d4..96c2388cfe 100644 --- a/scripts/test/integration-expensive.sh +++ b/scripts/test/integration-expensive.sh @@ -1,3 +1,4 @@ +export SOURCE_RENDERING_BEHAVIOR=all pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 1d8264768a..a39ef63b1f 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -3,6 +3,8 @@ set -x set -e +export SOURCE_RENDERING_BEHAVIOR=all + pip freeze | grep airflow echo $AIRFLOW_HOME ls $AIRFLOW_HOME diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index df64f63146..39373a24b5 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,3 +1,4 @@ +import os from datetime import datetime from pathlib import Path from unittest.mock import patch @@ -30,6 +31,7 @@ from cosmos.profiles import PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") +SOURCE_RENDERING_BEHAVIOR_ENV = os.getenv("SOURCE_RENDERING_BEHAVIOR", "none") parent_seed = DbtNode( unique_id=f"{DbtResourceType.SEED.value}.{SAMPLE_PROJ_PATH.stem}.seed_parent", @@ -101,6 +103,7 @@ def test_build_airflow_graph_with_after_each(): task_args=task_args, render_config=RenderConfig( test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ), dbt_project_name="astro_shop", ) @@ -170,7 +173,7 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource }, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, - source_rendering_behavior=SourceRenderingBehavior.ALL, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) assert isinstance(output, TaskGroup) assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"] @@ -198,6 +201,7 @@ def test_build_airflow_graph_with_after_all(): render_config = RenderConfig( select=["tag:some"], test_behavior=TestBehavior.AFTER_ALL, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) build_airflow_graph( nodes=sample_nodes, @@ -412,7 +416,7 @@ def test_create_task_metadata_model_use_task_group(caplog): f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, True, - SourceRenderingBehavior.ALL, + SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), "my_source_source", "cosmos.operators.local.DbtSourceLocalOperator", ), @@ -420,26 +424,10 @@ def test_create_task_metadata_model_use_task_group(caplog): f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, False, - SourceRenderingBehavior.ALL, + SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), "my_source_source", "airflow.operators.empty.EmptyOperator", ), - ( - f"{DbtResourceType.SOURCE.value}.my_folder.my_source", - DbtResourceType.SOURCE, - True, - SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, - "my_source_source", - "cosmos.operators.local.DbtSourceLocalOperator", - ), - ( - f"{DbtResourceType.SOURCE.value}.my_folder.my_source", - DbtResourceType.SOURCE, - False, - SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, - None, - None, - ), ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, @@ -601,7 +589,9 @@ def test_airflow_kwargs_generation(): "group_id": "fake_group_id", "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", - "render_config": RenderConfig(select=["fake-render"]), + "render_config": RenderConfig( + select=["fake-render"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + ), "default_args": {"retries": 2}, "profile_config": ProfileConfig( profile_name="default", diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 2fe2806487..84f3dce41d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -14,7 +14,7 @@ from cosmos import settings from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode +from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode, SourceRenderingBehavior from cosmos.dbt.graph import ( CosmosLoadDbtException, DbtGraph, @@ -32,6 +32,7 @@ SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt" +SOURCE_RENDERING_BEHAVIOR_ENV = os.getenv("SOURCE_RENDERING_BEHAVIOR", "none") @pytest.fixture @@ -131,7 +132,10 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(exclude=["config.materialized:table"]) + render_config = RenderConfig( + exclude=["config.materialized:table"], + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -170,7 +174,9 @@ def test_load_via_manifest_with_select(project_name, manifest_filepath, model_fi target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(select=["+customers"]) + render_config = RenderConfig( + select=["+customers"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -251,7 +257,10 @@ def test_load_automatic_dbt_ls_file_is_available(mock_load_via_dbt_ls_file): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT) + render_config = RenderConfig( + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL) assert mock_load_via_dbt_ls_file.called @@ -264,7 +273,9 @@ def test_load_dbt_ls_file_without_file(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=None) + render_config = RenderConfig( + dbt_ls_path=None, source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + ) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE) @@ -278,7 +289,11 @@ def test_load_dbt_ls_file_without_project_path(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=None) + render_config = RenderConfig( + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + dbt_project_path=None, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) dbt_graph = DbtGraph( project=project_config, profile_config=profile_config, @@ -419,7 +434,10 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder( assert not (tmp_dbt_project_dir / "logs").exists() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -440,7 +458,10 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder( def test_load_via_dbt_ls_with_exclude(postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["*customers*"], exclude=["*orders*"] + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + select=["*customers*"], + exclude=["*orders*"], + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -483,7 +504,10 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -500,7 +524,10 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig( + dbt_executable_path="/inexistent/dbt", + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) dbt_graph = DbtGraph( project=project_config, execution_config=execution_config, @@ -518,7 +545,9 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + dbt_executable_path="existing-dbt-cmd", + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) dbt_graph = DbtGraph( project=project_config, @@ -537,7 +566,9 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + dbt_executable_path="/inexistent/dbt", + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( @@ -571,6 +602,7 @@ def test_load_via_dbt_ls_with_sources(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, env_vars={"DBT_SQLITE_PATH": str(DBT_PROJECTS_ROOT_DIR / "data")}, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -588,7 +620,11 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_deps=False) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + dbt_deps=False, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -631,7 +667,11 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages( stdout, stderr = process.communicate() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=False) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=False, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -659,7 +699,10 @@ def test_load_via_dbt_ls_caching_partial_parsing( project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=True, + enable_mock_profile=False, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -699,7 +742,10 @@ def test_load_via_dbt_ls_uses_partial_parse_when_cache_is_disabled( caplog.set_level(logging.DEBUG) project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( - dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + dbt_deps=True, + enable_mock_profile=False, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -740,7 +786,10 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr( mock_popen().returncode = 0 project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -759,7 +808,10 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen, postgres_profile_c mock_popen().returncode = 1 project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -777,7 +829,10 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen, postgres_profile_c def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, postgres_profile_config): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, @@ -796,7 +851,10 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, po def test_load_via_load_via_custom_parser(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -818,7 +876,11 @@ def test_load_via_load_via_custom_parser(project_name): def test_load_via_load_via_custom_parser_select_rendering_config(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "jaffle_shop") execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["customers"]) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + select=["customers"], + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -886,7 +948,10 @@ def test_update_node_dependency_test_not_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(exclude=["config.materialized:test"]) + render_config = RenderConfig( + exclude=["config.materialized:test"], + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -909,7 +974,9 @@ def test_tag_selected_node_test_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig(select=["tag:test_tag"]) + render_config = RenderConfig( + select=["tag:test_tag"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -934,7 +1001,10 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method, postgres_profi dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), - render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), + render_config=RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), profile_config=postgres_profile_config, ) @@ -982,7 +1052,9 @@ def test_load_via_dbt_ls_file(): profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) render_config = RenderConfig( - dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) dbt_graph = DbtGraph( project=project_config, @@ -1080,7 +1152,10 @@ def test_load_via_dbt_ls_project_config_env_vars( mock_popen().returncode = 0 env_vars = {"MY_ENV_VAR": "my_value"} project_config = ProjectConfig(env_vars=env_vars) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1117,7 +1192,10 @@ def test_profile_created_correctly_with_profile_mapping( mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig(env_vars={}) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = postgres_profile_config execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -1142,7 +1220,10 @@ def test_load_via_dbt_ls_project_config_dbt_vars( mock_popen().returncode = 0 dbt_vars = {"my_var1": "my_value1", "my_var2": "my_value2"} project_config = ProjectConfig(dbt_vars=dbt_vars) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1177,6 +1258,7 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, selector=selector, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) profile_config = ProfileConfig( profile_name="test", @@ -1207,7 +1289,11 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 project_config = ProjectConfig(partial_parse=False) - render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS) + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + load_method=LoadMode.DBT_LS, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ) profile_config = ProfileConfig( profile_name="test", target_name="test", @@ -1233,7 +1319,11 @@ def test_load_method_with_unsupported_render_config_selector_arg(load_method): f"RenderConfig.selector is not yet supported when loading dbt projects using the {load_method} parser." ) dbt_graph = DbtGraph( - render_config=RenderConfig(load_method=load_method, selector="my_selector"), + render_config=RenderConfig( + load_method=load_method, + selector="my_selector", + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ), project=MagicMock(), ) with pytest.raises(CosmosLoadDbtException, match=expected_error_msg): @@ -1257,6 +1347,7 @@ def test_load_via_dbt_ls_with_project_config_vars(): render_config=RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -1293,6 +1384,7 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, selector="stage_customers", + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), ) dbt_graph = DbtGraph( @@ -1304,9 +1396,11 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile dbt_graph.load_via_dbt_ls() filtered_nodes = dbt_graph.filtered_nodes.keys() - assert len(filtered_nodes) == 6 + assert len(filtered_nodes) == 7 assert "model.jaffle_shop.stg_customers" in filtered_nodes - assert "source.jaffle_shop.postgres_db.raw_customers" in filtered_nodes + assert "seed.jaffle_shop.raw_customers" in filtered_nodes + if SOURCE_RENDERING_BEHAVIOR_ENV == "all": + assert "source.jaffle_shop.postgres_db.raw_customers" in filtered_nodes # Four tests should be filtered assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 4 @@ -1406,7 +1500,13 @@ def airflow_variable(): @pytest.mark.integration def test_dbt_ls_cache_key_args_uses_airflow_vars_to_purge_dbt_ls_cache(airflow_variable): key, value = airflow_variable - graph = DbtGraph(project=ProjectConfig(), render_config=RenderConfig(airflow_vars_to_purge_dbt_ls_cache=[key])) + graph = DbtGraph( + project=ProjectConfig(), + render_config=RenderConfig( + airflow_vars_to_purge_dbt_ls_cache=[key], + source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + ), + ) assert graph.dbt_ls_cache_key_args == [key, value] From 4866f6181cc62f4225828f6f4086b4a4db5a741c Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Thu, 8 Aug 2024 21:43:04 -0500 Subject: [PATCH 16/27] Update project hash --- tests/dbt/test_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index efc521d37e..34171ec2ae 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1526,7 +1526,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir if sys.platform == "darwin": assert hash_dir == "65595448aded2c2b52878a801c1d9c59" else: - assert hash_dir == "7c214f32d2e428c215e791435065f617" + assert hash_dir == "9001bedf4aa8a329f7b669c89f337c24" @pytest.mark.integration From ce6da818dc46b25570cf4ec6d04998995caa2e5d Mon Sep 17 00:00:00 2001 From: arojasb3 Date: Mon, 12 Aug 2024 20:38:50 -0500 Subject: [PATCH 17/27] Add suggestions + workflow env variables --- .github/workflows/test.yml | 3 +++ cosmos/airflow/graph.py | 2 +- cosmos/config.py | 2 +- docs/configuration/source-nodes-rendering.rst | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f19831676c..219ea2019a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -163,6 +163,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -234,6 +235,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -377,6 +379,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c329954011..32da4618d3 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -132,7 +132,7 @@ def create_task_metadata( execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False, - source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.ALL, + source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. diff --git a/cosmos/config.py b/cosmos/config.py index d10f17cc78..f2c07bb980 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -60,7 +60,7 @@ class RenderConfig: :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``. :param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4). - :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "ALL" (since Cosmos 1.6). + :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). """ emit_datasets: bool = True diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst index ad99bb2fab..ae1417361f 100644 --- a/docs/configuration/source-nodes-rendering.rst +++ b/docs/configuration/source-nodes-rendering.rst @@ -4,7 +4,7 @@ Source Nodes Rendering ================ .. note:: - This feature is only available for dbt-core >= 1.5. + This feature is only available for dbt-core >= 1.5 and cosmos >= 1.6.0. By default, Cosmos does not render dbt sources automatically. Instead, you need to configure the rendering of sources explicitly. You can control this behavior using the ``source_rendering_behavior`` field in the ``RenderConfig`` object. This is how it works: From dab5229ec9ebb9e3174648eb8cdc7f3965578fb2 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 11:29:31 +0530 Subject: [PATCH 18/27] Run ci on add-source-nodes branch --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 219ea2019a..66d9652e01 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, add-source-nodes] pull_request_target: # Also run on pull requests originated from forks - branches: [main] + branches: [main, add-source-nodes] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From d6ad582663ae60e0752996ab758c35aede9d73da Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 11:39:49 +0530 Subject: [PATCH 19/27] Run ci on add-source-nodes branch --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 66d9652e01..20c86754fe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: test on: push: # Run on pushes to the default branch - branches: [main, add-source-nodes] + branches: [add-source-nodes] pull_request_target: # Also run on pull requests originated from forks - branches: [main, add-source-nodes] + branches: [add-source-nodes] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 1dd4b82fd3c2fd1785e88ec5e8f14d4a927f5e65 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 15:46:16 +0530 Subject: [PATCH 20/27] intentionally fail the sql job --- .github/workflows/test.yml | 1 + scripts/test/integration-sqlite.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 20c86754fe..07707ba127 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -303,6 +303,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 + SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 diff --git a/scripts/test/integration-sqlite.sh b/scripts/test/integration-sqlite.sh index dc32324d47..47cc7163ba 100644 --- a/scripts/test/integration-sqlite.sh +++ b/scripts/test/integration-sqlite.sh @@ -1,3 +1,4 @@ +export SOURCE_RENDERING_BEHAVIOR=all pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ From 1c6ae6525c9e91e55a5990af9b4104d9a5f05626 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 16:51:46 +0530 Subject: [PATCH 21/27] Revert env changes --- .github/workflows/test.yml | 1 - scripts/test/integration-sqlite.sh | 1 - tests/airflow/test_graph.py | 14 +++--- tests/dbt/test_graph.py | 74 +++++++++++++++--------------- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 07707ba127..20c86754fe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -303,7 +303,6 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 - SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 diff --git a/scripts/test/integration-sqlite.sh b/scripts/test/integration-sqlite.sh index 47cc7163ba..dc32324d47 100644 --- a/scripts/test/integration-sqlite.sh +++ b/scripts/test/integration-sqlite.sh @@ -1,4 +1,3 @@ -export SOURCE_RENDERING_BEHAVIOR=all pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 39373a24b5..dc3c948f72 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -31,7 +31,7 @@ from cosmos.profiles import PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") -SOURCE_RENDERING_BEHAVIOR_ENV = os.getenv("SOURCE_RENDERING_BEHAVIOR", "none") +SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) parent_seed = DbtNode( unique_id=f"{DbtResourceType.SEED.value}.{SAMPLE_PROJ_PATH.stem}.seed_parent", @@ -103,7 +103,7 @@ def test_build_airflow_graph_with_after_each(): task_args=task_args, render_config=RenderConfig( test_behavior=TestBehavior.AFTER_EACH, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), dbt_project_name="astro_shop", ) @@ -173,7 +173,7 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource }, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) assert isinstance(output, TaskGroup) assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"] @@ -201,7 +201,7 @@ def test_build_airflow_graph_with_after_all(): render_config = RenderConfig( select=["tag:some"], test_behavior=TestBehavior.AFTER_ALL, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) build_airflow_graph( nodes=sample_nodes, @@ -416,7 +416,7 @@ def test_create_task_metadata_model_use_task_group(caplog): f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, True, - SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + SOURCE_RENDERING_BEHAVIOR, "my_source_source", "cosmos.operators.local.DbtSourceLocalOperator", ), @@ -424,7 +424,7 @@ def test_create_task_metadata_model_use_task_group(caplog): f"{DbtResourceType.SOURCE.value}.my_folder.my_source", DbtResourceType.SOURCE, False, - SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + SOURCE_RENDERING_BEHAVIOR, "my_source_source", "airflow.operators.empty.EmptyOperator", ), @@ -590,7 +590,7 @@ def test_airflow_kwargs_generation(): "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", "render_config": RenderConfig( - select=["fake-render"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + select=["fake-render"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR ), "default_args": {"retries": 2}, "profile_config": ProfileConfig( diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 34171ec2ae..3998227c05 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -32,7 +32,7 @@ SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt" -SOURCE_RENDERING_BEHAVIOR_ENV = os.getenv("SOURCE_RENDERING_BEHAVIOR", "none") +SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) @pytest.fixture @@ -134,7 +134,7 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f ) render_config = RenderConfig( exclude=["config.materialized:table"], - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( @@ -175,7 +175,7 @@ def test_load_via_manifest_with_select(project_name, manifest_filepath, model_fi profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) render_config = RenderConfig( - select=["+customers"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + select=["+customers"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( @@ -259,7 +259,7 @@ def test_load_automatic_dbt_ls_file_is_available(mock_load_via_dbt_ls_file): ) render_config = RenderConfig( dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL) @@ -274,7 +274,7 @@ def test_load_dbt_ls_file_without_file(): profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) render_config = RenderConfig( - dbt_ls_path=None, source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + dbt_ls_path=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR ) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) with pytest.raises(CosmosLoadDbtException) as err_info: @@ -292,7 +292,7 @@ def test_load_dbt_ls_file_without_project_path(): render_config = RenderConfig( dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=None, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -436,7 +436,7 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder( project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -461,7 +461,7 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config): dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["*customers*"], exclude=["*orders*"], - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -506,7 +506,7 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -526,7 +526,7 @@ def test_load_via_custom_without_project_path(): execution_config = ExecutionConfig() render_config = RenderConfig( dbt_executable_path="/inexistent/dbt", - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -547,7 +547,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): render_config = RenderConfig( dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -568,7 +568,7 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt", - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( @@ -602,7 +602,7 @@ def test_load_via_dbt_ls_with_sources(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, env_vars={"DBT_SQLITE_PATH": str(DBT_PROJECTS_ROOT_DIR / "data")}, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -623,7 +623,7 @@ def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_deps=False, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -670,7 +670,7 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages( render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=False, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -702,7 +702,7 @@ def test_load_via_dbt_ls_caching_partial_parsing( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -745,7 +745,7 @@ def test_load_via_dbt_ls_uses_partial_parse_when_cache_is_disabled( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -788,7 +788,7 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr( project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -810,7 +810,7 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen, postgres_profile_c project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -831,7 +831,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate, po project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( @@ -853,7 +853,7 @@ def test_load_via_load_via_custom_parser(project_name): execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -879,7 +879,7 @@ def test_load_via_load_via_custom_parser_select_rendering_config(): render_config = RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["customers"], - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -950,7 +950,7 @@ def test_update_node_dependency_test_not_exist(): ) render_config = RenderConfig( exclude=["config.materialized:test"], - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( @@ -975,7 +975,7 @@ def test_tag_selected_node_test_exist(): profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) render_config = RenderConfig( - select=["tag:test_tag"], source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV) + select=["tag:test_tag"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR ) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( @@ -1003,7 +1003,7 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method, postgres_profi ), render_config=RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), profile_config=postgres_profile_config, @@ -1054,7 +1054,7 @@ def test_load_via_dbt_ls_file(): render_config = RenderConfig( dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( project=project_config, @@ -1154,7 +1154,7 @@ def test_load_via_dbt_ls_project_config_env_vars( project_config = ProjectConfig(env_vars=env_vars) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -1194,7 +1194,7 @@ def test_profile_created_correctly_with_profile_mapping( project_config = ProjectConfig(env_vars={}) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = postgres_profile_config execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) @@ -1222,7 +1222,7 @@ def test_load_via_dbt_ls_project_config_dbt_vars( project_config = ProjectConfig(dbt_vars=dbt_vars) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -1258,7 +1258,7 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, selector=selector, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -1292,7 +1292,7 @@ def test_load_via_dbt_ls_render_config_no_partial_parse( render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, load_method=LoadMode.DBT_LS, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) profile_config = ProfileConfig( profile_name="test", @@ -1322,7 +1322,7 @@ def test_load_method_with_unsupported_render_config_selector_arg(load_method): render_config=RenderConfig( load_method=load_method, selector="my_selector", - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), project=MagicMock(), ) @@ -1347,7 +1347,7 @@ def test_load_via_dbt_ls_with_project_config_vars(): render_config=RenderConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False, - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( @@ -1384,7 +1384,7 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, selector="stage_customers", - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) dbt_graph = DbtGraph( @@ -1399,7 +1399,7 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile assert len(filtered_nodes) == 7 assert "model.jaffle_shop.stg_customers" in filtered_nodes assert "seed.jaffle_shop.raw_customers" in filtered_nodes - if SOURCE_RENDERING_BEHAVIOR_ENV == "all": + if SOURCE_RENDERING_BEHAVIOR == SourceRenderingBehavior.ALL: assert "source.jaffle_shop.postgres_db.raw_customers" in filtered_nodes # Four tests should be filtered assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 4 @@ -1504,7 +1504,7 @@ def test_dbt_ls_cache_key_args_uses_airflow_vars_to_purge_dbt_ls_cache(airflow_v project=ProjectConfig(), render_config=RenderConfig( airflow_vars_to_purge_dbt_ls_cache=[key], - source_rendering_behavior=SourceRenderingBehavior(SOURCE_RENDERING_BEHAVIOR_ENV), + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ), ) assert graph.dbt_ls_cache_key_args == [key, value] @@ -1524,7 +1524,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "65595448aded2c2b52878a801c1d9c59" + assert hash_dir == "a9879ec2ec503b0fe023d059caf50d41" else: assert hash_dir == "9001bedf4aa8a329f7b669c89f337c24" From 59574fc007ccb8383f86cce6d36d3bfa13c06938 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 13 Aug 2024 11:22:07 +0000 Subject: [PATCH 22/27] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/airflow/test_graph.py | 4 +--- tests/dbt/test_graph.py | 12 +++--------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index dc3c948f72..ddffa226c3 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -589,9 +589,7 @@ def test_airflow_kwargs_generation(): "group_id": "fake_group_id", "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", - "render_config": RenderConfig( - select=["fake-render"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR - ), + "render_config": RenderConfig(select=["fake-render"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR), "default_args": {"retries": 2}, "profile_config": ProfileConfig( profile_name="default", diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3998227c05..8b526a8ab3 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -174,9 +174,7 @@ def test_load_via_manifest_with_select(project_name, manifest_filepath, model_fi target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig( - select=["+customers"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR - ) + render_config = RenderConfig(select=["+customers"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, @@ -273,9 +271,7 @@ def test_load_dbt_ls_file_without_file(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig( - dbt_ls_path=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR - ) + render_config = RenderConfig(dbt_ls_path=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE) @@ -974,9 +970,7 @@ def test_tag_selected_node_test_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - render_config = RenderConfig( - select=["tag:test_tag"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR - ) + render_config = RenderConfig(select=["tag:test_tag"], source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR) execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) dbt_graph = DbtGraph( project=project_config, From ad0e0c2b45880e9421ea3d8471033057c707f1be Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 17:31:24 +0530 Subject: [PATCH 23/27] Add tests --- tests/operators/test_local.py | 71 +++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 23f255b80e..54d9d01da6 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,4 @@ +import json import logging import os import shutil @@ -946,3 +947,73 @@ def test_handle_exception_subprocess(caplog): operator.handle_exception_subprocess(result) assert len(str(err_context.value)) < 100 # Ensure the error message is not too long assert len(caplog.text) > 1000 # Ensure the log message is not truncated + + +@pytest.fixture +def mock_context(): + return MagicMock() + + +@pytest.fixture +def mock_session(): + return MagicMock() + + +@patch("cosmos.operators.local.Path") +def test_store_freshness_json(mock_path_class, mock_context, mock_session): + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + ) + + # Mock the behavior of Path.exists() and Path.read_text() + mock_sources_json_path = MagicMock() + mock_path_class.return_value = mock_sources_json_path + mock_sources_json_path.exists.return_value = True + mock_sources_json_path.read_text.return_value = '{"key": "value"}' + + # Expected formatted JSON content + expected_freshness = json.dumps({"key": "value"}, indent=4) + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == expected_freshness + + +@patch("cosmos.operators.local.Path") +def test_store_freshness_json_no_file(mock_path_class, mock_context, mock_session): + # Create an instance of the class that contains the method + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + ) + + # Mock the behavior of Path.exists() and Path.read_text() + mock_sources_json_path = MagicMock() + mock_path_class.return_value = mock_sources_json_path + mock_sources_json_path.exists.return_value = False + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == "" + + +def test_store_freshness_not_store_compiled_sql(mock_context, mock_session): + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + # Call the method under test + instance.store_freshness_json(tmp_project_dir="/mock/dir", context=mock_context, session=mock_session) + + # Verify the freshness attribute is set correctly + assert instance.freshness == "" From b0b40206b6d73cd63241f26675706896362fd32c Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 17:41:53 +0530 Subject: [PATCH 24/27] Add test --- dev/dags/basic_cosmos_task_group.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index d63cf2c923..3e9cf741c2 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -10,7 +10,7 @@ from airflow.operators.empty import EmptyOperator from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import InvocationMode +from cosmos.constants import InvocationMode, SourceRenderingBehavior from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -64,6 +64,7 @@ def basic_cosmos_task_group() -> None: render_config=RenderConfig( select=["path:seeds/raw_orders.csv"], enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping + source_rendering_behavior=SourceRenderingBehavior.ALL ), execution_config=shared_execution_config, operator_args={"install_deps": True}, From 7fd92abb053509396e136049a9f18de38bb3a148 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:12:15 +0000 Subject: [PATCH 25/27] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dev/dags/basic_cosmos_task_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 3e9cf741c2..0cd31944bc 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -64,7 +64,7 @@ def basic_cosmos_task_group() -> None: render_config=RenderConfig( select=["path:seeds/raw_orders.csv"], enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping - source_rendering_behavior=SourceRenderingBehavior.ALL + source_rendering_behavior=SourceRenderingBehavior.ALL, ), execution_config=shared_execution_config, operator_args={"install_deps": True}, From 96492c8db2fa69c293aa3d0bceda1a2c7d4d3133 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 18:43:08 +0530 Subject: [PATCH 26/27] Revert source render param --- dev/dags/basic_cosmos_task_group.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 0cd31944bc..d63cf2c923 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -10,7 +10,7 @@ from airflow.operators.empty import EmptyOperator from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import InvocationMode, SourceRenderingBehavior +from cosmos.constants import InvocationMode from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -64,7 +64,6 @@ def basic_cosmos_task_group() -> None: render_config=RenderConfig( select=["path:seeds/raw_orders.csv"], enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping - source_rendering_behavior=SourceRenderingBehavior.ALL, ), execution_config=shared_execution_config, operator_args={"install_deps": True}, From bacd19db37c4d0e686ceed6566ccd32374f101bf Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 13 Aug 2024 20:39:08 +0530 Subject: [PATCH 27/27] Add no cover --- .github/workflows/test.yml | 4 ++-- cosmos/airflow/graph.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 20c86754fe..219ea2019a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: test on: push: # Run on pushes to the default branch - branches: [add-source-nodes] + branches: [main] pull_request_target: # Also run on pull requests originated from forks - branches: [add-source-nodes] + branches: [main] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 32da4618d3..17ee22c95b 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -167,6 +167,8 @@ def create_task_metadata( and node.has_test is False ): return None + # TODO: https://github.com/astronomer/astronomer-cosmos + # pragma: no cover task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" args.pop("models")