Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,6 @@ jobs:
python-version: [ "3.11" ]
airflow-version: [ "2.10", "3.0" ]
dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"]
# TODO: Add support for dbt 1.7 for Airflow 3.0.
# Issue dbt 1.7: https://github.com/astronomer/astronomer-cosmos/issues/1709
exclude:
- python-version: "3.11"
airflow-version: "3.0"
dbt-version: "1.7"
services:
postgres:
image: postgres
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ celerybeat.pid
# Environments
.venv
env/
# mock-venv created by Cosmos unit tests
mock-venv/
venv/
ENV/
env.bak/
Expand Down Expand Up @@ -156,6 +158,7 @@ dev/dags/.airflowignore
airflow.cfg
airflow.db
standalone_admin_password.txt
simple_auth_manager_passwords.json.generated
webserver_config.py

# VI
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.6
rev: v0.11.7
hooks:
- id: ruff
args:
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class DbtResourceType(aenum.Enum): # type: ignore
SEED = "seed"
TEST = "test"
SOURCE = "source"
EXPOSURE = "exposure"

@classmethod
def _missing_value_(cls, value): # type: ignore
Expand Down
38 changes: 37 additions & 1 deletion cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TAG_SELECTOR = "tag:"
CONFIG_SELECTOR = "config."
SOURCE_SELECTOR = "source:"
EXPOSURE_SELECTOR = "exposure:"
RESOURCE_TYPE_SELECTOR = "resource_type:"
EXCLUDE_RESOURCE_TYPE_SELECTOR = "exclude_resource_type:"
PLUS_SELECTOR = "+"
Expand Down Expand Up @@ -82,6 +83,7 @@ class GraphSelector:
resource_type:resource_name
source:source_name
exclude_resource_type:resource_name
exposure:exposure_name

https://docs.getdbt.com/reference/node-selection/graph-operators
"""
Expand Down Expand Up @@ -197,7 +199,7 @@ def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selec
previous_generation = new_generation
depth -= 1

def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901
"""
Given a dictionary with the original dbt project nodes, applies the current graph selector to
identify the subset of nodes that matches the selection criteria.
Expand Down Expand Up @@ -229,6 +231,18 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
}
)

elif EXPOSURE_SELECTOR in self.node_name:
exposure_selection = self.node_name[len(EXPOSURE_SELECTOR) :]

# match node.resource_type == EXPOSURE, node.resource_name == exposure_selection
root_nodes.update(
{
node_id
for node_id, node in nodes.items()
if node.resource_type == DbtResourceType.EXPOSURE and node.resource_name == exposure_selection
}
)

elif CONFIG_SELECTOR in self.node_name:
config_selection_key, config_selection_value = self.node_name[len(CONFIG_SELECTOR) :].split(":")
# currently tags, materialized, schema and meta are the only supported config keys
Expand Down Expand Up @@ -332,6 +346,7 @@ def __init__(self, project_dir: Path | None, statement: str):
self.other: list[str] = []
self.graph_selectors: list[GraphSelector] = []
self.sources: list[str] = []
self.exposures: list[str] = []
self.resource_types: list[str] = []
self.exclude_resource_types: list[str] = []
self.load_from_statement(statement)
Expand All @@ -345,6 +360,7 @@ def is_empty(self) -> bool:
or self.graph_selectors
or self.other
or self.sources
or self.exposures
or self.resource_types
or self.exclude_resource_types
)
Expand Down Expand Up @@ -384,6 +400,8 @@ def _handle_no_precursors_or_descendants(self, item: str, node_name: str) -> Non
self._parse_config_selector(item)
elif node_name.startswith(SOURCE_SELECTOR):
self._parse_source_selector(item)
elif node_name.startswith(EXPOSURE_SELECTOR):
self._parse_exposure_selector(item)
elif node_name.startswith(RESOURCE_TYPE_SELECTOR):
self._parse_resource_type_selector(item)
elif node_name.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR):
Expand Down Expand Up @@ -433,6 +451,11 @@ def _parse_source_selector(self, item: str) -> None:
source_name = item[index:].strip()
self.sources.append(source_name)

def _parse_exposure_selector(self, item: str) -> None:
index = len(EXPOSURE_SELECTOR)
exposure_name = item[index:].strip()
self.exposures.append(exposure_name)

def __repr__(self) -> str:
return (
"SelectorConfig("
Expand All @@ -441,6 +464,7 @@ def __repr__(self) -> str:
+ f"config={self.config}, "
+ f"sources={self.sources}, "
+ f"resource={self.resource_types}, "
+ f"exposures={self.exposures}, "
+ f"exclude_resource={self.exclude_resource_types}, "
+ f"other={self.other}, "
+ f"graph_selectors={self.graph_selectors})"
Expand Down Expand Up @@ -565,6 +589,9 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool:
if self.config.sources and not self._is_source_matching(node):
return False

if self.config.exposures and not self._is_exposure_matching(node):
return False

return True

def _is_resource_type_matching(self, node: DbtNode) -> bool:
Expand All @@ -585,6 +612,14 @@ def _is_source_matching(self, node: DbtNode) -> bool:
return False
return True

def _is_exposure_matching(self, node: DbtNode) -> bool:
"""Checks if the node's exposure is a subset of the config's exposure."""
if node.resource_type != DbtResourceType.EXPOSURE:
return False
if node.resource_name not in self.config.exposures:
return False
return True

def _is_tags_subset(self, node: DbtNode) -> bool:
"""Checks if the node's tags are a subset of the config's tags."""
if not (set(self.config.tags) <= set(node.tags)):
Expand Down Expand Up @@ -713,6 +748,7 @@ def validate_filters(exclude: list[str], select: list[str]) -> None:
or filter_parameter.startswith(RESOURCE_TYPE_SELECTOR)
or filter_parameter.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR)
or filter_parameter.startswith(SOURCE_SELECTOR)
or filter_parameter.startswith(EXPOSURE_SELECTOR)
or PLUS_SELECTOR in filter_parameter
or any([filter_parameter.startswith(CONFIG_SELECTOR + config) for config in SUPPORTED_CONFIG])
):
Expand Down
9 changes: 5 additions & 4 deletions cosmos/operators/_asynchronous/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

from typing import Any

try: # Airflow 3
from airflow.sdk.bases.operator import BaseOperator
except ImportError: # Airflow 2
from airflow.models import BaseOperator
from airflow.utils.context import Context

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2


class DbtRunAirflowAsyncDatabricksOperator(BaseOperator): # type: ignore[misc]
def __init__(self, *args: Any, **kwargs: Any):
Expand Down
38 changes: 30 additions & 8 deletions cosmos/operators/aws_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
DEFAULT_CONTAINER_NAME = "dbt"
DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2

try:
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
except ImportError: # pragma: no cover
Expand Down Expand Up @@ -76,21 +81,38 @@ def __init__(
"overrides": None,
}
)
super().__init__(**kwargs)

# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
base_operator_args = set(inspect.signature(EcsRunTaskOperator.__init__).parameters.keys())
default_args = kwargs.get("default_args", {})
operator_kwargs = {}

operator_args: set[str] = set()
for clazz in EcsRunTaskOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
base_kwargs["aws_conn_id"] = aws_conn_id
EcsRunTaskOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
EcsRunTaskOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
35 changes: 28 additions & 7 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import inspect
from typing import TYPE_CHECKING, Any, Callable, Sequence

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2
if TYPE_CHECKING: # pragma: no cover
try:
from airflow.sdk.definitions.context import Context
Expand Down Expand Up @@ -68,20 +72,37 @@ def __init__(
"registry_conn_id": registry_conn_id,
}
)
super().__init__(**kwargs)
# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
base_operator_args = set(inspect.signature(AzureContainerInstancesOperator.__init__).parameters.keys())

default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in AzureContainerInstancesOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
AzureContainerInstancesOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
AzureContainerInstancesOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
35 changes: 28 additions & 7 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import inspect
from typing import TYPE_CHECKING, Any, Callable, Sequence

try:
from airflow.sdk.bases.operator import BaseOperator # Airflow 3
except ImportError:
from airflow.models import BaseOperator # Airflow 2
if TYPE_CHECKING: # pragma: no cover
try:
from airflow.sdk.definitions.context import Context
Expand Down Expand Up @@ -58,21 +62,38 @@ def __init__(
"Airflow connections are not available in the Docker container for the mapping to work."
)

super().__init__(image=image, **kwargs)
# In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
# and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
# initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
# Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
# from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
# initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
kwargs["image"] = image
base_operator_args = set(inspect.signature(DockerOperator.__init__).parameters.keys())

default_args = kwargs.get("default_args", {})
operator_kwargs = {}
operator_args: set[str] = set()
for clazz in DockerOperator.__mro__:
operator_args.update(inspect.signature(clazz.__init__).parameters.keys())
if clazz == BaseOperator:
break
for arg in operator_args:
try:
operator_kwargs[arg] = kwargs[arg]
except KeyError:
pass

base_kwargs = {}
for arg_key, arg_value in kwargs.items():
if arg_key in base_operator_args:
base_kwargs[arg_key] = arg_value
base_kwargs["task_id"] = kwargs["task_id"]
DockerOperator.__init__(self, **base_kwargs)
for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}:
try:
base_kwargs[arg] = kwargs[arg]
except KeyError:
try:
base_kwargs[arg] = default_args[arg]
except KeyError:
pass
AbstractDbtBase.__init__(self, **base_kwargs)
DockerOperator.__init__(self, **operator_kwargs)

def build_and_run_cmd(
self,
Expand Down
Loading
Loading