From 2b4c097222049ebab68b51c741e3aab3e698942b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 06:08:56 +0000 Subject: [PATCH 1/5] Mark integration tests as such There were a few tests taking a long time to run when using `hatch run tests.py3.9-2.9:test-cov`. This PR marks them as integration tests, since they try to connect to Object Storages --- tests/test_io.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_io.py b/tests/test_io.py index 816ead519d..cec7c88145 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -120,6 +120,7 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): assert mock_copy.call_count == 2 +@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @@ -136,6 +137,7 @@ def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_stor assert result == (mock_object_storage.return_value, _default_s3_conn) +@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) From 0ffeb704e02f0a892b2945452d67ac432fbe7b16 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 11:59:50 +0000 Subject: [PATCH 2/5] Make tests be unittests --- tests/test_io.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index cec7c88145..816ead519d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -120,7 +120,6 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): assert mock_copy.call_count == 2 -@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have 
Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) @@ -137,7 +136,6 @@ def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_stor assert result == (mock_object_storage.return_value, _default_s3_conn) -@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") @patch("cosmos.io.settings.remote_target_path_conn_id", None) From 5d082782e23b3e2a5ecfbbd25a20e067877d334f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 14:08:12 +0000 Subject: [PATCH 3/5] Add test case --- dev/dags/dbt/simple/dbt_project.yml | 8 ++++ dev/dags/dbt/simple/models/example_model.sql | 1 + dev/dags/dbt/simple/profiles.yml | 12 ++++++ dev/dags/example_task_mapping.py | 39 ++++++++++++++++++++ 4 files changed, 60 insertions(+) create mode 100644 dev/dags/dbt/simple/dbt_project.yml create mode 100644 dev/dags/dbt/simple/models/example_model.sql create mode 100644 dev/dags/dbt/simple/profiles.yml create mode 100644 dev/dags/example_task_mapping.py diff --git a/dev/dags/dbt/simple/dbt_project.yml b/dev/dags/dbt/simple/dbt_project.yml new file mode 100644 index 0000000000..4fa380408b --- /dev/null +++ b/dev/dags/dbt/simple/dbt_project.yml @@ -0,0 +1,8 @@ +name: 'my_dbt_project' +version: '1.0.0' +profile: 'default' + +models: + my_dbt_project: + example: + materialized: table diff --git a/dev/dags/dbt/simple/models/example_model.sql b/dev/dags/dbt/simple/models/example_model.sql new file mode 100644 index 0000000000..583e918924 --- /dev/null +++ b/dev/dags/dbt/simple/models/example_model.sql @@ -0,0 +1 @@ +SELECT 1 AS id, 'example' AS name diff --git a/dev/dags/dbt/simple/profiles.yml b/dev/dags/dbt/simple/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- 
/dev/null +++ b/dev/dags/dbt/simple/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/dev/dags/example_task_mapping.py b/dev/dags/example_task_mapping.py new file mode 100644 index 0000000000..15da345e7b --- /dev/null +++ b/dev/dags/example_task_mapping.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG + +from cosmos.config import ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +# Define the DAG +with DAG( + dag_id="example_task_mapping", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + + dbt_partial = DbtRunLocalOperator.partial( + task_id="dbt_run", project_dir=DBT_ROOT_PATH / "simple", profile_config=profile_config, emit_datasets=False + ) + + dbt_run = dbt_partial.expand(select=["example_model"]) # Only run the specific model + + dbt_run From 20712784dedaa4e2fab7b5b6f351406884020aa3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 15:56:22 +0000 Subject: [PATCH 4/5] Fix so that previous example works --- cosmos/operators/base.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 
18019ab92b..3e3efd3778 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import logging import os from abc import ABCMeta, abstractmethod @@ -142,6 +143,26 @@ def __init__( self.extra_context = extra_context or {} kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + __init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + + def __init_subclass__(cls, **kwargs): # type: ignore + super().__init_subclass__(**kwargs) + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this + # logic explicitly in all subclasses + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + cls.__init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(cls.__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: """ Builds the set of environment variables to be exposed for the bash command. 
From c2e31ad87276dc28ab1fd9b9064f0e1dd615956e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 13 Mar 2025 16:08:12 +0000 Subject: [PATCH 5/5] Try to fix type issues Annotate `__init_subclass__` instead of dropping `**kwargs`, so that class keyword arguments are still forwarded to the parent classes. --- cosmos/operators/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 3e3efd3778..552172f746 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -151,8 +151,8 @@ def __init__( if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) } - def __init_subclass__(cls, **kwargs): # type: ignore + def __init_subclass__(cls, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes # Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this # logic explicitly in all subclasses