diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 18019ab92b..552172f746 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import logging import os from abc import ABCMeta, abstractmethod @@ -142,6 +143,26 @@ def __init__( self.extra_context = extra_context or {} kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + __init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this + # logic explicitly in all subclasses + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + cls.__init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(cls.__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: """ Builds the set of environment variables to be exposed for the bash command. diff --git a/dev/dags/dbt/simple/dbt_project.yml b/dev/dags/dbt/simple/dbt_project.yml new file mode 100644 index 0000000000..4fa380408b --- /dev/null +++ b/dev/dags/dbt/simple/dbt_project.yml @@ -0,0 +1,8 @@ +name: 'my_dbt_project' +version: '1.0.0' +profile: 'default' + +models: + my_dbt_project: + example: + materialized: table diff --git a/dev/dags/dbt/simple/models/example_model.sql b/dev/dags/dbt/simple/models/example_model.sql new file mode 100644 index 0000000000..583e918924 --- /dev/null +++ b/dev/dags/dbt/simple/models/example_model.sql @@ -0,0 +1 @@ +SELECT 1 AS id, 'example' AS name diff --git a/dev/dags/dbt/simple/profiles.yml b/dev/dags/dbt/simple/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- /dev/null +++ b/dev/dags/dbt/simple/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/dev/dags/example_task_mapping.py b/dev/dags/example_task_mapping.py new file mode 100644 index 0000000000..15da345e7b --- /dev/null +++ b/dev/dags/example_task_mapping.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG + +from cosmos.config import ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +# Define the DAG +with DAG( + dag_id="example_task_mapping", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + + dbt_partial = DbtRunLocalOperator.partial( + task_id="dbt_run", project_dir=DBT_ROOT_PATH / "simple", profile_config=profile_config, emit_datasets=False + ) + + dbt_run = dbt_partial.expand(select=["example_model"]) # Only run the specific model + + dbt_run