From 23ed0eb0da71486cfda5f20f05aa3fa789e8916e Mon Sep 17 00:00:00 2001 From: dwreeves Date: Sat, 6 Jan 2024 19:42:00 -0500 Subject: [PATCH 01/15] add more template fields --- cosmos/operators/base.py | 2 +- docs/configuration/operator-args.rst | 1 + tests/operators/test_local.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 6d276013d5..3912e435f0 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -59,7 +59,7 @@ class DbtBaseOperator(BaseOperator): :param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command """ - template_fields: Sequence[str] = ("env", "vars") + template_fields: Sequence[str] = ("env", "select", "exclude", "selector", "vars", "models") global_flags = ( "project_dir", "select", diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 5ddbe6565a..7d164dd4f4 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -54,6 +54,7 @@ dbt-related - ``quiet``: run ``dbt`` in silent mode, only displaying its error logs. - ``vars``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.dbt_vars`` instead) Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``. - ``warn_error``: convert ``dbt`` warnings into errors. +- ``full_refresh``: If True, then full refresh the node. This only applies to model and seed nodes. Airflow-related ............... diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index dd7d34a6d8..df77f7f0e1 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -423,8 +423,8 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo @pytest.mark.parametrize( "operator_class,expected_template", [ - (DbtSeedLocalOperator, ("env", "vars", "compiled_sql", "full_refresh")), - (DbtRunLocalOperator, ("env", "vars", "compiled_sql", "full_refresh")), + (DbtSeedLocalOperator, ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh")), + (DbtRunLocalOperator, ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh")), ], ) def test_dbt_base_operator_template_fields(operator_class, expected_template): From 9561911457a07bc8a3e7ee92802efd5f5edd0f25 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Jan 2024 01:24:54 +0000 Subject: [PATCH 02/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_local.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index df77f7f0e1..c14738a83c 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -423,8 +423,14 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo @pytest.mark.parametrize( "operator_class,expected_template", [ - (DbtSeedLocalOperator, ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh")), - (DbtRunLocalOperator, ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh")), + ( + DbtSeedLocalOperator, + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ), + ( + DbtRunLocalOperator, + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ), ], ) def test_dbt_base_operator_template_fields(operator_class, expected_template): From 5c6210d8bdfd0d75d6a60776b3ae2e79b87848cc Mon Sep 17 00:00:00 2001 From: dwreeves Date: Fri, 9 Feb 2024 14:08:31 -0500 Subject: [PATCH 03/15] fix inplace mutation --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 97e8190dd1..4c49f1ad2b 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -210,7 +210,7 @@ def __init__( execution_config = execution_config or ExecutionConfig() render_config = render_config or RenderConfig() - operator_args = operator_args or {} + operator_args = operator_args.copy() if operator_args else {} validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) From 206de9a680d418c9886f0b15effbdf9d352c3669 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Sun, 25 Feb 2024 13:03:41 -0500 Subject: [PATCH 04/15] updates --- cosmos/operators/base.py | 43 +++++++++++++++++++++++++--- docs/configuration/operator-args.rst | 31 ++++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 8edd7c091d..109dbb7124 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -8,6 +8,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.utils.context import Context from airflow.utils.operator_helpers import context_to_airflow_vars +from airflow.utils.strings import to_boolean from cosmos.dbt.executable import get_system_dbt from cosmos.log import get_logger @@ -252,6 +253,26 @@ class DbtBuildMixin: base_cmd = ["build"] ui_color = "#8194E0" + template_fields: Sequence[str] = ("full_refresh",) + + def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None: + self.full_refresh = full_refresh + super().__init__(**kwargs) + + def add_cmd_flags(self) -> list[str]: + flags = [] + + if isinstance(self.full_refresh, str): + # Handle template fields when render_template_as_native_obj=False + full_refresh = to_boolean(self.full_refresh) + else: + full_refresh = self.full_refresh + + if full_refresh is True: + flags.append("--full-refresh") + + return flags + class DbtLSMixin: """ @@ -274,13 +295,20 @@ class DbtSeedMixin: template_fields: Sequence[str] = ("full_refresh",) - def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: + def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None: self.full_refresh = full_refresh super().__init__(**kwargs) def add_cmd_flags(self) -> list[str]: flags = [] - if self.full_refresh is True: + + if isinstance(self.full_refresh, str): + # Handle template fields when render_template_as_native_obj=False + full_refresh = to_boolean(self.full_refresh) + else: + full_refresh = self.full_refresh + + if full_refresh is True: flags.append("--full-refresh") return flags @@ -306,13 +334,20 @@ class DbtRunMixin: template_fields: Sequence[str] = ("full_refresh",) - def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: + def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None: self.full_refresh = full_refresh super().__init__(**kwargs) def add_cmd_flags(self) -> list[str]: flags = [] - if self.full_refresh is True: + + if isinstance(self.full_refresh, str): + # Handle template fields when render_template_as_native_obj=False + full_refresh = to_boolean(self.full_refresh) + else: + full_refresh = self.full_refresh + + if full_refresh is True: flags.append("--full-refresh") return flags diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 7d164dd4f4..302c3aa274 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -89,3 +89,34 @@ Sample usage "skip_exit_code": 1, } ) + + +Template fields +--------------- + +Some of the operator args are `template fields `_ for your convenience. + +These template fields can be useful for hooking into Airflow `Params `_, or for more advanced customization with `XComs `_. + +The following operator args support templating, and are accessible both through the ``DbtDag`` and ``DbtTaskGroup`` constructors in addition to being accessible standalone: + +- ``env`` +- ``vars`` +- ``full_refresh`` (for the ``build``, ``seed``, and ``run`` operators.) + +.. note:: + Using Jinja templating for ``env`` and ``vars`` may cause problems when using ``LoadMode.DBT_LS`` to render your DAG. + +The following template fields are only selectable when using the operators in a standalone context: + +- ``select`` +- ``exclude`` +- ``selector`` +- ``models`` + +The aforementioned args are not available to be templated via ``DbtDag`` and ``DbtTaskGroup`` because they need to select dbt nodes to render the DAG's tasks. +Since template fields are rendered on each ``DagRun``, + +Additionally, the SQL for compiled dbt models is stored in the template fields, which is viewable in the Airflow UI for each task run. +This is provided for telemetry on task execution, and is not an operator arg. +For more information about this, see the `Compiled SQL `_ docs. From 4cab6e6991c4a670a9b2cd5228d28e3efd55ba70 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Sun, 25 Feb 2024 13:11:26 -0500 Subject: [PATCH 05/15] fix typing --- cosmos/operators/docker.py | 2 ++ cosmos/operators/kubernetes.py | 2 ++ cosmos/operators/local.py | 2 ++ cosmos/operators/virtualenv.py | 2 +- 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 571bff046d..b9f8cf43c6 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -69,6 +69,8 @@ class DbtBuildDockerOperator(DbtBuildMixin, DbtDockerBaseOperator): Executes a dbt core build command. """ + template_fields: Sequence[str] = DbtDockerBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] + class DbtLSDockerOperator(DbtLSMixin, DbtDockerBaseOperator): """ diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index fc0e28c051..d36e4df7f2 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -103,6 +103,8 @@ class DbtBuildKubernetesOperator(DbtBuildMixin, DbtKubernetesBaseOperator): Executes a dbt core build command. """ + template_fields: Sequence[str] = DbtBuildKubernetesOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] + class DbtLSKubernetesOperator(DbtLSMixin, DbtKubernetesBaseOperator): """ diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c6e72ee743..86e5bf1c34 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -392,6 +392,8 @@ class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator): Executes a dbt core build command. """ + template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] + class DbtLSLocalOperator(DbtLSMixin, DbtLocalBaseOperator): """ diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index bad88e2346..9196a4021f 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -99,7 +99,7 @@ def execute(self, context: Context) -> None: logger.info(output) -class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): +class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): # type: ignore[misc] """ Executes a dbt core build command within a Python Virtual Environment, that is created before running the dbt command and deleted just after. From bce8f2245e0660e551223e13493ccb3703bebd52 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Sun, 25 Feb 2024 13:20:27 -0500 Subject: [PATCH 06/15] fix --- cosmos/operators/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index d36e4df7f2..2291a9441a 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -103,7 +103,7 @@ class DbtBuildKubernetesOperator(DbtBuildMixin, DbtKubernetesBaseOperator): Executes a dbt core build command. """ - template_fields: Sequence[str] = DbtBuildKubernetesOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] + template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] class DbtLSKubernetesOperator(DbtLSMixin, DbtKubernetesBaseOperator): From 9704e7f62e871e40da6c8dec389612472540a7cf Mon Sep 17 00:00:00 2001 From: dwreeves Date: Wed, 28 Feb 2024 08:36:44 -0500 Subject: [PATCH 07/15] update tests --- tests/operators/test_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index feaf6686e4..ea08b232e5 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -383,6 +383,7 @@ def test_store_compiled_sql() -> None: "operator_class,kwargs,expected_call_kwargs", [ (DbtSeedLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), + (DbtBuildLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), (DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), ( DbtTestLocalOperator, From 4bba5b33fe2c1d9c269ba4c69b5e0effa89d4ac9 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Wed, 28 Feb 2024 09:56:40 -0500 Subject: [PATCH 08/15] fix something that leaked from another PR somehow --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 3c5dd6ddbc..3619fecf56 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -210,7 +210,7 @@ def __init__( execution_config = execution_config or ExecutionConfig() render_config = render_config or RenderConfig() - operator_args = operator_args.copy() if operator_args else {} + operator_args = operator_args or {} validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) From 6b4d9308fb0ee6121a9090993b222bcd2b081609 Mon Sep 17 00:00:00 2001 From: dwreeves Date: Wed, 28 Feb 2024 09:57:52 -0500 Subject: [PATCH 09/15] add test for template fields --- tests/operators/test_local.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 45b84162d3..c054cf4d19 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -659,6 +659,10 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo DbtRunLocalOperator, ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), ), + ( + DbtBuildLocalOperator, + ("env", "select", "exclude", "selector", "vars", "models", "compiled_sql", "full_refresh"), + ), ], ) def test_dbt_base_operator_template_fields(operator_class, expected_template): From eb6f27400603ef8352efddc38052369f927dda10 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Feb 2024 22:41:25 +0000 Subject: [PATCH 10/15] Improve test coverage --- tests/operators/test_base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_base.py b/tests/operators/test_base.py index edaf8a8450..5761d66aa3 100644 --- a/tests/operators/test_base.py +++ b/tests/operators/test_base.py @@ -51,8 +51,10 @@ def test_dbt_mixin_base_cmd(dbt_command, dbt_operator_class): assert [dbt_command] == dbt_operator_class.base_cmd -@pytest.mark.parametrize("dbt_operator_class", [DbtSeedMixin, DbtRunMixin]) -@pytest.mark.parametrize("full_refresh, expected_flags", [(True, ["--full-refresh"]), (False, [])]) +@pytest.mark.parametrize("dbt_operator_class", [DbtSeedMixin, DbtRunMixin, DbtBuildMixin]) +@pytest.mark.parametrize( + "full_refresh, expected_flags", [("True", ["--full-refresh"]), (True, ["--full-refresh"]), (False, [])] +) def test_dbt_mixin_add_cmd_flags_full_refresh(full_refresh, expected_flags, dbt_operator_class): dbt_mixin = dbt_operator_class(full_refresh=full_refresh) flags = dbt_mixin.add_cmd_flags() From 173eee134f55fcf7f3587548189802a1bd568471 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Feb 2024 22:47:41 +0000 Subject: [PATCH 11/15] Update docs/configuration/operator-args.rst --- docs/configuration/operator-args.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 302c3aa274..5e6921c71d 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -102,7 +102,7 @@ The following operator args support templating, and are accessible both through - ``env`` - ``vars`` -- ``full_refresh`` (for the ``build``, ``seed``, and ``run`` operators.) +- ``full_refresh`` (for the ``build``, ``seed``, and ``run`` operators since Cosmos 1.4.) .. note:: Using Jinja templating for ``env`` and ``vars`` may cause problems when using ``LoadMode.DBT_LS`` to render your DAG. From 790ceec8fa4df2585b8b7c60ab066685c752c55f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Feb 2024 22:47:52 +0000 Subject: [PATCH 12/15] Update docs/configuration/operator-args.rst --- docs/configuration/operator-args.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 5e6921c71d..08b6a24dff 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -107,7 +107,7 @@ The following operator args support templating, and are accessible both through .. note:: Using Jinja templating for ``env`` and ``vars`` may cause problems when using ``LoadMode.DBT_LS`` to render your DAG. -The following template fields are only selectable when using the operators in a standalone context: +The following template fields are only selectable when using the operators in a standalone context (starting in Cosmos 1.4): - ``select`` - ``exclude`` From adaaae2e0f91d196755ca720dfe6308ef18e1384 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Feb 2024 22:53:50 +0000 Subject: [PATCH 13/15] Update docs/configuration/operator-args.rst --- docs/configuration/operator-args.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 08b6a24dff..1ab5f4921b 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -114,7 +114,7 @@ The following template fields are only selectable when using the operators in a - ``selector`` - ``models`` -The aforementioned args are not available to be templated via ``DbtDag`` and ``DbtTaskGroup`` because they need to select dbt nodes to render the DAG's tasks. +Since Airflow resolves template fields during Airflow DAG execution and not DAG parsing, the args above cannot be templated via ``DbtDag`` and ``DbtTaskGroup`` because both need to select dbt nodes during DAG parsing. Since template fields are rendered on each ``DagRun``, Additionally, the SQL for compiled dbt models is stored in the template fields, which is viewable in the Airflow UI for each task run. From 5c9b9758625198ea834646144914ab9d3941bada Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 29 Feb 2024 22:54:00 +0000 Subject: [PATCH 14/15] Update docs/configuration/operator-args.rst --- docs/configuration/operator-args.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 1ab5f4921b..4bf2b64608 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -115,7 +115,6 @@ The following template fields are only selectable when using the operators in a - ``models`` Since Airflow resolves template fields during Airflow DAG execution and not DAG parsing, the args above cannot be templated via ``DbtDag`` and ``DbtTaskGroup`` because both need to select dbt nodes during DAG parsing. -Since template fields are rendered on each ``DagRun``, Additionally, the SQL for compiled dbt models is stored in the template fields, which is viewable in the Airflow UI for each task run. This is provided for telemetry on task execution, and is not an operator arg. From d44424b05fef4ffdf289744568baef9b0bd6f706 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 29 Feb 2024 22:54:10 +0000 Subject: [PATCH 15/15] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/configuration/operator-args.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 4bf2b64608..4e6a40b7fc 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -114,7 +114,7 @@ The following template fields are only selectable when using the operators in a - ``selector`` - ``models`` -Since Airflow resolves template fields during Airflow DAG execution and not DAG parsing, the args above cannot be templated via ``DbtDag`` and ``DbtTaskGroup`` because both need to select dbt nodes during DAG parsing. +Since Airflow resolves template fields during Airflow DAG execution and not DAG parsing, the args above cannot be templated via ``DbtDag`` and ``DbtTaskGroup`` because both need to select dbt nodes during DAG parsing. Additionally, the SQL for compiled dbt models is stored in the template fields, which is viewable in the Airflow UI for each task run. This is provided for telemetry on task execution, and is not an operator arg.