From be0ba951c4208e5c12d8d56c81b10c1809db2d7a Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Sun, 3 Dec 2023 10:39:45 +0100 Subject: [PATCH 1/6] Replace flake8 with ruff --- .pre-commit-config.yaml | 6 ------ pyproject.toml | 9 +++++---- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0ea06541bd..5f6bc0c83e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -77,12 +77,6 @@ repos: name: mypy-python additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow] files: ^cosmos - - repo: https://github.com/pycqa/flake8 - rev: 6.1.0 - hooks: - - id: flake8 - entry: pflake8 - additional_dependencies: [pyproject-flake8] ci: autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks diff --git a/pyproject.toml b/pyproject.toml index fe399daee6..f8d9214b0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -255,10 +255,11 @@ no_warn_unused_ignores = true [tool.ruff] line-length = 120 +[tool.ruff.lint] +select = ["C901"] +[tool.ruff.lint.mccabe] +max-complexity = 6 + [tool.distutils.bdist_wheel] universal = true - -[tool.flake8] -max-complexity = 10 -select = "C" From 7239b02004096e1bbe23ce68f0d7890301fbd28c Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Sun, 3 Dec 2023 11:21:31 +0100 Subject: [PATCH 2/6] Split task dependencies into function --- cosmos/airflow/graph.py | 13 ++++++++- cosmos/converter.py | 28 +++++++++++-------- cosmos/dbt/selector.py | 62 +++++++++++++++++++++++++---------------- pyproject.toml | 1 - 4 files changed, 67 insertions(+), 37 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index af854d4f50..615c2a1240 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -18,6 +18,7 @@ from cosmos.core.graph.entities import Task as TaskMetadata from cosmos.dbt.graph import DbtNode from cosmos.log import get_logger +from typing import Union logger = get_logger(__name__) @@ -271,7 +272,17 @@ def build_airflow_graph( for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task - # Create the Airflow task dependencies between non-test nodes + create_airflow_task_dependencies(nodes, tasks_map) + + +def create_airflow_task_dependencies( + nodes: dict[str, DbtNode], tasks_map: dict[str, Union[TaskGroup, BaseOperator]] +) -> None: + """ + Create the Airflow task dependencies between non-test nodes. + :param nodes: Dictionary mapping dbt nodes (node.unique_id to node) + :param tasks_map: Dictionary mapping dbt nodes (node.unique_id to Airflow task) + """ for node_id, node in nodes.items(): for parent_node_id in node.depends_on: # depending on the node type, it will not have mapped 1:1 to tasks_map diff --git a/cosmos/converter.py b/cosmos/converter.py index 45c95c2e7c..637ef98269 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -3,9 +3,9 @@ from __future__ import annotations -import copy import inspect from typing import Any, Callable +import copy from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup @@ -21,6 +21,18 @@ logger = get_logger(__name__) +def migrate_to_new_interface( + execution_config: ExecutionConfig, project_config: ProjectConfig, render_config: RenderConfig +): + # We copy the configuration so the change does not affect other DAGs or TaskGroups + # that may reuse the same original configuration + render_config = copy.deepcopy(render_config) + execution_config = copy.deepcopy(execution_config) + render_config.project_path = project_config.dbt_project_path + execution_config.project_path = project_config.dbt_project_path + return execution_config, render_config + + def specific_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]: """ Extract kwargs specific to the cosmos.converter.DbtToAirflowConverter class initialization method. @@ -166,22 +178,16 @@ def __init__( ) -> None: project_config.validate_project() - if not execution_config: - execution_config = ExecutionConfig() - if not render_config: - render_config = RenderConfig() + execution_config = execution_config or ExecutionConfig() + render_config = render_config or RenderConfig() + operator_args = operator_args or {} validate_initial_user_config(execution_config, profile_config, project_config, render_config) # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using if project_config.dbt_project_path: - # We copy the configuration so the change does not affect other DAGs or TaskGroups - # that may reuse the same original configuration - render_config = copy.deepcopy(render_config) - execution_config = copy.deepcopy(execution_config) - render_config.project_path = project_config.dbt_project_path - execution_config.project_path = project_config.dbt_project_path + execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config) validate_adapted_user_config(execution_config, project_config, render_config) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index c7eb893075..3e965e0393 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -388,7 +388,44 @@ def select_nodes( if not select and not exclude: return nodes - # validates select and exclude filters + validate_filters(exclude, select) + subset_ids = apply_select_filter(nodes, project_dir, select) + if select: + nodes = get_nodes_from_subset(nodes, subset_ids) + exclude_ids = apply_exclude_filter(nodes, project_dir, exclude) + subset_ids = set(nodes.keys()) - exclude_ids + + return get_nodes_from_subset(nodes, subset_ids) + + +def get_nodes_from_subset(nodes: dict[str, DbtNode], subset_ids: set[str]) -> dict[str, DbtNode]: + nodes = {id_: nodes[id_] for id_ in subset_ids} + return nodes + + +def apply_exclude_filter(nodes: dict[str, DbtNode], project_dir: Path | None, exclude: list[str]) -> set[str]: + exclude_ids: set[str] = set() + for statement in exclude: + config = SelectorConfig(project_dir, statement) + node_selector = NodeSelector(nodes, config) + exclude_ids.update(node_selector.select_nodes_ids_by_intersection) + return exclude_ids + + +def apply_select_filter(nodes: dict[str, DbtNode], project_dir: Path | None, select: list[str]) -> set[str]: + subset_ids: set[str] = set() + for statement in select: + config = SelectorConfig(project_dir, statement) + node_selector = NodeSelector(nodes, config) + select_ids = node_selector.select_nodes_ids_by_intersection + subset_ids.update(select_ids) + return subset_ids + + +def validate_filters(exclude: list[str], select: list[str]) -> None: + """ + Validate select and exclude filters. + """ filters = [["select", select], ["exclude", exclude]] for filter_type, filter in filters: for filter_parameter in filter: @@ -401,26 +438,3 @@ def select_nodes( continue elif ":" in filter_parameter: raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}") - - subset_ids: set[str] = set() - - for statement in select: - config = SelectorConfig(project_dir, statement) - node_selector = NodeSelector(nodes, config) - - select_ids = node_selector.select_nodes_ids_by_intersection - subset_ids.update(set(select_ids)) - - if select: - nodes = {id_: nodes[id_] for id_ in subset_ids} - - nodes_ids = set(nodes.keys()) - - exclude_ids: set[str] = set() - for statement in exclude: - config = SelectorConfig(project_dir, statement) - node_selector = NodeSelector(nodes, config) - exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection)) - subset_ids = set(nodes_ids) - set(exclude_ids) - - return {id_: nodes[id_] for id_ in subset_ids} diff --git a/pyproject.toml b/pyproject.toml index f8d9214b0d..4e18d9cd4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -258,7 +258,6 @@ line-length = 120 [tool.ruff.lint] select = ["C901"] [tool.ruff.lint.mccabe] -max-complexity = 6 [tool.distutils.bdist_wheel] From e39fcfe2e8c2d494b4d50a739cfbb02976551004 Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Sun, 3 Dec 2023 18:45:12 +0100 Subject: [PATCH 3/6] Disable mccabe on LegacyDbtProject --- cosmos/dbt/parser/project.py | 2 +- pyproject.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index 278b1a0f73..a4df431c8e 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -274,7 +274,7 @@ class LegacyDbtProject: operator_args: Dict[str, Any] = field(default_factory=dict) - def __post_init__(self) -> None: + def __post_init__(self) -> None: # noqa: C901 """ Initializes the parser. """ diff --git a/pyproject.toml b/pyproject.toml index 4e18d9cd4d..6a42888e49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -258,6 +258,7 @@ line-length = 120 [tool.ruff.lint] select = ["C901"] [tool.ruff.lint.mccabe] +max-complexity = 8 [tool.distutils.bdist_wheel] From 18c388de480bf2c0c30dde571fe36b487d3549ba Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Mon, 4 Dec 2023 15:16:04 +0100 Subject: [PATCH 4/6] remove linting related changes --- .pre-commit-config.yaml | 6 ++++++ pyproject.toml | 9 ++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5f6bc0c83e..0ea06541bd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -77,6 +77,12 @@ repos: name: mypy-python additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow] files: ^cosmos + - repo: https://github.com/pycqa/flake8 + rev: 6.1.0 + hooks: + - id: flake8 + entry: pflake8 + additional_dependencies: [pyproject-flake8] ci: autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks diff --git a/pyproject.toml b/pyproject.toml index 6a42888e49..336c460600 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -255,11 +255,10 @@ no_warn_unused_ignores = true [tool.ruff] line-length = 120 -[tool.ruff.lint] -select = ["C901"] -[tool.ruff.lint.mccabe] -max-complexity = 8 - [tool.distutils.bdist_wheel] universal = true + +[tool.flake8] +max-complexity = 8 +select = "C" From 3e5232b289d5789b6e02b52d7de7cb06dd59d880 Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Mon, 4 Dec 2023 19:16:02 +0100 Subject: [PATCH 5/6] fix LegacyDbtProject with 'or' statements --- cosmos/dbt/parser/project.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index a4df431c8e..cadedef6c2 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -274,18 +274,14 @@ class LegacyDbtProject: operator_args: Dict[str, Any] = field(default_factory=dict) - def __post_init__(self) -> None: # noqa: C901 + def __post_init__(self) -> None: """ Initializes the parser. """ - if self.dbt_root_path is None: - self.dbt_root_path = "/usr/local/airflow/dags/dbt" - if self.dbt_models_dir is None: - self.dbt_models_dir = "models" - if self.dbt_snapshots_dir is None: - self.dbt_snapshots_dir = "snapshots" - if self.dbt_seeds_dir is None: - self.dbt_seeds_dir = "seeds" + self.dbt_root_path = self.dbt_root_path or "/usr/local/airflow/dags/dbt" + self.dbt_models_dir = self.dbt_models_dir or "models" + self.dbt_snapshots_dir = self.dbt_snapshots_dir or "snapshots" + self.dbt_seeds_dir = self.dbt_seeds_dir or "seeds" # set the project and model dirs self.project_dir = Path(os.path.join(self.dbt_root_path, self.project_name)) From d7454e9973a74bc1a1a254315f5fffd0e3907ea8 Mon Sep 17 00:00:00 2001 From: Joppe Vos Date: Tue, 5 Dec 2023 13:44:57 +0100 Subject: [PATCH 6/6] Refactor 'load_from_statement' --- cosmos/dbt/selector.py | 48 ++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 3e965e0393..76ec31a54b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -210,27 +210,39 @@ def load_from_statement(self, statement: str) -> None: for item in items: if item.startswith(PATH_SELECTOR): - index = len(PATH_SELECTOR) - if self.project_dir: - self.paths.append(self.project_dir / Path(item[index:])) - else: - self.paths.append(Path(item[index:])) + self._parse_path_selector(item) elif item.startswith(TAG_SELECTOR): - index = len(TAG_SELECTOR) - self.tags.append(item[index:]) + self._parse_tag_selector(item) elif item.startswith(CONFIG_SELECTOR): - index = len(CONFIG_SELECTOR) - key, value = item[index:].split(":") - if key in SUPPORTED_CONFIG: - self.config[key] = value + self._parse_config_selector(item) else: - if item: - graph_selector = GraphSelector.parse(item) - if graph_selector is not None: - self.graph_selectors.append(graph_selector) - else: - self.other.append(item) - logger.warning("Unsupported select statement: %s", item) + self._parse_unknown_selector(item) + + def _parse_unknown_selector(self, item: str) -> None: + if item: + graph_selector = GraphSelector.parse(item) + if graph_selector is not None: + self.graph_selectors.append(graph_selector) + else: + self.other.append(item) + logger.warning("Unsupported select statement: %s", item) + + def _parse_config_selector(self, item: str) -> None: + index = len(CONFIG_SELECTOR) + key, value = item[index:].split(":") + if key in SUPPORTED_CONFIG: + self.config[key] = value + + def _parse_tag_selector(self, item: str) -> None: + index = len(TAG_SELECTOR) + self.tags.append(item[index:]) + + def _parse_path_selector(self, item: str) -> None: + index = len(PATH_SELECTOR) + if self.project_dir: + self.paths.append(self.project_dir / Path(item[index:])) + else: + self.paths.append(Path(item[index:])) def __repr__(self) -> str: return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other}, graph_selectors={self.graph_selectors})"