Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import inspect
import logging
import os
from abc import ABCMeta, abstractmethod
Expand Down Expand Up @@ -142,6 +143,26 @@ def __init__(
self.extra_context = extra_context or {}
kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes

# The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes
# Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546
__init__._BaseOperatorMeta__param_names = { # type: ignore
name
for (name, param) in inspect.signature(__init__).parameters.items()
if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
}

def __init_subclass__(cls) -> None:
super().__init_subclass__()
# The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes
# Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this
# logic explicitly in all subclasses
# Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546
cls.__init__._BaseOperatorMeta__param_names = { # type: ignore
name
for (name, param) in inspect.signature(cls.__init__).parameters.items()
if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
}

def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]:
"""
Builds the set of environment variables to be exposed for the bash command.
Expand Down
8 changes: 8 additions & 0 deletions dev/dags/dbt/simple/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: 'my_dbt_project'
version: '1.0.0'
profile: 'default'

models:
my_dbt_project:
example:
materialized: table
1 change: 1 addition & 0 deletions dev/dags/dbt/simple/models/example_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 1 AS id, 'example' AS name
12 changes: 12 additions & 0 deletions dev/dags/dbt/simple/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4
39 changes: 39 additions & 0 deletions dev/dags/example_task_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos.config import ProfileConfig
from cosmos.operators.local import DbtRunLocalOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))


profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)

# Define the DAG
with DAG(
dag_id="example_task_mapping",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:

dbt_partial = DbtRunLocalOperator.partial(
task_id="dbt_run", project_dir=DBT_ROOT_PATH / "simple", profile_config=profile_config, emit_datasets=False
)

dbt_run = dbt_partial.expand(select=["example_model"]) # Only run the specific model

dbt_run