From 42b0de15bf16dfdc4bbf9ac6bb37996fa112fd4c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 14 Feb 2025 12:22:53 +0530 Subject: [PATCH 01/38] Release 1.9.0 --- CHANGELOG.rst | 21 +++++++++++++++++++-- cosmos/__init__.py | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8044eade4a..ecf7eaa882 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.9.0a5 (2025-02-03) +1.9.0 (2025-02-14) -------------------- Breaking changes @@ -19,23 +19,40 @@ Features * Add structure to support multiple db for async operator execution by @pankajastro in #1483 * Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here `_. * Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474 +* Add AWS ECS task run execution mode by @CarlosGitto and @aoelvp94 in #1507 +* Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510 +* Add setup task for async executions by @pankajastro in #1518 +* Add teardown task for async executions by @pankajastro in #1529 Bug Fixes * Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466 +* Fix custom selector behaviour when the model name contains periods by @yakovlevvs and @60098727 in #1499 +* Filter dbt and non-dbt kwargs correctly for async operator by @pankajastro in #1526 Enhancement * Fix OpenLineage deprecation warning by @CorsettiS in #1449 * Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480 * Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501 +* Gracefully error when users set incompatible ``RenderConfig.dbt_deps`` and ``operator_args`` ``install_deps`` by @tatiana in #1505 +* Store compiled SQL as template field for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1534 +Docs + +* Improve ``RenderConfig`` arguments documentation by @tatiana in #1514 +* Improve callback documentation by @tatiana in #1516 +* Improve partial parsing docs by @tatiana in #1520 +* Fix typo in selecting & excluding docs by @pankajastro in #1523 Others * Ignore dbt package tests when running Cosmos tests by @tatiana in #1502 +* Refactor to consolidate async dbt adapter code by @pankajkoti in #1509 +* Log elapsed time for sql file(s) upload/download by @pankajastro in #1536 +* Remove the fallback operator for async task by @pankajastro in #1538 * GitHub Actions Dependabot: #1487 -* Pre-commit updates: #1473, #1493 +* Pre-commit updates: #1473, #1493, #1503, #1531 1.8.2 (2025-01-15) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 9410bffba3..2103fb1b8c 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.0a6" +__version__ = "1.9.0" from cosmos.airflow.dag import DbtDag From c5a49cd41b92ae082ab344d7ea470f8eeeba9417 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 14 Feb 2025 14:59:36 +0530 Subject: [PATCH 02/38] Update Changelog to include PR 1521 --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ecf7eaa882..bc3520af66 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -23,6 +23,7 @@ Features * Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510 * Add setup task for async executions by @pankajastro in #1518 * Add teardown task for async executions by @pankajastro in #1529 +* Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521 Bug Fixes From 3e6dd5d8b565f237ad05c0a21bb25988c4b10467 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 19 Feb 2025 16:17:03 +0530 Subject: [PATCH 03/38] Update CHANGELOG.rst Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bc3520af66..139b810366 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.9.0 (2025-02-14) +1.9.0 (2025-02-19) -------------------- Breaking changes From 696dcbf676daba3b063ede84d1677bb6c75d1abb Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 19 Feb 2025 16:21:14 +0530 Subject: [PATCH 04/38] Update Changelog to include PR 1544, 1545 --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 139b810366..b6fc94e4e2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -24,6 +24,7 @@ Features * Add setup task for async executions by @pankajastro in #1518 * Add teardown task for async executions by @pankajastro in #1529 * Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521 +* Extend Virtualenv operator and mock dbt adapters for setup & teardown tasks in ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1544 Bug Fixes @@ -45,6 +46,7 @@ Docs * Improve callback documentation by @tatiana in #1516 * Improve partial parsing docs by @tatiana in #1520 * Fix typo in selecting & excluding docs by @pankajastro in #1523 +* Document ``async_py_requirements`` added in ``ExecutionConfig`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1545 Others From e8fc621d2e0bfc1e62b2f6ec3c27185d5ead63f4 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 20 Feb 2025 12:09:11 +0530 Subject: [PATCH 05/38] Release 1.9.0 (#1539) Breaking changes * When using ``LoadMode.DBT_LS``, Cosmos will now attempt to use the ``dbtRunner`` as opposed to subprocess to run ``dbt ls``. While this represents significant performance improvements (half the vCPU usage and some memory consumption improvement), this may not work in scenarios where users had multiple Python virtual environments to manage different versions of dbt and its adaptors. In those cases, please, set ``RenderConfig(invocation_mode=InvocationMode.SUBPROCESS)`` to have the same behaviour Cosmos had in previous versions. Additional information `here `_ and `here `_. Features * Use ``dbtRunner`` in the DAG Processor when using ``LoadMode.DBT_LS`` if ``dbt-core`` is available by @tatiana in #1484. Additional information `here `_. * Allow users to opt-out of ``dbtRunner`` during DAG parsing with ``InvocationMode.SUBPROCESS`` by @tatiana in #1495. Check out the `documentation `_. * Add structure to support multiple db for async operator execution by @pankajastro in #1483 * Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here `_. * Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474 * Add AWS ECS task run execution mode by @CarlosGitto and @aoelvp94 in * Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510 * Add setup task for async executions by @pankajastro in #1518 * Add teardown task for async executions by @pankajastro in #1529 * Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521 * Extend Virtualenv operator and mock dbt adapters for setup & teardown tasks in ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1544 Bug Fixes * Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466 * Fix custom selector behaviour when the model name contains periods by @yakovlevvs and @60098727 in #1499 * Filter dbt and non-dbt kwargs correctly for async operator by @pankajastro in #1526 Enhancement * Fix OpenLineage deprecation warning by @CorsettiS in #1449 * Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480 * Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501 * Gracefully error when users set incompatible ``RenderConfig.dbt_deps`` and ``operator_args`` ``install_deps`` by @tatiana in #1505 * Store compiled SQL as template field for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1534 Docs * Improve ``RenderConfig`` arguments documentation by @tatiana in #1514 * Improve callback documentation by @tatiana in #1516 * Improve partial parsing docs by @tatiana in #1520 * Fix typo in selecting & excluding docs by @pankajastro in #1523 * Document ``async_py_requirements`` added in ``ExecutionConfig`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1545 Others * Ignore dbt package tests when running Cosmos tests by @tatiana in * Refactor to consolidate async dbt adapter code by @pankajkoti in #1509 * Log elapsed time for sql file(s) upload/download by @pankajastro in * Remove the fallback operator for async task by @pankajastro in #1538 * GitHub Actions Dependabot: #1487 * Pre-commit updates: #1473, #1493, #1503, #1531 (cherry picked from commit c7de602f2557ca8919dd3551c8080e15aa39d543) From f5d874f54ff034543239e71b67fce0203f170b1e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 21 Feb 2025 12:26:08 +0530 Subject: [PATCH 06/38] Fix import error in dbt bigquery adapter mock for `dbt-bigquery<1.8` for `ExecutionMode.AIRFLOW_ASYNC` (#1548) A user has reported after testing the `astronomer-cosmos==1.9.0a5` that they are getting the below error ``` from dbt_common.clients.agate_helper import empty_table ModuleNotFoundError: No module named 'dbt_common' ``` They are using `dbt-bigquery==1.7.2` Upon debugging, I observed that the `dbt_common` module that we rely on in the current mocking interface is available only in dbt bigquery adapterversion >= 1.8. For previous versions, to achieve the same, the helper seems to be available in `dbt.clients`. I tested this on dbt-bigquery versions 1.5, 1.6, 1.7 and 1.7.2 and the fix in this PR seems to solve the issue. closes: #1547 (cherry picked from commit 30019c6d4d7ff73114096e5d27ee446c36c66ab9) --- CHANGELOG.rst | 8 ++++++++ cosmos/__init__.py | 2 +- cosmos/operators/_asynchronous/bigquery.py | 6 +++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b6fc94e4e2..faa4468f66 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,14 @@ Changelog ========= +1.9.1a1 (2025-02-20) +-------------------- + +Bug Fixes + +* Fix import error in dbt bigquery adapter mock for ``dbt-bigquery<1.8`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1548 + + 1.9.0 (2025-02-19) -------------------- diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 2103fb1b8c..ceb5fce7e0 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.0" +__version__ = "1.9.1a1" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 72e2856d96..e8879b0fe7 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -28,7 +28,11 @@ def _mock_bigquery_adapter() -> None: import agate from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager - from dbt_common.clients.agate_helper import empty_table + + try: + from dbt_common.clients.agate_helper import empty_table + except (ModuleNotFoundError, ImportError): # pragma: no cover + from dbt.clients.agate_helper import empty_table def execute( # type: ignore[no-untyped-def] self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None From 62059719d8108e65a76375295e24ebe4f5d0c335 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 25 Feb 2025 10:21:59 +0000 Subject: [PATCH 07/38] =?UTF-8?q?=E2=AC=86=20[pre-commit.ci]=20pre-commit?= =?UTF-8?q?=20autoupdate=20(#1560)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.9.6 → v0.9.7](https://github.com/astral-sh/ruff-pre-commit/compare/v0.9.6...v0.9.7) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> (cherry picked from commit 4f793f159cd018b0c6ab978c05c450f7cc751d71) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 745cfda82e..db9ba4d868 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -57,7 +57,7 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.6 + rev: v0.9.7 hooks: - id: ruff args: From 70142eb27dc8b4a3fddb9d105e3858d1ebb62354 Mon Sep 17 00:00:00 2001 From: "jj.lee" <63435794+jx2lee@users.noreply.github.com> Date: Wed, 26 Feb 2025 18:05:03 +0900 Subject: [PATCH 08/38] Improve MWAA getting-started docs by removing unused imports (#1562) (cherry picked from commit 8ef378df0b584b40fbcc62742e99015e42f0c8ff) --- docs/getting_started/mwaa.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/getting_started/mwaa.rst b/docs/getting_started/mwaa.rst index 8555cde344..7c4dc88ad2 100644 --- a/docs/getting_started/mwaa.rst +++ b/docs/getting_started/mwaa.rst @@ -91,7 +91,6 @@ In your ``my_cosmos_dag.py`` file, import the ``DbtDag`` class from Cosmos and c from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig from cosmos.profiles import PostgresUserPasswordProfileMapping - from cosmos.constants import ExecutionMode profile_config = ProfileConfig( profile_name="default", From 922deebce358aa07d31c69f6e11b60ca7760e2f6 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 27 Feb 2025 19:47:44 +0530 Subject: [PATCH 09/38] Disable `example_cosmos_dbt_build.py` DAG in CI (#1567) closes: https://github.com/astronomer/astronomer-cosmos/issues/1564 **Fix unit test error** ``` tests/operators/_asynchronous/test_bigquery.py:6: in from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator ../../../.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.12-2.9/lib/python3.12/site-packages/airflow/providers/google/cloud/operators/bigquery.py:32: in from airflow.providers.common.sql.operators.sql import ( # type: ignore[attr-defined] # for _parse_boolean ../../../.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.12-2.9/lib/python3.12/site-packages/airflow/providers/common/sql/operators/sql.py:29: in from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler, return_single_query_results ../../../.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.12-2.9/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py:37: in from methodtools import lru_cache E ModuleNotFoundError: No module named 'methodtools' ``` **Disable DAG example_cosmos_dbt_build.py in CI because of error** ``` [2025-02-26 13:13:41,578] {taskinstance.py:1851} ERROR - Task failed with exception Traceback (most recent call last): File "/home/runner/work/astronomer-cosmos/astronomer-cosmos/cosmos/operators/base.py", line 278, in execute self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) File "/home/runner/work/astronomer-cosmos/astronomer-cosmos/cosmos/operators/local.py", line 708, in build_and_run_cmd result = self.run_command( File "/home/runner/work/astronomer-cosmos/astronomer-cosmos/cosmos/operators/local.py", line 556, in run_command self.handle_exception(result) File "/home/runner/work/astronomer-cosmos/astronomer-cosmos/cosmos/operators/local.py", line 229, in handle_exception_dbt_runner return dbt_runner.handle_exception_if_needed(result) File "/home/runner/work/astronomer-cosmos/astronomer-cosmos/cosmos/dbt/runner.py", line 113, in handle_exception_if_needed raise CosmosDbtRunError(f"dbt invocation completed with errors: {error_message}") cosmos.exceptions.CosmosDbtRunError: dbt invocation completed with errors: relationships_orders_customer_id__customer_id__ref_customers_: Database Error in test relationships_orders_customer_id__customer_id__ref_customers_ (models/schema.yml) relation "public.orders" does not exist LINE 13: from "***"."public"."orders" ^ compiled code at target/run/altered_jaffle_shop/models/schema.yml/relationships_orders_customer_id__customer_id__ref_customers_.sql ``` Created a follow-up issue: https://github.com/astronomer/astronomer-cosmos/issues/1568 to enable DAG example_cosmos_dbt_build.py (cherry picked from commit 8630cae34dba698e4ded05051c97a5a53383f52e) --- dev/dags/example_cosmos_dbt_build.py | 3 +-- pyproject.toml | 1 + tests/test_example_dags.py | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dev/dags/example_cosmos_dbt_build.py b/dev/dags/example_cosmos_dbt_build.py index 57ad8340f2..1016d62129 100644 --- a/dev/dags/example_cosmos_dbt_build.py +++ b/dev/dags/example_cosmos_dbt_build.py @@ -6,8 +6,7 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import TestBehavior +from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" diff --git a/pyproject.toml b/pyproject.toml index 58262f1bd8..79b7590460 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -153,6 +153,7 @@ dependencies = [ "types-python-dateutil", "Werkzeug<3.0.0", "dbt-core", + "methodtools", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 762985b59d..d858812795 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -81,6 +81,9 @@ def get_dag_bag() -> DagBag: if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) + # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/1568 + file.writelines("example_cosmos_dbt_build.py\n") + print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From 9c1c3ac59e802deea7b3d52edfc067c30dfd0dee Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 27 Feb 2025 14:18:09 +0000 Subject: [PATCH 10/38] Upgrade GitHub Actions Ubuntu version (#1561) The Ubuntu 20.04 Actions runner image will begin deprecation on 2025-02-01 and will be fully unsupported by 2025-04-01: https://github.com/actions/runner-images/issues/11101 (cherry picked from commit 7df2fde702344279fcb31d0864d733d27e64491e) --- .github/workflows/docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 4ce9477265..86fb351dce 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -7,7 +7,7 @@ on: jobs: pages: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest environment: name: github-pages url: ${{ steps.deployment.outputs.page_url }} From 167f23bc6a8c8a7d3142dca3f427c92a22822860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malo=20Jaffr=C3=A9?= <8469951+ghjklw@users.noreply.github.com> Date: Thu, 27 Feb 2025 18:58:58 +0100 Subject: [PATCH 11/38] Bugfix `operator_args` override configuration (#1558) The function `cosmos.converter.override_configuration` had a small logical issue: the condition to override `install_deps` could never be reached. This seems to be the root cause of #1557 Closes #1557 (cherry picked from commit dfbef5c0b85e07958e757a04ed9a8a5290932ab2) --- cosmos/converter.py | 2 +- tests/test_converter.py | 57 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 7c917a0225..b0716a01bb 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -223,7 +223,7 @@ def override_configuration( if execution_config.invocation_mode: operator_args["invocation_mode"] = execution_config.invocation_mode - if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV): + if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV): if "install_deps" not in operator_args: operator_args["install_deps"] = project_config.install_dbt_deps diff --git a/tests/test_converter.py b/tests/test_converter.py index 6b7d9181ff..8e1b724e03 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -96,9 +96,11 @@ def test_validate_initial_user_config_expects_profile(execution_mode): assert validate_initial_user_config(execution_config, profile_config, project_config, None, {}) is None -@pytest.mark.parametrize("operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}]) +@pytest.mark.parametrize( + "operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}, {"install_deps": {"key": "value"}}] +) def test_validate_user_config_operator_args_deprecated(operator_args): - """Deprecating warnings should be raised when using operator_args with "vars" or "env".""" + """Deprecating warnings should be raised when using operator_args with "vars", "env" or "install_deps".""" project_config = ProjectConfig() execution_config = ExecutionConfig() render_config = RenderConfig() @@ -709,6 +711,57 @@ def test_validate_converter_fetches_project_name_from_render_config( assert mock_build_airflow_graph.call_args.kwargs["dbt_project_name"] == "project1" +@pytest.mark.parametrize( + "execution_mode,operator_args,install_dbt_deps,expected", + [ + (ExecutionMode.KUBERNETES, {}, False, None), + (ExecutionMode.LOCAL, {}, False, False), + (ExecutionMode.VIRTUALENV, {}, False, False), + (ExecutionMode.LOCAL, {}, True, True), + (ExecutionMode.VIRTUALENV, {}, True, True), + (ExecutionMode.KUBERNETES, {"install_deps": True}, False, True), + (ExecutionMode.LOCAL, {"install_deps": True}, False, True), + (ExecutionMode.VIRTUALENV, {"install_deps": True}, False, True), + ], +) +@patch("cosmos.config.ProjectConfig.validate_project") +@patch("cosmos.converter.validate_initial_user_config") +@patch("cosmos.converter.DbtGraph") +@patch("cosmos.converter.build_airflow_graph") +def test_project_config_install_dbt_deps_overrides_operator_args( + mock_build_airflow_graph, + mock_user_config, + mock_dbt_graph, + mock_validate_project, + execution_mode, + operator_args, + install_dbt_deps, + expected, +): + """Tests that the value project_config.install_dbt_deps is used to define operator_args["install_deps"] if + execution mode is ExecutionMode.LOCAL or ExecutionMode.VIRTUALENV and operator_args["install_deps"] is not + already defined. + """ + project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path") + project_config.install_dbt_deps = install_dbt_deps + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = MagicMock() + profile_config = MagicMock() + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + _, kwargs = mock_build_airflow_graph.call_args + + assert kwargs["task_args"].get("install_deps", None) == expected + + @pytest.mark.parametrize("invocation_mode", [None, InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER]) @patch("cosmos.config.ProjectConfig.validate_project") @patch("cosmos.converter.validate_initial_user_config") From 9ed6502311f7fcd9df126290b0bc3be29f7de479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malo=20Jaffr=C3=A9?= <8469951+ghjklw@users.noreply.github.com> Date: Fri, 28 Feb 2025 01:32:27 +0100 Subject: [PATCH 12/38] Bugfix ProjectConfig install_dbt_deps (#1556) `install_dbt_deps` is missing from the `ProjectConfig` `__init__` method, which is inconsistent with the [documentation](https://astronomer.github.io/astronomer-cosmos/configuration/project-config.html) and does not make sense. Closes: #1555 (cherry picked from commit 0811e46d80814a3e86cbb87bfe4623ce592e540d) --- cosmos/config.py | 3 +++ tests/test_config.py | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/cosmos/config.py b/cosmos/config.py index 0fe17ce6f3..bd302b2853 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -152,6 +152,7 @@ class ProjectConfig: :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to snapshots :param manifest_path: The absolute path to the dbt manifest file. Defaults to None + :param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None :param project_name: Allows the user to define the project name. Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path. :param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with @@ -175,6 +176,7 @@ class ProjectConfig: def __init__( self, dbt_project_path: str | Path | None = None, + install_dbt_deps: bool = True, models_relative_path: str | Path = "models", seeds_relative_path: str | Path = "seeds", snapshots_relative_path: str | Path = "snapshots", @@ -228,6 +230,7 @@ def __init__( self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse + self.install_dbt_deps = install_dbt_deps def validate_project(self) -> None: """ diff --git a/tests/test_config.py b/tests/test_config.py index d557dd4fc2..850d6358f5 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -28,6 +28,15 @@ def test_init_with_project_path_only(): assert project_config.snapshots_path == Path("path/to/dbt/project/snapshots") assert project_config.project_name == "project" assert project_config.manifest_path is None + assert project_config.install_dbt_deps is True + + +def test_init_with_project_path_and_install_dbt_deps_succeeds(): + """ + Passing only dbt_project_path and install_dbt_deps should succeed and set install_dbt_deps to the value defined + """ + project_config = ProjectConfig(dbt_project_path="path/to/dbt/project", install_dbt_deps=False) + assert project_config.install_dbt_deps is False def test_init_with_manifest_path_and_project_path_succeeds(): From 9b18724609b7256d70fc56696512aa715cb4aaf5 Mon Sep 17 00:00:00 2001 From: AlexandrKhabarov <38005223+AlexandrKhabarov@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:09:38 +0300 Subject: [PATCH 13/38] Fix dbt project parsing `dbt_vars` behavior (#1543) `DbtToAirflowConverter` can pass dbt_vars to DbtGraph with the help of ProjectConfig or operator_args. If operator_args is used in `DbtToAirflowConverter` then it will lead to the issue with absence of dbt_vars in dbt ls command (faced rendering issue during usage of cosmos with project level variables) (cherry picked from commit 7016dd509ff2aee3d1c9edaa945d9886034b97a9) --- cosmos/dbt/graph.py | 4 +-- tests/dbt/test_graph.py | 62 ++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index a9291445ac..36da9b6e9d 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -376,8 +376,8 @@ def _add_vars_arg(self, cmd_args: list[str]) -> None: """ Change args list in-place so they include dbt vars, if they are set. """ - if self.project.dbt_vars: - cmd_args.extend(["--vars", json.dumps(self.project.dbt_vars, sort_keys=True)]) + if self.dbt_vars: + cmd_args.extend(["--vars", json.dumps(self.dbt_vars, sort_keys=True)]) @cached_property def dbt_ls_args(self) -> list[str]: diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4e8b896519..24b2a8bb83 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1508,8 +1508,7 @@ def test_load_via_dbt_ls_project_config_dbt_vars( """Tests that the dbt ls command in the subprocess has "--vars" with the project config dbt_vars.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 - dbt_vars = {"my_var1": "my_value1", "my_var2": "my_value2"} - project_config = ProjectConfig(dbt_vars=dbt_vars) + project_config = ProjectConfig(dbt_vars={"my_var1": "my_value1", "my_var2": "my_value2"}) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, @@ -1528,8 +1527,42 @@ def test_load_via_dbt_ls_project_config_dbt_vars( ) dbt_graph.load_via_dbt_ls() ls_command = mock_popen.call_args.args[0] + assert "--vars" not in ls_command + + +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) +@patch("cosmos.dbt.graph.Popen") +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") +@patch("cosmos.config.RenderConfig.validate_dbt_command") +@patch.dict(sys.modules, {"dbt.cli.main": None}) +def test_load_via_dbt_ls_dbt_graph_dbt_vars( + mock_validate, mock_update_nodes, mock_popen, mock_use_case, tmp_dbt_project_dir +): + """Tests that the dbt ls command in the subprocess has "--vars" with the DbtGraph dbt_vars.""" + mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 + dbt_vars = {"my_var3": "my_value3"} + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + project=ProjectConfig(), + render_config=render_config, + execution_config=execution_config, + profile_config=profile_config, + dbt_vars=dbt_vars, + ) + dbt_graph.load_via_dbt_ls() + ls_command = mock_popen.call_args.args[0] assert "--vars" in ls_command - assert ls_command[ls_command.index("--vars") + 1] == '{"my_var1": "my_value1", "my_var2": "my_value2"}' + assert ls_command[ls_command.index("--vars") + 1] == json.dumps(dbt_vars, sort_keys=True) @patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @@ -1721,21 +1754,23 @@ def test_project_path_fails(): @pytest.mark.parametrize( - "render_config,project_config,expected_dbt_ls_args", + "render_config,project_config,dbt_vars,expected_dbt_ls_args", [ - (RenderConfig(), ProjectConfig(), []), - (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), ["--exclude", "package:snowplow"]), + (RenderConfig(), ProjectConfig(), None, []), + (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), None, ["--exclude", "package:snowplow"]), ( RenderConfig(select=["tag:prod", "config.materialized:incremental"]), ProjectConfig(), + None, ["--select", "tag:prod", "config.materialized:incremental"], ), - (RenderConfig(selector="nightly"), ProjectConfig(), ["--selector", "nightly"]), - (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), ["--vars", '{"a": 1}']), - (RenderConfig(), ProjectConfig(partial_parse=False), ["--no-partial-parse"]), + (RenderConfig(selector="nightly"), ProjectConfig(), None, ["--selector", "nightly"]), + (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), {"k": "v"}, ["--vars", '{"k": "v"}']), + (RenderConfig(), ProjectConfig(partial_parse=False), None, ["--no-partial-parse"]), ( RenderConfig(exclude=["1", "2"], select=["a", "b"], selector="nightly"), ProjectConfig(dbt_vars={"a": 1}, partial_parse=False), + {"k": "v"}, [ "--exclude", "1", @@ -1744,7 +1779,7 @@ def test_project_path_fails(): "a", "b", "--vars", - '{"a": 1}', + '{"k": "v"}', "--selector", "nightly", "--no-partial-parse", @@ -1752,10 +1787,11 @@ def test_project_path_fails(): ), ], ) -def test_dbt_ls_args(render_config, project_config, expected_dbt_ls_args): +def test_dbt_ls_args(render_config, project_config, dbt_vars, expected_dbt_ls_args): graph = DbtGraph( project=project_config, render_config=render_config, + dbt_vars=dbt_vars, ) assert graph.dbt_ls_args == expected_dbt_ls_args @@ -1768,8 +1804,8 @@ def test_dbt_ls_cache_key_args_sorts_envvars(): @patch("cosmos.dbt.graph.run_command") def test_run_dbt_deps(run_command_mock): - project_config = ProjectConfig(dbt_vars={"var-key": "var-value"}) - graph = DbtGraph(project=project_config) + project_config = ProjectConfig() + graph = DbtGraph(project=project_config, dbt_vars={"var-key": "var-value"}) graph.local_flags = [] graph.run_dbt_deps("dbt", "/some/path", {}) run_command_mock.assert_called_with( From 490821bc14eb9803d4e91d9a097f10aa8b9d121d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 21:21:27 +0000 Subject: [PATCH 14/38] =?UTF-8?q?=E2=AC=86=20[pre-commit.ci]=20pre-commit?= =?UTF-8?q?=20autoupdate=20(#1583)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.9.7 → v0.9.9](https://github.com/astral-sh/ruff-pre-commit/compare/v0.9.7...v0.9.9) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> (cherry picked from commit 08b85b6103ad3c09dcb976d63bdb2ce34a8ad64a) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index db9ba4d868..9f2a3a50cc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -57,7 +57,7 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.7 + rev: v0.9.9 hooks: - id: ruff args: From ad2d2ba26197c31ccea0e10da38851d80e94faea Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Tue, 4 Mar 2025 15:13:11 +0530 Subject: [PATCH 15/38] Update the GH bug issue template (#1586) Add the missing Execution mode in bug template (cherry picked from commit b15c5c8af818375c1e4bdee7d3f08f8469f5b5e9) --- .github/ISSUE_TEMPLATE/01-bug.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/ISSUE_TEMPLATE/01-bug.yml b/.github/ISSUE_TEMPLATE/01-bug.yml index ad663a79e0..2765661b89 100644 --- a/.github/ISSUE_TEMPLATE/01-bug.yml +++ b/.github/ISSUE_TEMPLATE/01-bug.yml @@ -52,9 +52,12 @@ body: label: ExecutionMode description: Which ExecutionMode are you using? options: + - "AIRFLOW_ASYNC" + - "AWS_ECS" - "AWS_EKS" - "AZURE_CONTAINER_INSTANCE" - "DOCKER" + - "GCP_CLOUD_RUN_JOB" - "KUBERNETES" - "LOCAL" - "VIRTUALENV" From 970604f97052acd515925e17086b437fa006156a Mon Sep 17 00:00:00 2001 From: Joppe Vos <44348300+joppevos@users.noreply.github.com> Date: Tue, 4 Mar 2025 11:10:29 +0100 Subject: [PATCH 16/38] Avoid reading the connection during DAG parsing of the async BigQuery operator (#1582) Removes unused Bigquery async arguments that trigger a validation of the profile at parsing time, which we want to avoid. Closes #1581 (cherry picked from commit 46912b6db088d096103e8b0a80c32ee77179203b) --- cosmos/operators/_asynchronous/bigquery.py | 15 +++++++++------ tests/operators/_asynchronous/test_bigquery.py | 10 ++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index e8879b0fe7..1d28f5da3e 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -73,9 +73,6 @@ def __init__( self.project_dir = project_dir self.profile_config = profile_config self.gcp_conn_id = self.profile_config.profile_mapping.conn_id # type: ignore - profile = self.profile_config.profile_mapping.profile # type: ignore - self.gcp_project = profile["project"] - self.dataset = profile["dataset"] self.extra_context = extra_context or {} self.configuration: dict[str, Any] = {} self.dbt_kwargs = dbt_kwargs or {} @@ -103,6 +100,8 @@ def __init__( self.async_context["profile_type"] = self.profile_config.get_profile_type() self.async_context["async_operator"] = BigQueryInsertJobOperator self.compiled_sql = "" + self.gcp_project = "" + self.dataset = "" @property def base_cmd(self) -> list[str]: @@ -145,10 +144,10 @@ def execute(self, context: Context, **kwargs: Any) -> None: super().execute(context=context) else: self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) @provide_session - def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) -> None: + def _store_template_fields(self, context: Context, session: Session = NEW_SESSION) -> None: from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.taskinstance import TaskInstance @@ -159,6 +158,10 @@ def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql + profile = self.profile_config.profile_mapping.profile + self.gcp_project = profile["project"] + self.dataset = profile["dataset"] + # need to refresh the rendered task field record in the db because Airflow only does this # before executing the task, not after ti = context["ti"] @@ -188,5 +191,5 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any: """ job_id = super().execute_complete(context=context, event=event) self.log.info("Configuration is %s", str(self.configuration)) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) return job_id diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index f339c98805..40717cd62c 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -38,8 +38,6 @@ def test_dbt_run_airflow_async_bigquery_operator_init(profile_config_mock): assert operator.project_dir == "/path/to/project" assert operator.profile_config == profile_config_mock assert operator.gcp_conn_id == "google_cloud_default" - assert operator.gcp_project == "test_project" - assert operator.dataset == "test_dataset" def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): @@ -134,15 +132,19 @@ def test_store_compiled_sql(mock_rendered_ti, mock_get_remote_sql, profile_confi mock_task_instance.task = operator mock_context = {"ti": mock_task_instance} - operator._store_compiled_sql(mock_context, session=mock_session) + operator._store_template_fields(mock_context, session=mock_session) + # check if gcp_project and dataset are set after the tasks gets executed assert operator.compiled_sql == "SELECT * FROM test_table;" + assert operator.dataset == "test_dataset" + assert operator.gcp_project == "test_project" + mock_rendered_ti.assert_called_once() mock_session.add.assert_called_once() mock_session.query().filter().delete.assert_called_once() -@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_compiled_sql") +@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_template_fields") def test_execute_complete(mock_store_sql, profile_config_mock): mock_context = Mock() mock_event = {"job_id": "test_job"} From 708217abddc254b2d89b9eaabdf8b9d71ea427d0 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 5 Mar 2025 00:02:42 +0530 Subject: [PATCH 17/38] Enable DAG example_cosmos_dbt_build.py in CI (#1573) closes: https://github.com/astronomer/astronomer-cosmos/issues/1568 (cherry picked from commit e31fc5b4f2b98b6792ec84bf14aca63a32ac7e95) --- tests/test_example_dags.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index d858812795..d0671ec952 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -81,8 +81,8 @@ def get_dag_bag() -> DagBag: if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) - # TODO: Fix https://github.com/astronomer/astronomer-cosmos/issues/1568 - file.writelines("example_cosmos_dbt_build.py\n") + if AIRFLOW_VERSION < Version("2.8.0"): + file.writelines("example_cosmos_dbt_build.py\n") print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) From ac92243a5ea99cbe2c84ec1812cb12b0b8d5489d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 11 Mar 2025 17:00:31 +0000 Subject: [PATCH 18/38] Fix: Workaround to incorrectly raised `gcsfs.retry.HttpError` (Invalid Credentials, 401) (#1598) Workaround to https://github.com/fsspec/gcsfs/issues/664 Since upgrading to `gcsfs==2025.3.0` from `2025.2.0`, we started facing this issue: ``` File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper return sync(self.loop, func, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync raise return_result File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner result[0] = await coro ^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/fsspec/asyn.py", line 696, in _exists await self._info(path, **kwargs) File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/core.py", line 1024, in _info exact = await self._get_object(path) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/core.py", line 557, in _get_object res = await self._call( ^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/core.py", line 477, in _call status, headers, info, contents = await self._request( ^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/decorator.py", line 224, in fun return await caller(func, *(extras + args), **kw) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/retry.py", line 165, in retry_request raise e File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/retry.py", line 135, in retry_request return await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/core.py", line 461, in _request headers=self._get_headers(headers), ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/core.py", line 438, in _get_headers self.credentials.apply(out) File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/credentials.py", line 212, in apply self.maybe_refresh() File "/home/runner/.local/share/hatch/env/virtual/astronomer-cosmos/Za_bFbg4/tests.py3.11-2.9/lib/python3.11/site-packages/gcsfs/credentials.py", line 203, in maybe_refresh raise HttpError( gcsfs.retry.HttpError: Invalid Credentials, 401 ``` When I use the same credentials with `2025.2.0` things work as expected. This problem was spotted while using Apache Airflow in our CI: https://github.com/astronomer/astronomer-cosmos/actions/runs/13772013607/job/38566202965?pr=1596 We used this script to generate the credentials that work: ``` import json import urllib.parse with open("/Users/tati//Downloads/astronomer-dag-authoring-121145ad8a5a.json", "r") as file: json_content = json.load(file) url_encoded_content = urllib.parse.quote(json.dumps(json_content)) print(url_encoded_content) print(f'google-cloud-platform://?keyfile_dict={url_encoded_content}&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform') ``` (cherry picked from commit b04717ce5f40ecce5782f45924ca5e4e4ffccc10) --- pyproject.toml | 4 +++- scripts/test/pre-install-airflow.sh | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 79b7590460..122e06da28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,7 +74,9 @@ openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"] amazon = [ "apache-airflow-providers-amazon[s3fs]>=3.0.0", ] -google = ["apache-airflow-providers-google>=10.17.0"] + +# Due to issue https://github.com/fsspec/gcsfs/issues/664 +google = ["apache-airflow-providers-google>=10.17.0", "gcsfs<2025.3.0"] microsoft = ["apache-airflow-providers-microsoft-azure>=8.5.0"] all = [ "astronomer-cosmos[dbt-all]", diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index b9ee15eba9..edc87c9dc0 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -30,6 +30,9 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.t uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt +# Due to issue https://github.com/fsspec/gcsfs/issues/664 +uv pip install "gcsfs<2025.3.0" + if [ "$AIRFLOW_VERSION" = "2.4" ] || [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFLOW_VERSION" = "2.6" ] ; then uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" From 4c4221add898b72d78ee3f33c516e3e0e290d7a4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 11 Mar 2025 23:09:00 +0530 Subject: [PATCH 19/38] =?UTF-8?q?=E2=AC=86=20[pre-commit.ci]=20pre-commit?= =?UTF-8?q?=20autoupdate=20(#1596)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.9.9 → v0.9.10](https://github.com/astral-sh/ruff-pre-commit/compare/v0.9.9...v0.9.10) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> (cherry picked from commit dd8b6c7f6fc220d90366370b8241816d98211571) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9f2a3a50cc..2371a05dcd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -57,7 +57,7 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.9 + rev: v0.9.10 hooks: - id: ruff args: From a9b8dfab791583d5f67ff62b37224fb2482f2951 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 12 Mar 2025 19:33:14 +0530 Subject: [PATCH 20/38] Fix the async execution mode read sql files for dbt packages (#1588) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit closes: https://github.com/astronomer/astronomer-cosmos/issues/1585 This PR modifies the DbtNode to include the packages, allowing us to correctly construct the path when reading the generated SQL files. In DBT projects with dbt_packages, the dbt run command generates SQL files within the respective dbt_packages folder inside the target/run directory, instead of the main project folder. Screenshot 2025-03-06 at 12 01 51 AM **With Setup task** Screenshot 2025-03-06 at 12 02 58 AM **Without Setup task** Screenshot 2025-03-06 at 12 03 34 AM (cherry picked from commit b309dac26f0afe53b8d0f0caec5ea9c667c6dcc1) --- cosmos/__init__.py | 2 +- cosmos/airflow/graph.py | 1 + cosmos/dbt/graph.py | 9 ++++++++- cosmos/operators/local.py | 5 +++-- tests/airflow/test_graph.py | 2 ++ tests/dbt/test_graph.py | 9 ++++++--- tests/operators/test_local.py | 18 +++++++++++++++++- 7 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index ceb5fce7e0..57de6fa53f 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.1a1" +__version__ = "1.9.1a2" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f96d3ba2a7..4a21bb68f4 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -267,6 +267,7 @@ def create_task_metadata( extra_context: dict[str, Any] = { "dbt_node_config": node.context_dict, "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + "package_name": node.package_name, } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 36da9b6e9d..e2faf6578d 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -64,6 +64,7 @@ class DbtNode: resource_type: DbtResourceType depends_on: list[str] file_path: Path + package_name: str | None = None tags: list[str] = field(default_factory=lambda: []) config: dict[str, Any] = field(default_factory=lambda: {}) has_freshness: bool = False @@ -279,12 +280,17 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, except json.decoder.JSONDecodeError: logger.debug("Skipped dbt ls line: %s", line) else: + base_path = ( + project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore + ) + try: node = DbtNode( unique_id=node_dict["unique_id"], + package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=project_path / node_dict["original_file_path"], + file_path=base_path / node_dict["original_file_path"], tags=node_dict.get("tags", []), config=node_dict.get("config", {}), has_freshness=( @@ -821,6 +827,7 @@ def load_from_dbt_manifest(self) -> None: for unique_id, node_dict in resources.items(): node = DbtNode( unique_id=unique_id, + package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 21fa6ae915..71118cec15 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -406,8 +406,9 @@ def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: _copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir) def _read_run_sql_from_target_dir(self, tmp_project_dir: str, sql_context: dict[str, Any]) -> str: - sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(str(self.project_dir))[-1].lstrip("/") - run_sql_path = Path(tmp_project_dir) / "target/run" / Path(self.project_dir).name / sql_relative_path + package_name = sql_context.get("package_name") or Path(self.project_dir).name + sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(package_name)[-1].lstrip("/") + run_sql_path = Path(tmp_project_dir) / "target/run" / Path(package_name).name / sql_relative_path with run_sql_path.open("r") as sql_file: sql_content: str = sql_file.read() return sql_content diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index a3d9474dcd..728b86283b 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -435,6 +435,7 @@ def test_create_task_metadata_unsupported(caplog): "resource_name": "my_model", "name": "my_model", }, + "package_name": None, }, ), ( @@ -476,6 +477,7 @@ def test_create_task_metadata_unsupported(caplog): "resource_name": "my_snapshot", "name": "my_snapshot", }, + "package_name": None, }, ), ], diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 24b2a8bb83..d3f3715fc8 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1349,7 +1349,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): "model.some_package.some_model": DbtNode( unique_id="model.some_package.some_model", resource_type=DbtResourceType.MODEL, - file_path=Path("fake-project/models/some_model.sql"), + file_path=Path("some_package/models/some_model.sql"), tags=[], config={ "access": "protected", @@ -1383,6 +1383,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): "unique_key": None, }, depends_on=["source.some_source"], + package_name="some_package", ), } nodes = parse_dbt_ls_output(Path("fake-project"), dbt_ls_output) @@ -1392,7 +1393,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): def test_parse_dbt_ls_output(): - fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}' + fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "package_name": "fake-project", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}' expected_nodes = { "fake-unique-id": DbtNode( @@ -1402,6 +1403,7 @@ def test_parse_dbt_ls_output(): tags=[], config={}, depends_on=[], + package_name="fake-project", ), } nodes = parse_dbt_ls_output(Path("fake-project"), fake_ls_stdout) @@ -1410,7 +1412,7 @@ def test_parse_dbt_ls_output(): def test_parse_dbt_ls_output_with_json_without_tags_or_config(): - some_ls_stdout = '{"resource_type": "model", "name": "some-name", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}' + some_ls_stdout = '{"resource_type": "model", "name": "some-name", "package_name": "some-project", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}' expected_nodes = { "some-unique-id": DbtNode( @@ -1420,6 +1422,7 @@ def test_parse_dbt_ls_output_with_json_without_tags_or_config(): tags=[], config={}, depends_on=[], + package_name="some-project", ), } nodes = parse_dbt_ls_output(Path("some-project"), some_ls_stdout) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 3bcd78616d..8b7f0a4a98 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -5,7 +5,7 @@ import sys import tempfile from pathlib import Path -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, call, mock_open, patch import pytest from airflow import DAG @@ -1464,3 +1464,19 @@ def test_async_execution_teardown_delete_files(mock_unlink, mock_construct_dest_ ) operator._handle_async_execution(project_dir, {}, {"profile_type": "bigquery", "teardown_task": True}) mock_unlink.assert_called() + + +def test_read_run_sql_from_target_dir(): + tmp_project_dir = "/tmp/project" + sql_context = {"dbt_node_config": {"file_path": "/path/to/file.sql"}, "package_name": "package_name"} + + operator = DbtRunLocalOperator( + task_id="test", + project_dir="/tmp", + profile_config=profile_config, + ) + + expected_sql_content = "SELECT * FROM my_table;" + with patch("pathlib.Path.open", new_callable=mock_open, read_data=expected_sql_content): + result = operator._read_run_sql_from_target_dir(tmp_project_dir, sql_context) + assert result == expected_sql_content From 42bb17da71a57fa63943e79b96eaf0f72b075a5d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 07:01:56 +0000 Subject: [PATCH 21/38] Improve BQ async error handling (#1597) Currently, if someone attempts to run `simple_dag_async` without previously installing `apache-airflow-providers-google`, they will face this very ugly error: ``` Traceback (most recent call last): File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv/lib/python3.9/site-packages/airflow/models/dagbag.py", line 383, in parse loader.exec_module(new_module) File "", line 850, in exec_module File "", line 228, in _call_with_frames_removed File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/dags/simple_dag_async.py", line 21, in simple_dag_async = DbtDag( File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/dag.py", line 26, in __init__ DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/converter.py", line 328, in __init__ self.tasks_map = build_airflow_graph( File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 591, in build_airflow_graph task_or_group = conversion_function( # type: ignore File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 379, in generate_task_or_group task = create_airflow_task(task_meta, dag, task_group=model_task_group) File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/core/airflow.py", line 36, in get_airflow_task airflow_task = Operator( File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 506, in apply_defaults result = func(self, **kwargs, default_args=default_args) File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/airflow_async.py", line 75, in __init__ super().__init__( File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 506, in apply_defaults result = func(self, **kwargs, default_args=default_args) File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/_asynchronous/base.py", line 50, in __init__ async_operator_class = self.create_async_operator() File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/_asynchronous/base.py", line 69, in create_async_operator async_class_operator = _create_async_operator_class(profile_type, "DbtRun") File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/_asynchronous/base.py", line 34, in _create_async_operator_class raise ImportError(f"Error in loading class: {class_path}. Unable to find the specified operator class.") from e ImportError: Error in loading class: cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator. Unable to find the specified operator class. ``` The goal with this ticket is to give the same error handling as other parts of Cosmos by raising a more graceful error message. (cherry picked from commit 09bcb555ae634616b7a63c8851d5de5792098fc0) --- cosmos/operators/_asynchronous/bigquery.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 1d28f5da3e..007ea47145 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -5,7 +5,15 @@ from typing import TYPE_CHECKING, Any, Sequence import airflow -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator + +try: + from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +except ImportError: + raise ImportError( + "Could not import BigQueryInsertJobOperator. Ensure you've installed the Google Cloud provider separately or " + "with with `pip install apache-airflow-providers-google`." + ) + from airflow.utils.context import Context from airflow.utils.session import NEW_SESSION, provide_session from packaging.version import Version @@ -158,7 +166,12 @@ def _store_template_fields(self, context: Context, session: Session = NEW_SESSIO self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql - profile = self.profile_config.profile_mapping.profile + if self.profile_config.profile_mapping is not None: + profile = self.profile_config.profile_mapping.profile + else: + raise CosmosValueError( + "The `profile_config.profile`_mapping attribute must be defined to use `ExecutionMode.AIRFLOW_ASYNC`" + ) self.gcp_project = profile["project"] self.dataset = profile["dataset"] From dfb92c8eb598559be3215ead6d5c684a8e9d3142 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 07:13:50 +0000 Subject: [PATCH 22/38] Fix path selector when `manifest.json` was created in MS Windows (#1601) Users who generated the `manifest.json` using MS Windows and attempted to use Cosmos path selectors after, such as `path:models/edr/run_results' were unable to do so, because the paths in Windows were different from the selector: ``` "model.elementary.model_run_results": { "database": "FDH_DEV_DB", "schema": "MONITORING", "name": "model_run_results", "resource_type": "model", "package_name": "elementary", "path": "edr\\run_results\\model_run_results.sql", "original_file_path": "models\\edr\\run_results\\model_run_results.sql", "unique_id": "model.elementary.model_run_results", "fqn": [ "elementary", "edr", "run_results", "model_run_results" ], ``` As observed in this example, the property `original_file_path` used the `\\` character as a divider in the path, but the selector checked using the Posix notation. Since Cosmos implements path selectors using: path_selection in str(node.file_path), we have to normalize the input for the filter to work. This issue only happened when using `LoadMode.DBT_MANIFEST` and not `LoadMode.DBT_LS` since dbt normalizes this internally when handling selectors as part of this command line. (cherry picked from commit 9a1c8fe58d2cd53998c6b7101e23863bbfac14f5) --- cosmos/dbt/graph.py | 9 ++++++++- tests/dbt/test_graph.py | 11 +++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index e2faf6578d..3db1a18398 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -46,6 +46,13 @@ logger = get_logger(__name__) +def _normalize_path(path: str) -> str: + """ + Converts a potentially Windows path string into a Posix-friendly path. + """ + return Path(path.replace("\\", "/")).as_posix() + + class CosmosLoadDbtException(Exception): """ Exception raised while trying to load a `dbt` project as a `DbtGraph` instance. @@ -830,7 +837,7 @@ def load_from_dbt_manifest(self) -> None: package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), + file_path=self.execution_config.project_path / _normalize_path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], has_freshness=( diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index d3f3715fc8..b8c338abda 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -30,6 +30,7 @@ DbtGraph, DbtNode, LoadMode, + _normalize_path, parse_dbt_ls_output, run_command, ) @@ -2026,3 +2027,13 @@ def test_get_dbt_ls_cache_remote_cache_dir( } assert result == expected_result + + +def test__normalize_path(): + """ + This normalizes the path (e.g. declared inside a manifest.json file) when it was created using MS Windows instead + of GNU Linux. + """ + original_value = "seeds\\seed_ifs_util_manual_event_id.csv" + expected_value = "seeds/seed_ifs_util_manual_event_id.csv" + assert _normalize_path(original_value) == expected_value From c2491ad1967fb93415d6293521e5e84e90421020 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 09:21:24 +0000 Subject: [PATCH 23/38] Fix log that prints 'Total filtered nodes' (#1603) The log that prints 'Total filtered nodes' printed the incorrect value (the total nodes instead of the actual filtered nodes). (cherry picked from commit 674f15ccda4e44674dff03c95be95008285bfa1e) --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 3db1a18398..9e744d5b30 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -543,7 +543,7 @@ def load( self.update_node_dependency() logger.info("Total nodes: %i", len(self.nodes)) - logger.info("Total filtered nodes: %i", len(self.nodes)) + logger.info("Total filtered nodes: %i", len(self.filtered_nodes)) def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] From 5719b94bd74969341fcbf8c7454117aed88cc1c6 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 12:05:26 +0000 Subject: [PATCH 24/38] Fix select behaviour using `LoadMode.MANIFEST` and a path with star (#1602) Let's say the dbt project has a file_path "gen2/models/parent.sql" ``` parent_node = DbtNode( unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent", resource_type=DbtResourceType.MODEL, depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id], file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", tags=["has_child", "is_child"], config={"materialized": "view", "tags": ["has_child", "is_child"]}, ) ``` When using Cosmos 1.9.0 with `LoadMode.MANIFEST` and trying to use: ``` RenderConfig(select="gen2/models/*") ``` The selector would not return any results. It would still work with `LoadMode.DBT_LS`. The goal of this PR is to solve this issue. (cherry picked from commit 0e1f81be2bc9148871b4834a6c3f1f84e61910f0) --- cosmos/dbt/selector.py | 4 ++-- tests/dbt/test_selector.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 0ac91e9f11..e410e21a7c 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -173,7 +173,7 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # Index nodes by name, we can improve performance by doing this once # for multiple GraphSelectors if PATH_SELECTOR in self.node_name: - path_selection = self.node_name[len(PATH_SELECTOR) :] + path_selection = self.node_name[len(PATH_SELECTOR) :].rstrip("*") root_nodes.update({node_id for node_id, node in nodes.items() if path_selection in str(node.file_path)}) elif TAG_SELECTOR in self.node_name: @@ -366,7 +366,7 @@ def _parse_tag_selector(self, item: str) -> None: def _parse_path_selector(self, item: str) -> None: index = len(PATH_SELECTOR) if self.project_dir: - self.paths.append(self.project_dir / Path(item[index:])) + self.paths.append(self.project_dir / Path(item[index:].rstrip("*"))) else: self.paths.append(Path(item[index:])) diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index ef82c5881e..7cef7cc16d 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -284,12 +284,31 @@ def test_select_nodes_with_test_by_intersection_and_tag_ancestry(): def test_select_nodes_by_select_path(): + # Path without star or graph selector selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models"]) expected = { parent_node.unique_id: parent_node, } assert selected == expected + # Path with star + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/*"]) + expected = { + parent_node.unique_id: parent_node, + } + assert selected == expected + + # Path with star and graph selector that retrieves descendants + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/*+"]) + expected = { + child_node.unique_id: child_node, + parent_node.unique_id: parent_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, + sibling3_node.unique_id: sibling3_node, + } + assert selected == expected + def test_select_nodes_with_slash_but_no_path_selector(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["gen2/models"]) From 4bc447bb7a2b1a44b4c1e499922a1c3090d05ff1 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 13 Mar 2025 17:36:30 +0530 Subject: [PATCH 25/38] Run async in CI DAG without setup/teardown task (#1599) Closes: https://github.com/astronomer/astronomer-cosmos/issues/1595 (cherry picked from commit 7ac2f6196ac9088aec1d84b67971beec2faabae9) --- cosmos/operators/_asynchronous/bigquery.py | 6 +-- cosmos/operators/local.py | 9 ++--- .../operators/_asynchronous/test_bigquery.py | 4 +- tests/operators/test_local.py | 4 +- tests/test_example_dags.py | 39 ++++++++++++++----- 5 files changed, 41 insertions(+), 21 deletions(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 007ea47145..60c2ac6c0a 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -23,7 +23,7 @@ from cosmos.dataset import get_dataset_alias_name from cosmos.exceptions import CosmosValueError from cosmos.operators.local import AbstractDbtLocalBase -from cosmos.settings import enable_setup_async_task, remote_target_path, remote_target_path_conn_id +from cosmos.settings import remote_target_path, remote_target_path_conn_id if TYPE_CHECKING: # pragma: no cover from sqlalchemy.orm import Session @@ -142,7 +142,7 @@ def get_remote_sql(self) -> str: return sql # type: ignore def execute(self, context: Context, **kwargs: Any) -> None: - if enable_setup_async_task: + if settings.enable_setup_async_task: self.configuration = { "query": { "query": self.get_remote_sql(), @@ -159,7 +159,7 @@ def _store_template_fields(self, context: Context, session: Session = NEW_SESSIO from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.taskinstance import TaskInstance - if not enable_setup_async_task: + if not settings.enable_setup_async_task: self.log.info("SQL cannot be made available, skipping registration of compiled_sql template field") return sql = self.get_remote_sql().strip() diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 71118cec15..8aa517d2ad 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -37,8 +37,6 @@ from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError from cosmos.settings import ( - enable_setup_async_task, - enable_teardown_async_task, remote_target_path, remote_target_path_conn_id, ) @@ -409,6 +407,7 @@ def _read_run_sql_from_target_dir(self, tmp_project_dir: str, sql_context: dict[ package_name = sql_context.get("package_name") or Path(self.project_dir).name sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(package_name)[-1].lstrip("/") run_sql_path = Path(tmp_project_dir) / "target/run" / Path(package_name).name / sql_relative_path + with run_sql_path.open("r") as sql_file: sql_content: str = sql_file.read() return sql_content @@ -484,11 +483,11 @@ def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None self.callback(tmp_project_dir, **self.callback_args) def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_context: dict[str, Any]) -> None: - if async_context.get("teardown_task") and enable_teardown_async_task: + if async_context.get("teardown_task") and settings.enable_teardown_async_task: self._delete_sql_files(Path(tmp_project_dir), "run") return - if enable_setup_async_task: + if settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: sql = self._read_run_sql_from_target_dir(tmp_project_dir, async_context) @@ -532,7 +531,7 @@ def run_command( if self.install_deps: self._install_dependencies(tmp_dir_path, flags, env) - if run_as_async and not enable_setup_async_task: + if run_as_async and not settings.enable_setup_async_task: self._mock_dbt_adapter(async_context) full_cmd = cmd + flags diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index 40717cd62c..783fa53e95 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -52,9 +52,9 @@ def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): @patch.object(DbtRunAirflowAsyncBigqueryOperator, "build_and_run_cmd") -def test_dbt_run_airflow_async_bigquery_operator_execute(mock_build_and_run_cmd, profile_config_mock, monkeypatch): +@patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) +def test_dbt_run_airflow_async_bigquery_operator_execute(mock_build_and_run_cmd, profile_config_mock): """Test execute calls build_and_run_cmd with correct parameters.""" - monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.enable_setup_async_task", False) operator = DbtRunAirflowAsyncBigqueryOperator( task_id="test_task", project_dir="/path/to/project", diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 8b7f0a4a98..f07b1eb2f2 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1431,10 +1431,10 @@ def test_mock_dbt_adapter_unsupported_profile_type(): @patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.execute") @patch("cosmos.operators.local.AbstractDbtLocalBase._read_run_sql_from_target_dir") -def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute, monkeypatch): +@patch("cosmos.operators.local.settings.enable_setup_async_task", False) +def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute): from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator - monkeypatch.setattr("cosmos.operators.local.enable_setup_async_task", False) mock_read_sql.return_value = "select * from 1;" operator = DbtRunLocalOperator( task_id="test", diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index d0671ec952..5f325a22c5 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -2,6 +2,7 @@ import warnings from pathlib import Path +from unittest.mock import patch try: from functools import cache @@ -97,15 +98,7 @@ def get_dag_ids() -> list[str]: return dag_bag.dag_ids -@pytest.mark.skipif( - AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", -) -@pytest.mark.integration -@pytest.mark.parametrize("dag_id", get_dag_ids()) -def test_example_dag(session, dag_id: str): - if dag_id in KUBERNETES_DAGS: - return +def run_dag(dag_id: str): dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) @@ -133,3 +126,31 @@ def test_example_dag(session, dag_id: str): ) else: test_utils.run_dag(dag) + + +@pytest.mark.skipif( + AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) +@pytest.mark.integration +@pytest.mark.parametrize("dag_id", get_dag_ids()) +def test_example_dag(session, dag_id: str): + if dag_id in KUBERNETES_DAGS: + return + run_dag(dag_id) + + +async_dag_ids = ["simple_dag_async"] + + +@pytest.mark.skipif( + AIRFLOW_VERSION < Version("2.8") or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) +@pytest.mark.integration +@patch("cosmos.operators.local.settings.enable_setup_async_task", False) +@patch("cosmos.operators.local.settings.enable_teardown_async_task", False) +@patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) +def test_async_example_dag_without_setup_task(session): + for dag_id in async_dag_ids: + run_dag(dag_id) From 6f7dabf60e3d7e9d1f408a3a068dc75a3fd077dc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 13:29:57 +0000 Subject: [PATCH 26/38] Add test case that fully covers recent select issue (#1604) Makes sure the fixes: - Fix path selector when manifest.json was created in MS Windows (#1601) - Fix select behaviour using LoadMode.MANIFEST and a path with star (#1602) Work from an end-to-end perspective, solving Astro customer's original issue. (cherry picked from commit 313889217c45d4868bd14abd32c8d75442581e30) --- tests/dbt/test_graph.py | 26 ++++++++++++++++++++++++++ tests/sample/small_manifest.json | 27 +++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 tests/sample/small_manifest.json diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index b8c338abda..d027057947 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -41,6 +41,7 @@ DBT_PROJECT_NAME = "jaffle_shop" ALTERED_DBT_PROJECT_NAME = "altered_jaffle_shop" SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json" +SAMPLE_SMALL_MANIFEST = Path(__file__).parent.parent / "sample/small_manifest.json" SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" @@ -263,6 +264,31 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{project_name}/models/{model_filepath}" +def test_load_via_manifest_with_ms_windows_manifest_and_star_selector(): + # This test is based on a real user-case that in 1.9.0 and before would return an empty list of filtered nodes + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR, # this value is not used in DAG rendering when the manifest is given + manifest_path=SAMPLE_SMALL_MANIFEST, + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(select=["path:models/edr*+"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) + dbt_graph.load_from_dbt_manifest() + + assert len(dbt_graph.nodes) == 1 + assert len(dbt_graph.filtered_nodes) == 1 + + @pytest.mark.parametrize( "project_name,manifest_filepath,model_filepath", [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], diff --git a/tests/sample/small_manifest.json b/tests/sample/small_manifest.json new file mode 100644 index 0000000000..ee0448bdc6 --- /dev/null +++ b/tests/sample/small_manifest.json @@ -0,0 +1,27 @@ +{ + "nodes": { + "model.elementary.model_run_results": { + "alias": "model_run_results", + "checksum": { + "checksum": "99fba47e42516fbdc31e2546f687e3f780f21eaa5ac303e1ed22d23262ac5ec9", + "name": "sha256" + }, + "config": {}, + "database": "FDH_DEV_DB", + "fqn": [ + "elementary", + "edr", + "run_results", + "model_run_results" + ], + "name": "model_run_results", + "original_file_path": "models\\edr\\run_results\\model_run_results.sql", + "package_name": "elementary", + "path": "edr\\run_results\\model_run_results.sql", + "resource_type": "model", + "schema": "MONITORING", + "tags": [], + "unique_id": "model.elementary.model_run_results" + } + } +} From 68cf910d859e2bf9c17ec83906f97c15c883d5c1 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 19:06:22 +0530 Subject: [PATCH 27/38] Add CI job to test multiple dbt versions for the async DAG (#1535) This PR introduces a new CI job named `Run-Integration-Tests-DBT-Async` to ensure compatibility of the async example DAG with multiple dbt versions. It achieves this by adding a third dimension to the `pyproject.toml` matrix, enabling the CI to run the DAG across a list of dbt versions. Additionally, this PR includes a new test file: `tests/test_async_example_dag.py`. While we already have `tests/test_example_dags.py`, certain dbt versions have shown parsing issues with some example DAGs, which can cause CI failures unrelated to the async DAG. To prevent this, the new file is a modified copy that exclusively tests `simple_async_dag`, with other DAGs ignored via `.airflowignore`. This ensures that the CI job focuses on validating the async DAG without being affected by unrelated parsing errors. closes: #1489 (cherry picked from commit 372d388a958d745494748fddc60642c38fb0e609) --- .github/workflows/test.yml | 128 +++++++++++++++++++------- dev/dags/simple_dag_async.py | 10 +- pyproject.toml | 2 + scripts/test/integration-dbt-async.sh | 29 ++++++ scripts/test/integration.sh | 1 + scripts/test/performance.sh | 1 + scripts/test/unit-cov.sh | 1 + scripts/test/unit.sh | 1 + tests/test_async_example_dag.py | 73 +++++++++++++++ 9 files changed, 209 insertions(+), 37 deletions(-) create mode 100644 scripts/test/integration-dbt-async.sh create mode 100644 tests/test_async_example_dag.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 475090f5cb..d14ae5de72 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,7 @@ jobs: architecture: "x64" - run: pip3 install hatch - - run: hatch run tests.py3.9-2.8:type-check + - run: hatch run tests.py3.9-2.8-1.9:type-check Run-Unit-Tests: runs-on: ubuntu-latest @@ -40,6 +40,7 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + dbt-version: ["1.9"] exclude: - python-version: "3.11" airflow-version: "2.4" @@ -75,7 +76,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -86,16 +87,16 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-cov - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -106,6 +107,7 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + dbt-version: ["1.9"] exclude: - python-version: "3.11" airflow-version: "2.4" @@ -139,7 +141,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}--${{ matrix.dbt-version }}${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -150,12 +152,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration env: AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ @@ -182,7 +184,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -193,6 +195,7 @@ jobs: matrix: python-version: ["3.11"] airflow-version: ["2.6"] + dbt-version: ["1.9"] services: postgres: @@ -216,7 +219,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-expensive-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-expensive-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -227,12 +230,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup - DATABRICKS_UNIQUE_ID="${{github.run_id}}" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-expensive + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-setup + DATABRICKS_UNIQUE_ID="${{github.run_id}}" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-expensive env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -256,7 +259,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-expensive-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-expensive-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -274,6 +277,7 @@ jobs: matrix: python-version: [ "3.11" ] airflow-version: [ "2.8" ] + dbt-version: [ "1.5" ] services: postgres: image: postgres @@ -296,7 +300,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-dbt-1-5-4-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-dbt-1-5-4-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -307,11 +311,11 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt 1.5.4 run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-dbt-1-5-4 + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbt-1-5-4 env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -335,10 +339,68 @@ jobs: AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/" AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn + Run-Integration-Tests-DBT-Async: + needs: Authorize + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [ "3.11" ] + airflow-version: [ "2.10" ] + dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"] + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + - uses: actions/cache@v4 + with: + path: | + ~/.cache/pip + .local/share/hatch/ + key: integration-dbt-async-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install packages and dependencies + run: | + python -m pip install uv + uv pip install --system hatch + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze + + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} + run: | + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbt-async + env: + AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + AIRFLOW__COSMOS__ENABLE_CACHE: 0 + AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/" + AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn + DBT_ADAPTER_VERSION: ${{ matrix.dbt-version }} + - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-dbt-1-5-4-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-dbt-async-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -354,6 +416,7 @@ jobs: matrix: python-version: ["3.11"] airflow-version: ["2.8"] + dbt-version: ["1.9"] num-models: [1, 10, 50, 100, 500] services: postgres: @@ -376,7 +439,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: perf-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: perf-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -387,13 +450,13 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Run performance tests against against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Run performance tests against against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} id: run-performance-tests run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-performance-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-performance + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-performance-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-performance # read the performance results and set them as an env var for the next step # format: NUM_MODELS={num_models}\nTIME={end - start}\n @@ -423,6 +486,7 @@ jobs: matrix: python-version: [ "3.12" ] airflow-version: [ "2.10" ] + dbt-version: [ "1.9" ] steps: - uses: actions/checkout@v4 with: @@ -432,7 +496,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: coverage-integration-kubernetes-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: coverage-integration-kubernetes-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -446,12 +510,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - name: Run kubernetes tests run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-kubernetes-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-kubernetes env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -477,7 +541,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true diff --git a/dev/dags/simple_dag_async.py b/dev/dags/simple_dag_async.py index 23e9836cff..d9afba493d 100644 --- a/dev/dags/simple_dag_async.py +++ b/dev/dags/simple_dag_async.py @@ -3,11 +3,14 @@ from pathlib import Path from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import TestBehavior from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_ADAPTER_VERSION = os.getenv("DBT_ADAPTER_VERSION", "1.9") + profile_config = ProfileConfig( profile_name="default", target_name="dev", @@ -26,12 +29,9 @@ profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.AIRFLOW_ASYNC, - async_py_requirements=["dbt-bigquery"], - ), - render_config=RenderConfig( - select=["path:models"], - # test_behavior=TestBehavior.NONE + async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"], ), + render_config=RenderConfig(select=["path:models"], test_behavior=TestBehavior.NONE), # normal dag parameters schedule_interval=None, start_date=datetime(2023, 1, 1), diff --git a/pyproject.toml b/pyproject.toml index 122e06da28..24a00c4962 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,6 +162,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ @@ -176,6 +177,7 @@ test-integration = 'sh scripts/test/integration.sh' test-kubernetes = "sh scripts/test/integration-kubernetes.sh" test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh" test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh' +test-integration-dbt-async = 'sh scripts/test/integration-dbt-async.sh {matrix:dbt}' test-integration-expensive = 'sh scripts/test/integration-expensive.sh' test-integration-setup = 'sh scripts/test/integration-setup.sh' test-performance = 'sh scripts/test/performance.sh' diff --git a/scripts/test/integration-dbt-async.sh b/scripts/test/integration-dbt-async.sh new file mode 100644 index 0000000000..b4ff9e9195 --- /dev/null +++ b/scripts/test/integration-dbt-async.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +set -x +set -e + +DBT_VERSION="$1" +echo "DBT_VERSION:" +echo "$DBT_VERSION" + + +pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y +pip install "dbt-postgres==$DBT_VERSION" "dbt-databricks==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION" +export SOURCE_RENDERING_BEHAVIOR=all +rm -rf airflow.*; \ +airflow db init; \ + +if [ "$DBT_VERSION" = "1.7" ]; then + # Otherwise, we will get the following error: + # stderr: MessageToJson() got an unexpected keyword argument 'including_default_value_fields' + echo "DBT version is 1.7 — Installing protobuf==4.25.6..." + pip install protobuf==4.25.6 +fi + +rm -rf dbt/jaffle_shop/dbt_packages; +pytest -vv \ + --cov=cosmos \ + --cov-report=term-missing \ + --cov-report=xml \ + "tests/test_async_example_dag.py::test_example_dag[simple_dag_async]" diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 985c99df57..bf19b4248f 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -19,5 +19,6 @@ pytest -vv \ --cov-report=xml \ -m 'integration' \ --ignore=tests/perf \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_k8s_dags.py \ -k 'not ( example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes)' diff --git a/scripts/test/performance.sh b/scripts/test/performance.sh index 2023026d3b..97941e86b7 100644 --- a/scripts/test/performance.sh +++ b/scripts/test/performance.sh @@ -2,5 +2,6 @@ pytest -vv \ -s \ -m 'perf' \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/unit-cov.sh b/scripts/test/unit-cov.sh index 8d8fe3589a..50cd268c40 100644 --- a/scripts/test/unit-cov.sh +++ b/scripts/test/unit-cov.sh @@ -6,5 +6,6 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index b80aab72ea..39069c423a 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -3,5 +3,6 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/tests/test_async_example_dag.py b/tests/test_async_example_dag.py new file mode 100644 index 0000000000..5d70af6a64 --- /dev/null +++ b/tests/test_async_example_dag.py @@ -0,0 +1,73 @@ +# We already have tests/test_example_dags.py, but it doesn’t run against multiple dbt versions in CI. +# Some dbt versions have shown parsing issues with certain example DAGs — something we may need to address over time. +# With PR #1535, the goal is to test the async example DAG across multiple dbt versions. To prevent the CI job from +# failing early due to unrelated DAG parsing errors, PR #1535 introduces this new test_async_example_dag.py file. +# This file replicates tests/test_example_dags.py but excludes all DAGs except simple_async_dag by adding them to +# .airflowignore. This ensures the CI job focuses solely on testing simple_async_dag over multiple dbt versions +# without being disrupted by other DAG parsing issues. + +from __future__ import annotations + +from pathlib import Path + +try: + from functools import cache +except ImportError: + from functools import lru_cache as cache + + +import airflow +import pytest +from airflow.models.dagbag import DagBag +from airflow.utils.db import create_default_connections +from airflow.utils.session import provide_session +from packaging.version import Version + +EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" +ALL_FILES_TO_IGNORE = [ + f.name for f in EXAMPLE_DAGS_DIR.iterdir() if f.is_file() and f.suffix == ".py" and f.name != "simple_dag_async.py" +] + +AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" +AIRFLOW_VERSION = Version(airflow.__version__) + + +@provide_session +def get_session(session=None): + create_default_connections(session) + return session + + +@pytest.fixture() +def session(): + return get_session() + + +@cache +def get_dag_bag() -> DagBag: + """Create a DagBag by adding the files that are not supported to .airflowignore""" + + with open(AIRFLOW_IGNORE_FILE, "w+") as file: + for dagfile in ALL_FILES_TO_IGNORE: + print(f"Adding {dagfile} to .airflowignore") + file.writelines([f"{dagfile}\n"]) + + print(".airflowignore contents: ") + print(AIRFLOW_IGNORE_FILE.read_text()) + db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) + assert db.dags + assert not db.import_errors + return db + + +def get_dag_ids() -> list[str]: + dag_bag = get_dag_bag() + return dag_bag.dag_ids + + +@pytest.mark.integration +@pytest.mark.parametrize("dag_id", get_dag_ids()) +def test_example_dag(session, dag_id: str): + dag_bag = get_dag_bag() + dag = dag_bag.get_dag(dag_id) + dag.test() From 0eb96eb753dbd823a1f91af3669a853cb07e52dd Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 20:03:32 +0530 Subject: [PATCH 28/38] Update Changelog --- CHANGELOG.rst | 27 ++++++++++++++++++++++++++- cosmos/__init__.py | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index faa4468f66..2ea8e085a0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,12 +1,37 @@ Changelog ========= -1.9.1a1 (2025-02-20) +1.9.1a3 (2025-03-13) -------------------- Bug Fixes * Fix import error in dbt bigquery adapter mock for ``dbt-bigquery<1.8`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1548 +* Fix ``operator_args`` override configuration by @ghjklw in #1558 +* Fix missing ``install_dbt_deps`` in ``ProjectConfig`` ``__init__`` method by @ghjklw in #1556 +* Fix dbt project parsing ``dbt_vars`` behavior passed via ``operator_args`` by @AlexandrKhabarov in #1543 +* Avoid reading the connection during DAG parsing of the async BigQuery operator by @joppevos in #1582 +* Fix: Workaround to incorrectly raised ``gcsfs.retry.HttpError`` (Invalid Credentials, 401) by @tatiana in #1598 +* Fix the async execution mode read sql files for dbt packages by @pankajastro in #1588 +* Improve BQ async error handling by @tatiana in #1597 +* Fix path selector when ``manifest.json`` is created using MS Windows by @tatiana in #1601 +* Fix log that prints 'Total filtered nodes' by @tatiana in #1603 +* Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 + +Docs + +* Improve MWAA getting-started docs by removing unused imports by @jx2lee in #1562 + +Others + +* Disable ``example_cosmos_dbt_build.py`` DAG in CI by @pankajastro in #1567 +* Upgrade GitHub Actions Ubuntu version by @tatiana in #1561 +* Update GitHub bug issue template by @pankajastro in #1586 +* Enable DAG ``example_cosmos_dbt_build.py`` in CI by @pankajastro in #1573 +* Run async DAG in DAG without setup/teardown task by @pankajastro in #1599 +* Add test case that fully covers recent select issue by @tatiana in #1604 +* Add CI job to test multiple dbt versions for the async DAG by @pankajkoti in #1535 +* Pre-commit updates: #1560, #1583, #1596 1.9.0 (2025-02-19) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 57de6fa53f..03d150be2c 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.1a2" +__version__ = "1.9.1a3" from cosmos.airflow.dag import DbtDag From cabdf2dc2150915a2ab85500cdac680c7d789473 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Fri, 28 Feb 2025 10:13:04 +0100 Subject: [PATCH 29/38] Support `on_warning_callback` with `TestBehavior.BUILD` and `ExecutionMode.LOCAL` (#1571) As of now, when we set TestBehavior.BUILD, we are not leveraging the method on_warning_callback that is available for Test nodes and Source Nodes. I have added the parsing to DbtBuildLocalOperator in order to fix it. I tested it locally and I got positive results Related: https://github.com/astronomer/astronomer-cosmos/issues/1569 (cherry picked from commit ddea39cf483451650244d93ce78a4f32b724e8ca) --- cosmos/airflow/graph.py | 1 + cosmos/operators/local.py | 30 +++++++++++++++++++++++++++++- tests/operators/test_local.py | 12 +++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4a21bb68f4..209a6d9614 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -271,6 +271,7 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 8aa517d2ad..7559552f30 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -762,8 +762,36 @@ class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator): template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + self.on_warning_callback = on_warning_callback + self.extract_issues: Callable[..., tuple[list[str], list[str]]] + + def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None: + """ + Handles warnings by extracting log issues, creating additional context, and calling the + on_warning_callback with the updated context. + + :param result: The result object from the build and run command. + :param context: The original airflow context in which the build and run command was executed. + """ + if self.invocation_mode == InvocationMode.SUBPROCESS: + self.extract_issues = extract_freshness_warn_msg + elif self.invocation_mode == InvocationMode.DBT_RUNNER: + self.extract_issues = dbt_runner.extract_message_by_status + + test_names, test_results = self.extract_issues(result) + + warning_context = dict(context) + warning_context["test_names"] = test_names + warning_context["test_results"] = test_results + + self.on_warning_callback and self.on_warning_callback(warning_context) + + def execute(self, context: Context, **kwargs: Any) -> None: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + if self.on_warning_callback: + self._handle_warnings(result, context) class DbtLSLocalOperator(DbtLSMixin, DbtLocalBaseOperator): diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index f07b1eb2f2..e5a8589ecc 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -664,7 +664,17 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje on_warning_callback=on_warning_callback, invocation_mode=invocation_mode, ) - run_operator >> test_operator + + build_operator = DbtBuildLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="build", + append_env=True, + on_warning_callback=on_warning_callback, + invocation_mode=invocation_mode, + ) + + run_operator >> build_operator >> test_operator run_test_dag(dag) assert on_warning_callback.called From ff824f871eed90944a3c8dd7ba40c2573de344c8 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 20:19:12 +0530 Subject: [PATCH 30/38] Include PR #1571 --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2ea8e085a0..05abccaae2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -17,6 +17,7 @@ Bug Fixes * Fix path selector when ``manifest.json`` is created using MS Windows by @tatiana in #1601 * Fix log that prints 'Total filtered nodes' by @tatiana in #1603 * Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 +* Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571 Docs From da29296a81889a3e8d31c10d262f487c7cf4ddf8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 15:56:58 +0000 Subject: [PATCH 31/38] Improve unit tests speed from 89s to 14s (#1600) A few tests, such as `test_configure_remote_target_path_no_remote_target`, were taking a long time when using `hatch run tests.py3.9-2.9:test-cov`. Time to run this command before these changes: 89.34s Time to run this command after these changes: 14.50s Also fix unittest that was failing locally. (cherry picked from commit d494dcdce3c67d7ce4ed92339951e4c3bdd578b4) --- cosmos/io.py | 7 ++-- tests/dbt/test_graph.py | 2 +- tests/operators/test_local.py | 1 + tests/test_io.py | 69 ++++++++++++++++++----------------- 4 files changed, 41 insertions(+), 38 deletions(-) diff --git a/cosmos/io.py b/cosmos/io.py index 0cce873e59..f065d8bf13 100644 --- a/cosmos/io.py +++ b/cosmos/io.py @@ -8,7 +8,6 @@ from cosmos import settings from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP from cosmos.exceptions import CosmosValueError -from cosmos.settings import remote_target_path, remote_target_path_conn_id def upload_to_aws_s3( @@ -136,14 +135,14 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" from airflow.version import version as airflow_version - if not remote_target_path: + if not settings.remote_target_path: return None, None _configured_target_path = None - target_path_str = str(remote_target_path) + target_path_str = str(settings.remote_target_path) - remote_conn_id = remote_target_path_conn_id + remote_conn_id = settings.remote_target_path_conn_id if not remote_conn_id: target_path_schema = urlparse(target_path_str).scheme remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment] diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index d027057947..5d79675646 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1881,7 +1881,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "afbced719302d5b1efdfb191c617e349" + assert hash_dir == "391db5c7e1fb90214d829dd0476059a1" else: assert hash_dir == "0148da6f5f7fd260c9fa55c3b3c45168" diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index e5a8589ecc..bbafd0fda2 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1457,6 +1457,7 @@ def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute): mock_bq_execute.assert_called_once() +@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("pathlib.Path.rglob") @patch("cosmos.operators.local.AbstractDbtLocalBase._construct_dest_file_path") diff --git a/tests/test_io.py b/tests/test_io.py index 7410f05883..816ead519d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -66,12 +66,13 @@ def test_upload_artifacts_to_azure_wasb(dummy_kwargs): assert hook_instance.load_file.call_count == 2 +@patch("cosmos.io.settings.remote_target_path", None) +@patch("cosmos.io.settings.remote_target_path_conn_id", None) def test_configure_remote_target_path_no_remote_target(): """Test _configure_remote_target_path when no remote target path is set.""" - with patch("cosmos.settings.remote_target_path", None): - from cosmos.io import _configure_remote_target_path + from cosmos.io import _configure_remote_target_path - assert _configure_remote_target_path() == (None, None) + assert _configure_remote_target_path() == (None, None) def test_construct_dest_file_path(dummy_kwargs): @@ -120,49 +121,51 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_no_conn_id(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") +@patch("cosmos.io.settings.remote_target_path_conn_id", None) +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_storage): """Test when no remote_conn_id is provided, but conn_id is resolved from scheme.""" - mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_urlparse.return_value.scheme = "s3" mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "s3" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True + mock_storage_path.exists.return_value = True + mock_object_storage.return_value = mock_storage_path + result = _configure_remote_target_path() - result = _configure_remote_target_path() - assert result == (mock_object_storage.return_value, _default_s3_conn) + assert result == (mock_object_storage.return_value, _default_s3_conn) @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_conn_id_is_none(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") +@patch("cosmos.io.settings.remote_target_path_conn_id", None) +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_conn_id_is_none(mock_urlparse, mock_object_storage): """Test when conn_id cannot be resolved and is None.""" - mock_remote_target_path.return_value = "abcd://bucket/path/to/file" mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "abcd" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True - result = _configure_remote_target_path() - assert result == (None, None) + mock_urlparse.return_value.scheme = "abcd" + mock_storage_path.exists.return_value = True + + result = _configure_remote_target_path() + assert result == (None, None) @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False) -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_airflow_io_unavailable(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_airflow_io_unavailable(mock_urlparse, mock_object_storage): """Test when AIRFLOW_IO_AVAILABLE is False.""" - mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_urlparse.return_value.scheme = "s3" + mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "s3" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True - with pytest.raises(CosmosValueError) as exc_info: - _configure_remote_target_path() + mock_storage_path.exists.return_value = True + mock_object_storage.return_value = mock_storage_path + + with pytest.raises(CosmosValueError) as exc_info: + _configure_remote_target_path() + assert "Object Storage feature is unavailable" in str(exc_info.value) From c78b37d109b128acabe507adb81cb6b98c93eed8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 16:45:26 +0000 Subject: [PATCH 32/38] Fix `DbtRunLocalOperator.partial()` support (#1609) Since Cosmos 1.9.0, users who attempted to use: ``` DbtRunLocalOperator.partial(task_id="foo", project_dir="foo") ``` Started facing the issue: ``` File /usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py:284, in partial(operator_class, task_id, dag, task_group, start_date, end_date, owner, email, params, resources, trigger_rule, depends_on_past, ignore_first_depends_on_past, wait_for_past_depends_before_skipping, wait_for_downstream, retries, queue, pool, pool_slots, execution_timeout, max_retry_delay, retry_delay, retry_exponential_backoff, priority_weight, weight_rule, sla, map_index_template, max_active_tis_per_dag, max_active_tis_per_dagrun, on_execute_callback, on_failure_callback, on_success_callback, on_retry_callback, on_skipped_callback, run_as_user, executor, executor_config, inlets, outlets, doc, doc_md, doc_json, doc_yaml, doc_rst, task_display_name, logger_name, allow_nested_operators, **kwargs) 281 from airflow.models.dag import DagContext 282 from airflow.utils.task_group import TaskGroupContext --> 284 validate_mapping_kwargs(operator_class, "partial", kwargs) 286 dag = dag or DagContext.get_current_dag() 287 if dag: File /usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py:123, in validate_mapping_kwargs(op, func, value) 121 names = ", ".join(repr(n) for n in unknown_args) 122 error = f"unexpected keyword arguments {names}" --> 123 raise TypeError(f"{op.name}.{func}() got {error}") TypeError: DbtRunLocalOperator.partial() got an unexpected keyword argument 'project_dir'` ``` This was introduced given the changes in how Cosmos operators subclass, that was introduced to allow to dynamically chose which Airflow operator is run during DAG rendering time. Closes: #1546 To validate it, we introduced a new small dbt project and an example DAG, and it can be tested by running: ``` airflow dags test example_task_mapping ``` Co-authored-by: Ash Berlin-Taylor (cherry picked from commit c8c148b14eed74c415b7eb67045c336ffc906e5d) --- cosmos/operators/base.py | 21 +++++++++++ dev/dags/dbt/simple/dbt_project.yml | 8 ++++ dev/dags/dbt/simple/models/example_model.sql | 1 + dev/dags/dbt/simple/profiles.yml | 12 ++++++ dev/dags/example_task_mapping.py | 39 ++++++++++++++++++++ 5 files changed, 81 insertions(+) create mode 100644 dev/dags/dbt/simple/dbt_project.yml create mode 100644 dev/dags/dbt/simple/models/example_model.sql create mode 100644 dev/dags/dbt/simple/profiles.yml create mode 100644 dev/dags/example_task_mapping.py diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 18019ab92b..552172f746 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import logging import os from abc import ABCMeta, abstractmethod @@ -142,6 +143,26 @@ def __init__( self.extra_context = extra_context or {} kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + __init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this + # logic explicitly in all subclasses + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + cls.__init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(cls.__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: """ Builds the set of environment variables to be exposed for the bash command. diff --git a/dev/dags/dbt/simple/dbt_project.yml b/dev/dags/dbt/simple/dbt_project.yml new file mode 100644 index 0000000000..4fa380408b --- /dev/null +++ b/dev/dags/dbt/simple/dbt_project.yml @@ -0,0 +1,8 @@ +name: 'my_dbt_project' +version: '1.0.0' +profile: 'default' + +models: + my_dbt_project: + example: + materialized: table diff --git a/dev/dags/dbt/simple/models/example_model.sql b/dev/dags/dbt/simple/models/example_model.sql new file mode 100644 index 0000000000..583e918924 --- /dev/null +++ b/dev/dags/dbt/simple/models/example_model.sql @@ -0,0 +1 @@ +SELECT 1 AS id, 'example' AS name diff --git a/dev/dags/dbt/simple/profiles.yml b/dev/dags/dbt/simple/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- /dev/null +++ b/dev/dags/dbt/simple/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/dev/dags/example_task_mapping.py b/dev/dags/example_task_mapping.py new file mode 100644 index 0000000000..15da345e7b --- /dev/null +++ b/dev/dags/example_task_mapping.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG + +from cosmos.config import ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +# Define the DAG +with DAG( + dag_id="example_task_mapping", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + + dbt_partial = DbtRunLocalOperator.partial( + task_id="dbt_run", project_dir=DBT_ROOT_PATH / "simple", profile_config=profile_config, emit_datasets=False + ) + + dbt_run = dbt_partial.expand(select=["example_model"]) # Only run the specific model + + dbt_run From 41875a788ccb2d86243877cca3c12939d9ed0722 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 22:18:28 +0530 Subject: [PATCH 33/38] Include PR #1600, PR #1609 --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 05abccaae2..df9da4c34d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -18,6 +18,7 @@ Bug Fixes * Fix log that prints 'Total filtered nodes' by @tatiana in #1603 * Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 * Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571 +* Fix ``DbtRunLocalOperator.partial()`` support by @tatiana in #1609 Docs @@ -32,6 +33,7 @@ Others * Run async DAG in DAG without setup/teardown task by @pankajastro in #1599 * Add test case that fully covers recent select issue by @tatiana in #1604 * Add CI job to test multiple dbt versions for the async DAG by @pankajkoti in #1535 +* Improve unit tests speed from 89s to 14s by @tatiana in #1600 * Pre-commit updates: #1560, #1583, #1596 From b164c39f286b3f9d1aed4ec47a824d70f1efe94b Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Thu, 13 Mar 2025 17:53:04 +0100 Subject: [PATCH 34/38] fix: container_name is null for ecs integration (#1592) ## Description ### TL/DR * pas `container_name` to kwargs * use a default value for **aws_conn_id** ### Long version The current implementation of ECS integration implies passing the `container_name` as part of the operator_args. e.g. ``` operator_args = { "container_name": "main", ... } ``` Anyhow, this lead to errors like this: ``` [2025-03-07, 16:40:34 UTC] {ecs.py:515} INFO - EcsOperator overrides: {'containerOverrides': [{'name': None, 'command': ['dbt', '--no-partial-parse', 'run', '--models', 'my_first_dbt_model'], 'environment': [{'name': 'AIRFLOW_CTX_DAG_OWNER', 'value': '***'}, {'name': 'AIRFLOW_CTX_DAG_ID', 'value': 'example_cosmos'}, {'name': 'AIRFLOW_CTX_TASK_ID', 'value': 'dbt_task_group.my_first_dbt_model.run'}, {'name': 'AIRFLOW_CTX_EXECUTION_DATE', 'value': '2025-03-07T16:40:31.716155+00:00'}, {'name': 'AIRFLOW_CTX_TRY_NUMBER', 'value': '1'}, {'name': 'AIRFLOW_CTX_DAG_RUN_ID', 'value': 'manual__2025-03-07T16:40:31.716155+00:00'}, {'name': 'EXTRA_VAR', 'value': 'extra_value'}]}]} ``` The container name is None, leading to a failure in how boto3 invokes the container. The issue was due to the fact that `container_name` was not passed to the kwargs, therefore, the container_name was not set properly to the value that was set to cosmos. ### Full logs
2025-03-10, 12:49:41 UTC] {ecs.py:512} INFO - Running ECS Task - Task
definition: dbt - on cluster dbt
[2025-03-10, 12:49:41 UTC] {ecs.py:515} INFO - EcsOperator overrides:
{'containerOverrides': [{'name': None, 'command': ['dbt',
'--no-partial-parse', 'run', '--models', 'my_first_dbt_model'],
'environment': [{'name': 'AIRFLOW_CTX_DAG_OWNER', 'value': '***'},
{'name': 'AIRFLOW_CTX_DAG_ID', 'value': 'example_cosmos'}, {'name':
'AIRFLOW_CTX_TASK_ID', 'value':
'dbt_task_group.my_first_dbt_model.run'}, {'name':
'AIRFLOW_CTX_EXECUTION_DATE', 'value':
'2025-03-10T12:49:28.878404+00:00'}, {'name': 'AIRFLOW_CTX_TRY_NUMBER',
'value': '1'}, {'name': 'AIRFLOW_CTX_DAG_RUN_ID', 'value':
'manual__2025-03-10T12:49:28.878404+00:00'}, {'name': 'EXTRA_VAR',
'value': 'extra_value'}]}]}
[2025-03-10, 12:49:41 UTC] {base.py:84} INFO - Retrieving connection
'aws_default'
[2025-03-10, 12:49:44 UTC] {credentials.py:1147} INFO - Found
credentials in environment variables.
[2025-03-10, 12:49:44 UTC] {taskinstance.py:3313} ERROR - Task failed
with exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
line 768, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py",
line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/cosmos/operators/base.py",
line 278, in execute
self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
File
"/home/airflow/.local/lib/python3.12/site-packages/cosmos/operators/aws_ecs.py",
line 98, in build_and_run_cmd
    result = EcsRunTaskOperator.execute(self, context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py",
line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/operators/ecs.py",
line 526, in execute
    self._start_task()
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/operators/ecs.py",
line 626, in _start_task
    response = self.client.run_task(**run_opts)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/botocore/client.py",
line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/botocore/client.py",
line 980, in _make_api_call
    request_dict = self._convert_to_request_dict(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/botocore/client.py",
line 1047, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/botocore/validate.py",
line 381, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter overrides.containerOverrides[0].name, value:
None, type: , valid types: 
## Related Issue(s) I didn't created any issue - but I just thought to propose a fix. ## Breaking Change? It does because user have to use `dbt_container_name` with ECS, but it's currently broken. ## Checklist - [ ] I have made corresponding changes to the documentation (if required) - [ ] I have added tests that prove my fix is effective or that my feature works (cherry picked from commit 483ca7cf61d4a2b3b3d186d910105442ea0f1f49) --- cosmos/operators/aws_ecs.py | 7 +++++-- tests/operators/test_aws_ecs.py | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 2decbcc6c4..98061269ff 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -21,6 +21,8 @@ logger = get_logger(__name__) +DEFAULT_CONN_ID = "aws_default" +DEFAULT_CONTAINER_NAME = "dbt" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} try: @@ -47,10 +49,10 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor def __init__( self, # arguments required by EcsRunTaskOperator - aws_conn_id: str, cluster: str, task_definition: str, - container_name: str, + container_name: str = DEFAULT_CONTAINER_NAME, + aws_conn_id: str = DEFAULT_CONN_ID, # profile_config: ProfileConfig | None = None, command: list[str] | None = None, @@ -66,6 +68,7 @@ def __init__( "aws_conn_id": aws_conn_id, "task_definition": task_definition, "cluster": cluster, + "container_name": container_name, "overrides": None, } ) diff --git a/tests/operators/test_aws_ecs.py b/tests/operators/test_aws_ecs.py index 230c1616a2..af9ed226d4 100644 --- a/tests/operators/test_aws_ecs.py +++ b/tests/operators/test_aws_ecs.py @@ -185,6 +185,7 @@ def test_dbt_aes_ecs_overrides_parameter(): assert "containerOverrides" in actual_overrides actual_container_overrides = actual_overrides["containerOverrides"][0] + assert actual_container_overrides["name"] == "my-dbt-container-name" assert isinstance(actual_container_overrides["command"], list), "`command` should be of type list" assert "environment" in actual_container_overrides From a7d71cc133c82ebc175c2eb4dc54623bf249e534 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 22:34:42 +0530 Subject: [PATCH 35/38] Include PR #1592 --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index df9da4c34d..e7748816de 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -19,6 +19,7 @@ Bug Fixes * Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 * Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571 * Fix ``DbtRunLocalOperator.partial()`` support by @tatiana in #1609 +* fix: ``container_name`` is null for ecs integration by @nicor88 in #1592 Docs From 343643b1e4bd208464f68081b03f6151075a0170 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 13 Mar 2025 22:47:20 +0530 Subject: [PATCH 36/38] Release 1.9.1a4 --- CHANGELOG.rst | 2 +- cosmos/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e7748816de..1f7c3216a7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.9.1a3 (2025-03-13) +1.9.1a4 (2025-03-13) -------------------- Bug Fixes diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 03d150be2c..5f97dc81aa 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.1a3" +__version__ = "1.9.1a4" from cosmos.airflow.dag import DbtDag From 75aca269832aeb1a8809601c36d43ffcd411837c Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 19:40:31 +0000 Subject: [PATCH 37/38] Release 1.9.1 --- CHANGELOG.rst | 2 +- cosmos/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1f7c3216a7..7c2430d4ca 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.9.1a4 (2025-03-13) +1.9.1 (2025-03-13) -------------------- Bug Fixes diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 5f97dc81aa..df6d93258d 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.1a4" +__version__ = "1.9.1" from cosmos.airflow.dag import DbtDag From 06bdcec24dc9507decce851956396bfcce88d1ab Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 19:44:43 +0000 Subject: [PATCH 38/38] Update CHANGELOG.rst --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7c2430d4ca..7e18695ce8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -18,7 +18,7 @@ Bug Fixes * Fix log that prints 'Total filtered nodes' by @tatiana in #1603 * Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 * Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571 -* Fix ``DbtRunLocalOperator.partial()`` support by @tatiana in #1609 +* Fix ``DbtRunLocalOperator.partial()`` support by @tatiana @ashb in #1609 * fix: ``container_name`` is null for ecs integration by @nicor88 in #1592 Docs