Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
2d0f303
slight change
johnhoran Mar 27, 2025
6e92d2b
kubernetes arg fix
johnhoran Mar 28, 2025
9b5de3b
revert test
johnhoran Mar 28, 2025
2ef45f3
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 28, 2025
906f34d
try catch instead
johnhoran Mar 31, 2025
8d68b0f
refactor
johnhoran Mar 31, 2025
acbac69
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 31, 2025
76da25c
Pass all args not consumed by AbstractDbtBase
johnhoran Apr 4, 2025
80536a1
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 4, 2025
694b70d
include default args
johnhoran Apr 4, 2025
7474174
pass remaining args
johnhoran Apr 4, 2025
3c1211d
back to original
johnhoran Apr 4, 2025
197765e
pass everything except consumed args
johnhoran Apr 7, 2025
8ee3a93
arg not in signiture
johnhoran Apr 7, 2025
250e37f
add to signiture
johnhoran Apr 7, 2025
f015148
completeness
johnhoran Apr 7, 2025
34669ff
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
52f4d3f
include all
johnhoran Apr 7, 2025
f16ab9b
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
d118044
fix sig arg
johnhoran Apr 7, 2025
9abaff6
remove space
johnhoran Apr 7, 2025
ccd685d
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
f6308a4
handle older provider versions
johnhoran Apr 7, 2025
7e5636a
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
7db0157
handle older version
johnhoran Apr 7, 2025
a826ee0
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
ae66765
copypasta errors
johnhoran Apr 7, 2025
522a824
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
f489007
forgot commas
johnhoran Apr 7, 2025
10c93c5
don't pass dbt_kwargs to runlocal
johnhoran Apr 8, 2025
b50e4c7
don't pass args not used downstream
johnhoran Apr 8, 2025
6a3c7b6
Merge branch 'main' into metaclass
johnhoran Apr 8, 2025
2a1f87f
Walk the mro
johnhoran Apr 9, 2025
85dc8b5
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 9, 2025
ef90377
sepecifically __init__ method
johnhoran Apr 9, 2025
d17225d
forgot gcp
johnhoran Apr 9, 2025
65fa469
type hints
johnhoran Apr 9, 2025
78b7908
Merge branch 'main' into metaclass
johnhoran Apr 17, 2025
a68be14
try imports
johnhoran Apr 22, 2025
ea197d0
revert argument convention
johnhoran Apr 23, 2025
c03869b
revert changes that aren't needed any longer
johnhoran Apr 23, 2025
623e857
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 23, 2025
75c0d43
revert change
johnhoran Apr 23, 2025
530ffc1
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 23, 2025
995e24d
Merge branch 'main' into metaclass
johnhoran Apr 23, 2025
b5b6a44
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 23, 2025
09fdce7
Merge branch 'main' into metaclass
johnhoran Apr 23, 2025
584f5d9
simplify get
johnhoran Apr 23, 2025
46d990e
add some tests
johnhoran Apr 23, 2025
1ecc0d2
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 23, 2025
ce30298
revert to nested
johnhoran Apr 23, 2025
e4e7b48
unused var lint
johnhoran Apr 23, 2025
31a42fd
cleanup test
johnhoran Apr 23, 2025
c7cf3db
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 23, 2025
3f54ee3
fix test
johnhoran Apr 24, 2025
035d577
add back start date
johnhoran Apr 24, 2025
3a2b0d5
try tests again
johnhoran Apr 25, 2025
5387d28
Merge branch 'main' into metaclass
pankajkoti Apr 29, 2025
4048f53
Merge branch 'main' into metaclass
tatiana Apr 29, 2025
248be16
Merge branch 'main' into metaclass
tatiana Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cosmos/operators/_asynchronous/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

from typing import Any

try: # Airflow 3
from airflow.sdk.bases.operator import BaseOperator
except ImportError: # Airflow 2
from airflow.models import BaseOperator
from airflow.utils.context import Context

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2


class DbtRunAirflowAsyncDatabricksOperator(BaseOperator): # type: ignore[misc]
def __init__(self, *args: Any, **kwargs: Any):
Expand Down
38 changes: 30 additions & 8 deletions cosmos/operators/aws_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
DEFAULT_CONTAINER_NAME = "dbt"
DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

try:
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
except ImportError: # pragma: no cover
Expand Down Expand Up @@ -76,21 +81,38 @@ def __init__(
"overrides": None,
}
)
super().__init__(**kwargs)

# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
base_operator_args = set(inspect.signature(EcsRunTaskOperator.__init__).parameters.keys())
default_args = kwargs.get("default_args", {})
operator_kwargs = {}

operator_args: set[str] = set()
for clazz in EcsRunTaskOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
base_kwargs["aws_conn_id"] = aws_conn_id
EcsRunTaskOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
EcsRunTaskOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
35 changes: 28 additions & 7 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import inspect
from typing import TYPE_CHECKING, Any, Callable, Sequence

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2
if TYPE_CHECKING: # pragma: no cover
try:
from airflow.sdk.definitions.context import Context
Expand Down Expand Up @@ -68,20 +72,37 @@ def __init__(
"registry_conn_id": registry_conn_id,
}
)
super().__init__(**kwargs)
# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
base_operator_args = set(inspect.signature(AzureContainerInstancesOperator.__init__).parameters.keys())

default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in AzureContainerInstancesOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
AzureContainerInstancesOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
AzureContainerInstancesOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
35 changes: 28 additions & 7 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import inspect
from typing import TYPE_CHECKING, Any, Callable, Sequence

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2
if TYPE_CHECKING: # pragma: no cover
try:
from airflow.sdk.definitions.context import Context
Expand Down Expand Up @@ -58,21 +62,38 @@ def __init__(
"Airflow connections are not available in the Docker container for the mapping to work."
)

super().__init__(image=image, **kwargs)
# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
kwargs["image"] = image
base_operator_args = set(inspect.signature(DockerOperator.__init__).parameters.keys())

default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in DockerOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
DockerOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
DockerOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
38 changes: 29 additions & 9 deletions cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

try:
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

Expand Down Expand Up @@ -72,7 +77,6 @@ def __init__(
self.profile_config = profile_config
self.command = command
self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs)
# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
Expand All @@ -84,17 +88,33 @@ def __init__(
"project_id": project_id,
"region": region,
"job_name": job_name,
"command": command,
"environment_variables": environment_variables,
Comment thread
pankajkoti marked this conversation as resolved.
}
)
base_operator_args = set(inspect.signature(CloudRunExecuteJobOperator.__init__).parameters.keys())

default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in CloudRunExecuteJobOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
CloudRunExecuteJobOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
CloudRunExecuteJobOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
37 changes: 30 additions & 7 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
DbtTestMixin,
)

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"

Expand Down Expand Up @@ -64,20 +69,38 @@ class DbtKubernetesBaseOperator(AbstractDbtBase, KubernetesPodOperator): # type

def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None:
self.profile_config = profile_config
super().__init__(**kwargs)

# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
base_operator_args = set(inspect.signature(KubernetesPodOperator.__init__).parameters.keys())
default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in KubernetesPodOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
KubernetesPodOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass

AbstractDbtBase.__init__(self, **base_kwargs)
KubernetesPodOperator.__init__(self, **operator_kwargs)

def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
env_vars_dict: dict[str, str] = dict()
Expand Down
56 changes: 34 additions & 22 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
import jinja2
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException

try: # Airflow 3
from airflow.sdk.bases.operator import BaseOperator
except ImportError: # Airflow 2
from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -49,6 +44,11 @@
remote_target_path_conn_id,
)

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

try:
from airflow.datasets import Dataset
from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
Expand Down Expand Up @@ -766,19 +766,31 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
abstract_dbt_local_base_kwargs = {}
base_operator_kwargs = {}
abstract_dbt_local_base_args_keys = (
inspect.getfullargspec(AbstractDbtBase.__init__).args
+ inspect.getfullargspec(AbstractDbtLocalBase.__init__).args
)
base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys())
for arg_key, arg_value in kwargs.items():
if arg_key in abstract_dbt_local_base_args_keys:
abstract_dbt_local_base_kwargs[arg_key] = arg_value
if arg_key in base_operator_args:
base_operator_kwargs[arg_key] = arg_value
AbstractDbtLocalBase.__init__(self, **abstract_dbt_local_base_kwargs)
base_kwargs = {}
operator_kwargs = {}
operator_args = {*inspect.signature(BaseOperator.__init__).parameters.keys()}

default_args = kwargs.get("default_args", {})

for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

for arg in {
*inspect.getfullargspec(AbstractDbtBase.__init__).args,
*inspect.getfullargspec(AbstractDbtLocalBase.__init__).args,
}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass

AbstractDbtLocalBase.__init__(self, **base_kwargs)
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
if (
kwargs.get("emit_datasets", True)
Expand All @@ -791,12 +803,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
# error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str")
dag_id = kwargs.get("dag")
task_group_id = kwargs.get("task_group")
base_operator_kwargs["outlets"] = [
operator_kwargs["outlets"] = [
DatasetAlias(name=get_dataset_alias_name(dag_id, task_group_id, self.task_id))
] # type: ignore
if "task_id" in base_operator_kwargs:
base_operator_kwargs.pop("task_id")
BaseOperator.__init__(self, task_id=self.task_id, **base_operator_kwargs)
if "task_id" in operator_kwargs:
operator_kwargs.pop("task_id")
BaseOperator.__init__(self, task_id=self.task_id, **operator_kwargs)


class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator):
Expand Down
Loading