From d89271f6515f3c9a25e393ee3bb3dd299f4a9e03 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 29 Apr 2025 11:57:23 +0530 Subject: [PATCH 01/10] Change Vitrual env mini env path to string --- dev/dags/example_virtualenv_mini.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_virtualenv_mini.py b/dev/dags/example_virtualenv_mini.py index 849847a25b..660ea3d75c 100644 --- a/dev/dags/example_virtualenv_mini.py +++ b/dev/dags/example_virtualenv_mini.py @@ -30,6 +30,6 @@ append_env=True, py_system_site_packages=False, py_requirements=["dbt-postgres"], - virtualenv_dir=Path("/tmp/persistent-venv2"), + virtualenv_dir="/tmp/persistent-venv2", ) seed_operator From 1f1947c8c2fe3b3eb52a9c23aa04ae4c8dbd7572 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 29 Apr 2025 12:17:14 +0530 Subject: [PATCH 02/10] Ensure virtualenv_dir path exists --- cosmos/operators/virtualenv.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index bc26fe232b..78df804e8a 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -76,14 +76,16 @@ def __init__( py_requirements: list[str] | None = None, pip_install_options: list[str] | None = None, py_system_site_packages: bool = False, - virtualenv_dir: Path | None = None, + virtualenv_dir: str | None = None, is_virtualenv_dir_temporary: bool = False, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.pip_install_options = pip_install_options or [] self.py_system_site_packages = py_system_site_packages - self.virtualenv_dir = virtualenv_dir + self.virtualenv_dir = Path(virtualenv_dir) if virtualenv_dir else None + if self.virtualenv_dir: + self.virtualenv_dir.mkdir(parents=True, exist_ok=True) self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary self.max_retries_lock = settings.virtualenv_max_retries_lock self._py_bin: str | None = None From 
7d2d27118ad46a942b5d4c6c629c6aaf7f69a8d0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:58:07 +0530 Subject: [PATCH 03/10] =?UTF-8?q?=E2=AC=86=20[pre-commit.ci]=20pre-commit?= =?UTF-8?q?=20autoupdate=20(#1720)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.11.6 → v0.11.7](https://github.com/astral-sh/ruff-pre-commit/compare/v0.11.6...v0.11.7) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 58e0f0b1b1..528cb19255 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -57,7 +57,7 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.11.6 + rev: v0.11.7 hooks: - id: ruff args: From 7d3234a43ed6ed2039a2e893586d7d5a2f6e72f3 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 29 Apr 2025 13:27:01 +0530 Subject: [PATCH 04/10] Add files not needed for git tracking to .gitignore (#1723) The PR adds the following files to `.gitignore` 1. `mock-venv`: generated when running cosmos unit tests locally, e.g. `hatch run tests.py3.11-3.0-1.9:test-cov` 2. 
`simple_auth_manager_passwords.json.generated`: Airflow 3 standalone generated password file --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 0af8bcb446..3d11b2aa0d 100644 --- a/.gitignore +++ b/.gitignore @@ -113,6 +113,8 @@ celerybeat.pid # Environments .venv env/ +# mock-venv created by Cosmos unit tests +mock-venv/ venv/ ENV/ env.bak/ @@ -156,6 +158,7 @@ dev/dags/.airflowignore airflow.cfg airflow.db standalone_admin_password.txt +simple_auth_manager_passwords.json.generated webserver_config.py # VI From b38fdb592db9827a892d833dfdf34426bd93ad7c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 29 Apr 2025 13:28:45 +0530 Subject: [PATCH 05/10] Use latest minor versions for dbt adapters to get in compatibility fixes (#1719) While the initial x.x.0 versions of dbt adapters may have compatibility issues with certain dependencies, I observed that these incompatibilities are progressively addressed in later patch releases. The issue reported in #1709 appears to stem from dbt-bigquery accessing a protected member `google.cloud.bigquery._helpers._CELLDATA_FROM_JSON`, which was removed in `google-cloud-bigquery 3.31.0`. This access I believe was introduced in [dbt-bigquery#974](https://github.com/dbt-labs/dbt-bigquery/pull/974) and later removed in [dbt-bigquery#1061](https://github.com/dbt-labs/dbt-bigquery/pull/1061), with the fix subsequently back-ported to the 1.7 series via [dbt-bigquery#1074](https://github.com/dbt-labs/dbt-bigquery/pull/1074). Therefore, I believe that relying on the latest patch versions is a better approach to avoid such issues, rather than individually resolving discrepancies. This PR aligns with that strategy. 
closes: #1709 --------- Co-authored-by: Tatiana Al-Chueyr --- .github/workflows/test.yml | 6 ------ scripts/test/integration-dbt-async.sh | 27 +++++++++++++++++++-------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b8a1a35eb0..93d4bebdbf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -356,12 +356,6 @@ jobs: python-version: [ "3.11" ] airflow-version: [ "2.10", "3.0" ] dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"] - # TODO: Add support for dbt 1.7 for Airflow 3.0. - # Issue dbt 1.7: https://github.com/astronomer/astronomer-cosmos/issues/1709 - exclude: - - python-version: "3.11" - airflow-version: "3.0" - dbt-version: "1.7" services: postgres: image: postgres diff --git a/scripts/test/integration-dbt-async.sh b/scripts/test/integration-dbt-async.sh index eefce0bb97..2dbd2a247e 100644 --- a/scripts/test/integration-dbt-async.sh +++ b/scripts/test/integration-dbt-async.sh @@ -8,9 +8,16 @@ DBT_VERSION="$1" echo "DBT_VERSION:" echo "$DBT_VERSION" +# Calculate next minor version +NEXT_MINOR_VERSION=$(echo "$DBT_VERSION" | awk -F. '{print $1"."$2+1}') + +echo "Installing dbt adapters for DBT_VERSION=$DBT_VERSION (<$NEXT_MINOR_VERSION)" pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y -pip install "dbt-postgres==$DBT_VERSION" "dbt-databricks==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION" +pip install -U \ + "dbt-postgres>=$DBT_VERSION,<$NEXT_MINOR_VERSION" \ + "dbt-databricks>=$DBT_VERSION,<$NEXT_MINOR_VERSION" \ + "dbt-bigquery>=$DBT_VERSION,<$NEXT_MINOR_VERSION" # apache-airflow-core 3.0.0 requires pydantic>=2.11.0, but the above dbt adapters in case of version 1.6 and 1.9 install # pydantic 1.10.22 which make it incompatible. 
@@ -24,6 +31,17 @@ if [ "$DBT_VERSION" = "1.6" ] || [ "$DBT_VERSION" = "1.9" ]; then pip install "pydantic>2.11.0" fi +# As on 28th April, 2025, the latest patch dbt-bigquery 1.6.13 on the 1.6 minor is not yet compatible with the latest +# release of google-cloud-bigquery 3.31.0 as it tries to access a protected attribute which no longer exists in that +# latest version for google-cloud-bigquery and gives the below error: +# stderr: module 'google.cloud.bigquery._helpers' has no attribute '_CELLDATA_FROM_JSON'. +# Hence, we need to install the previous version of google-cloud-bigquery <3.31.0 that still has the protected attribute +# 'google.cloud.bigquery._helpers._CELLDATA_FROM_JSON' available to make it compatible with dbt-bigquery 1.6.13. +if [ "$DBT_VERSION" = "1.6" ]; then + echo "DBT_VERSION is $DBT_VERSION, installing google-cloud-bigquery<3.31.0 for compatibility issue." + pip install "google-cloud-bigquery<3.31.0" +fi + export SOURCE_RENDERING_BEHAVIOR=all rm -rf airflow.* @@ -37,13 +55,6 @@ else airflow db init fi -if [ "$DBT_VERSION" = "1.7" ]; then - # Otherwise, we will get the following error: - # stderr: MessageToJson() got an unexpected keyword argument 'including_default_value_fields' - echo "DBT version is 1.7 — Installing protobuf==4.25.6..." - pip install protobuf==4.25.6 -fi - rm -rf dbt/jaffle_shop/dbt_packages pytest -vv \ --cov=cosmos \ From 2fe32c6b5b963c8683e789c112c700d78e4f9835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malo=20Jaffr=C3=A9?= <8469951+ghjklw@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:09:11 +0200 Subject: [PATCH 06/10] Implement DBT exposure selector (#1717) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement dbt exposure selector, following exactly the same pattern as source. This is just an updated version of #1559 with the requested additional unit test... 
sorry for the delay 🙈 Closes #1551 --- cosmos/constants.py | 1 + cosmos/dbt/selector.py | 38 +++++++- docs/configuration/selecting-excluding.rst | 2 + tests/dbt/test_selector.py | 100 +++++++++++++++++++++ 4 files changed, 140 insertions(+), 1 deletion(-) diff --git a/cosmos/constants.py b/cosmos/constants.py index a151b331a8..e00f715816 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -140,6 +140,7 @@ class DbtResourceType(aenum.Enum): # type: ignore SEED = "seed" TEST = "test" SOURCE = "source" + EXPOSURE = "exposure" @classmethod def _missing_value_(cls, value): # type: ignore diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 94f32141d5..7dcef3c066 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -21,6 +21,7 @@ TAG_SELECTOR = "tag:" CONFIG_SELECTOR = "config." SOURCE_SELECTOR = "source:" +EXPOSURE_SELECTOR = "exposure:" RESOURCE_TYPE_SELECTOR = "resource_type:" EXCLUDE_RESOURCE_TYPE_SELECTOR = "exclude_resource_type:" PLUS_SELECTOR = "+" @@ -82,6 +83,7 @@ class GraphSelector: resource_type:resource_name source:source_name exclude_resource_type:resource_name + exposure:exposure_name https://docs.getdbt.com/reference/node-selection/graph-operators """ @@ -197,7 +199,7 @@ def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selec previous_generation = new_generation depth -= 1 - def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: + def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901 """ Given a dictionary with the original dbt project nodes, applies the current graph selector to identify the subset of nodes that matches the selection criteria. 
@@ -229,6 +231,18 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: } ) + elif EXPOSURE_SELECTOR in self.node_name: + exposure_selection = self.node_name[len(EXPOSURE_SELECTOR) :] + + # match node.resource_type == EXPOSURE, node.resource_name == exposure_selection + root_nodes.update( + { + node_id + for node_id, node in nodes.items() + if node.resource_type == DbtResourceType.EXPOSURE and node.resource_name == exposure_selection + } + ) + elif CONFIG_SELECTOR in self.node_name: config_selection_key, config_selection_value = self.node_name[len(CONFIG_SELECTOR) :].split(":") # currently tags, materialized, schema and meta are the only supported config keys @@ -332,6 +346,7 @@ def __init__(self, project_dir: Path | None, statement: str): self.other: list[str] = [] self.graph_selectors: list[GraphSelector] = [] self.sources: list[str] = [] + self.exposures: list[str] = [] self.resource_types: list[str] = [] self.exclude_resource_types: list[str] = [] self.load_from_statement(statement) @@ -345,6 +360,7 @@ def is_empty(self) -> bool: or self.graph_selectors or self.other or self.sources + or self.exposures or self.resource_types or self.exclude_resource_types ) @@ -384,6 +400,8 @@ def _handle_no_precursors_or_descendants(self, item: str, node_name: str) -> Non self._parse_config_selector(item) elif node_name.startswith(SOURCE_SELECTOR): self._parse_source_selector(item) + elif node_name.startswith(EXPOSURE_SELECTOR): + self._parse_exposure_selector(item) elif node_name.startswith(RESOURCE_TYPE_SELECTOR): self._parse_resource_type_selector(item) elif node_name.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR): @@ -433,6 +451,11 @@ def _parse_source_selector(self, item: str) -> None: source_name = item[index:].strip() self.sources.append(source_name) + def _parse_exposure_selector(self, item: str) -> None: + index = len(EXPOSURE_SELECTOR) + exposure_name = item[index:].strip() + self.exposures.append(exposure_name) + def __repr__(self) -> str: return ( 
"SelectorConfig(" @@ -441,6 +464,7 @@ def __repr__(self) -> str: + f"config={self.config}, " + f"sources={self.sources}, " + f"resource={self.resource_types}, " + + f"exposures={self.exposures}, " + f"exclude_resource={self.exclude_resource_types}, " + f"other={self.other}, " + f"graph_selectors={self.graph_selectors})" @@ -565,6 +589,9 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool: if self.config.sources and not self._is_source_matching(node): return False + if self.config.exposures and not self._is_exposure_matching(node): + return False + return True def _is_resource_type_matching(self, node: DbtNode) -> bool: @@ -585,6 +612,14 @@ def _is_source_matching(self, node: DbtNode) -> bool: return False return True + def _is_exposure_matching(self, node: DbtNode) -> bool: + """Checks if the node's exposure is a subset of the config's exposure.""" + if node.resource_type != DbtResourceType.EXPOSURE: + return False + if node.resource_name not in self.config.exposures: + return False + return True + def _is_tags_subset(self, node: DbtNode) -> bool: """Checks if the node's tags are a subset of the config's tags.""" if not (set(self.config.tags) <= set(node.tags)): @@ -713,6 +748,7 @@ def validate_filters(exclude: list[str], select: list[str]) -> None: or filter_parameter.startswith(RESOURCE_TYPE_SELECTOR) or filter_parameter.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR) or filter_parameter.startswith(SOURCE_SELECTOR) + or filter_parameter.startswith(EXPOSURE_SELECTOR) or PLUS_SELECTOR in filter_parameter or any([filter_parameter.startswith(CONFIG_SELECTOR + config) for config in SUPPORTED_CONFIG]) ): diff --git a/docs/configuration/selecting-excluding.rst b/docs/configuration/selecting-excluding.rst index 0ed301f4b2..eb4927e16b 100644 --- a/docs/configuration/selecting-excluding.rst +++ b/docs/configuration/selecting-excluding.rst @@ -29,6 +29,8 @@ The ``select`` and ``exclude`` parameters are lists, with values like the follow - ``source:my_source``: 
include/exclude nodes that have the source ``my_source`` and are of resource_type ``source`` - ``source:my_source+``: include/exclude nodes that have the source ``my_source`` and their children - ``source:my_source.my_table``: include/exclude nodes that have the source ``my_source`` and the table ``my_table`` +- ``exposure:my_exposure``: include/exclude nodes that have the exposure ``my_exposure`` and are of resource_type ``exposure`` +- ``exposure:+my_exposure``: include/exclude nodes that have the exposure ``my_exposure`` and their parents .. note:: diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 89d481ca54..86c1e7278b 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -933,6 +933,62 @@ def test_select_nodes_by_source_name(): assert selected == expected +def test_select_nodes_by_exposure_name(): + """ + Test selecting a single exposure node by exact name 'exposure:exposure_name'. + The code in _should_include_node requires node.resource_type == EXPOSURE + AND node.name == "exposure_name". + """ + local_nodes = dict(sample_nodes) + exposure_node = DbtNode( + unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.exposure_name", + resource_type=DbtResourceType.EXPOSURE, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "exposures/my_exposure.yml", + tags=[], + config={}, + ) + + local_nodes[exposure_node.unique_id] = exposure_node + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, + nodes=local_nodes, + select=["exposure:exposure_name"], + ) + expected = {exposure_node.unique_id: exposure_node} + assert selected == expected + + +def test_select_exposure_nodes_by_graph_ancestry(): + """ + Test selecting an exposure node and its directs ancestors using the syntax '+exposure:exposure_name'. 
+ """ + + local_nodes = dict(sample_nodes) + exposure_node = DbtNode( + unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.exposure_name", + resource_type=DbtResourceType.EXPOSURE, + depends_on=[parent_node.unique_id], + file_path=SAMPLE_PROJ_PATH / "exposures/my_exposure.yml", + tags=[], + config={}, + ) + + local_nodes[exposure_node.unique_id] = exposure_node + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, + nodes=local_nodes, + select=["+exposure:exposure_name"], + ) + expected = { + exposure_node.unique_id: exposure_node, + parent_node.unique_id: parent_node, + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + } + assert selected == expected + + def test_exclude_nodes_by_resource_type_seed(): """ Test excluding any seed node via 'resource_type:seed'. @@ -1017,3 +1073,47 @@ def test_source_selector(): source_node_match.unique_id: source_node_match, } assert selected == expected, f"Expected only {source_node_match.unique_id} to match" + + +def test_exposure_selector(): + """ + Covers: + 1) exposure_selection = self.node_name[len(EXPOSURE_SELECTOR):] + 2) root_nodes.update(...) 
in that exposure logic + 3) __repr__ for SelectorConfig + 4) the line 'if node.resource_name not in self.config.exposures: return False' + """ + local_nodes = dict(sample_nodes) + + exposure_node_match = DbtNode( + unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.my_exposure", + resource_type=DbtResourceType.EXPOSURE, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "exposures/my_exposure.yml", + tags=[], + config={}, + ) + exposure_node_mismatch = DbtNode( + unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.another_exposure", + resource_type=DbtResourceType.EXPOSURE, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "exposures/another_exposure.yml", + tags=[], + config={}, + ) + local_nodes[exposure_node_match.unique_id] = exposure_node_match + local_nodes[exposure_node_mismatch.unique_id] = exposure_node_mismatch + + select_statement = ["exposure:my_exposure"] + + config = SelectorConfig(SAMPLE_PROJ_PATH, select_statement[0]) + + config_repr = repr(config) + assert "my_exposure" in config_repr, "Expected 'my_exposure' to appear in the config repr" + + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=local_nodes, select=select_statement) + + expected = { + exposure_node_match.unique_id: exposure_node_match, + } + assert selected == expected, f"Expected only {exposure_node_match.unique_id} to match" From bb90a2136178ca2402cf1696c98ea662551fdbd0 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:27:43 +0200 Subject: [PATCH 07/10] Allow multiple callbacks (#1693) During some experimentation with [callbacks](https://astronomer.github.io/astronomer-cosmos/configuration/callbacks.html#example-using-callbacks-with-a-single-operator) I have noticed that we can only use a single callback per time, which can be very limiting if I want to add custom callbacks together with the default ones provided by cosmos. 
This PR is very simple and aims to allow multiple callbacks to be passed as a list of functions, that will then be executed sequentially. --- cosmos/operators/local.py | 8 ++++++-- tests/operators/test_local.py | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 0f36a79233..ba987f2e4a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -160,7 +160,7 @@ def __init__( invocation_mode: InvocationMode | None = None, install_deps: bool = True, copy_dbt_packages: bool = settings.default_copy_dbt_packages, - callback: Callable[[str], None] | None = None, + callback: Callable[[str], None] | list[Callable[[str], None]] | None = None, callback_args: dict[str, Any] | None = None, should_store_compiled_sql: bool = True, should_upload_compiled_sql: bool = False, @@ -508,7 +508,11 @@ def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None self._upload_sql_files(tmp_project_dir, "compiled") if self.callback: self.callback_args.update({"context": context}) - self.callback(tmp_project_dir, **self.callback_args) + if isinstance(self.callback, list): + for callback_fn in self.callback: + callback_fn(tmp_project_dir, **self.callback_args) + else: + self.callback(tmp_project_dir, **self.callback_args) def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_context: dict[str, Any]) -> None: if async_context.get("teardown_task") and settings.enable_teardown_async_task: diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index fbdc8cb3f3..2fb0391cf1 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1562,3 +1562,26 @@ def test_test_clone_project(create_symlinks_mock, copy_dbt_packages_mock, caplog assert f"Cloning project to writable temp directory {tmp_dir_path} from {project_dir}" in caplog.text assert "Copying dbt packages to temporary folder." 
in caplog.text assert "Completed copying dbt packages to temporary folder." in caplog.text + + +@patch("cosmos.operators.local.AbstractDbtLocalBase.store_freshness_json") +@patch("cosmos.operators.local.AbstractDbtLocalBase.store_compiled_sql") +@patch("cosmos.operators.local.AbstractDbtLocalBase._override_rtif") +def test_handle_post_execution_with_multiple_callbacks( + mock_override_rtif, mock_store_compiled_sql, mock_store_freshness_json +): + + multiple_callbacks = [MagicMock(), MagicMock(), MagicMock()] + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + callback=multiple_callbacks, + callback_args={"arg1": "value1"}, + ) + + context = {"dag_run": MagicMock(), "task": MagicMock()} + operator._handle_post_execution("/tmp/project_dir", context) + + for callback_fn in multiple_callbacks: + callback_fn.assert_called_once_with("/tmp/project_dir", arg1="value1", context=context) From 1793b0ed8eb976bda5ded8fe714edd1459983c5d Mon Sep 17 00:00:00 2001 From: John Horan Date: Tue, 29 Apr 2025 10:31:15 +0100 Subject: [PATCH 08/10] Operator argument fix (#1648) Cosmos 1.9.0 introduced a change such that `AbstractDbtBase` no longer inherits from airflow `BaseOperator` and I believe this introduced two bugs. - In the past you could give arguments using Airflow's `default_args` and these would end up being passed to cosmos. So for example we were specifying the `project_dir` as a default arg, and in 1.9.0 these projects were broken because AbstractDbtBase was no longer getting a project_dir specified. So the first change is to check both kwargs and default_args for the values AbstractDbtBase requires. ~I've only made this change for the kubernetes operator, but I'd argue it should be done for the local and docker ones too.~ - The existing code tries to inspect `KubernetesPodOperator` for the args that it requires, but notably this approach misses arguments that are required by `BaseOperator`, e.g.
the task_group. - ~I've made an effort to restore the convention in airflow that all arguments must be consumed by the operator. So an argument must either be consumed by `AbstractDbtBase` or it will be passed along to the operator, it may also be consumed by both classes.~ --- cosmos/operators/_asynchronous/databricks.py | 9 ++- cosmos/operators/aws_ecs.py | 38 ++++++++-- cosmos/operators/azure_container_instance.py | 35 +++++++-- cosmos/operators/docker.py | 35 +++++++-- cosmos/operators/gcp_cloud_run_job.py | 38 +++++++--- cosmos/operators/kubernetes.py | 37 ++++++++-- cosmos/operators/local.py | 56 ++++++++------ tests/operators/test_kubernetes.py | 77 +++++++++++++++++++- 8 files changed, 260 insertions(+), 65 deletions(-) diff --git a/cosmos/operators/_asynchronous/databricks.py b/cosmos/operators/_asynchronous/databricks.py index 3b0ce3882d..7bba2bb35d 100644 --- a/cosmos/operators/_asynchronous/databricks.py +++ b/cosmos/operators/_asynchronous/databricks.py @@ -3,12 +3,13 @@ from typing import Any -try: # Airflow 3 - from airflow.sdk.bases.operator import BaseOperator -except ImportError: # Airflow 2 - from airflow.models import BaseOperator from airflow.utils.context import Context +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + class DbtRunAirflowAsyncDatabricksOperator(BaseOperator): # type: ignore[misc] def __init__(self, *args: Any, **kwargs: Any): diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 8526ea22ce..7df69de3b0 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -29,6 +29,11 @@ DEFAULT_CONTAINER_NAME = "dbt" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + try: from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator except 
ImportError: # pragma: no cover @@ -76,21 +81,38 @@ def __init__( "overrides": None, } ) - super().__init__(**kwargs) + # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit # Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class. - base_operator_args = set(inspect.signature(EcsRunTaskOperator.__init__).parameters.keys()) + default_args = kwargs.get("default_args", {}) + operator_kwargs = {} + + operator_args: set[str] = set() + for clazz in EcsRunTaskOperator.__mro__: + operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) + if clazz == BaseOperator: + break + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + base_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in base_operator_args: - base_kwargs[arg_key] = arg_value - base_kwargs["task_id"] = kwargs["task_id"] - base_kwargs["aws_conn_id"] = aws_conn_id - EcsRunTaskOperator.__init__(self, **base_kwargs) + for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + AbstractDbtBase.__init__(self, **base_kwargs) + EcsRunTaskOperator.__init__(self, **operator_kwargs) def build_and_run_cmd( self, diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index 64b88b7134..55a968af1b 100644 --- 
a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -3,6 +3,10 @@ import inspect from typing import TYPE_CHECKING, Any, Callable, Sequence +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 if TYPE_CHECKING: # pragma: no cover try: from airflow.sdk.definitions.context import Context @@ -68,20 +72,37 @@ def __init__( "registry_conn_id": registry_conn_id, } ) - super().__init__(**kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit # Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class. 
- base_operator_args = set(inspect.signature(AzureContainerInstancesOperator.__init__).parameters.keys()) + + default_args = kwargs.get("default_args", {}) + operator_kwargs = {} + operator_args: set[str] = set() + for clazz in AzureContainerInstancesOperator.__mro__: + operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) + if clazz == BaseOperator: + break + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + base_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in base_operator_args: - base_kwargs[arg_key] = arg_value - base_kwargs["task_id"] = kwargs["task_id"] - AzureContainerInstancesOperator.__init__(self, **base_kwargs) + for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + AbstractDbtBase.__init__(self, **base_kwargs) + AzureContainerInstancesOperator.__init__(self, **operator_kwargs) def build_and_run_cmd( self, diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index f401f4a528..d40f737689 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -3,6 +3,10 @@ import inspect from typing import TYPE_CHECKING, Any, Callable, Sequence +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 if TYPE_CHECKING: # pragma: no cover try: from airflow.sdk.definitions.context import Context @@ -58,7 +62,6 @@ def __init__( "Airflow connections are not available in the Docker container for the mapping to work." ) - super().__init__(image=image, **kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. 
This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit @@ -66,13 +69,31 @@ def __init__( # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class. kwargs["image"] = image - base_operator_args = set(inspect.signature(DockerOperator.__init__).parameters.keys()) + + default_args = kwargs.get("default_args", {}) + operator_kwargs = {} + operator_args: set[str] = set() + for clazz in DockerOperator.__mro__: + operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) + if clazz == BaseOperator: + break + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + base_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in base_operator_args: - base_kwargs[arg_key] = arg_value - base_kwargs["task_id"] = kwargs["task_id"] - DockerOperator.__init__(self, **base_kwargs) + for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + AbstractDbtBase.__init__(self, **base_kwargs) + DockerOperator.__init__(self, **operator_kwargs) def build_and_run_cmd( self, diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index 55ec2d32c0..523c687f07 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -28,6 +28,11 @@ DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + try: from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator @@ -72,7 +77,6 @@ 
def __init__( self.profile_config = profile_config self.command = command self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES - super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs) # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit @@ -84,17 +88,33 @@ def __init__( "project_id": project_id, "region": region, "job_name": job_name, - "command": command, - "environment_variables": environment_variables, } ) - base_operator_args = set(inspect.signature(CloudRunExecuteJobOperator.__init__).parameters.keys()) + + default_args = kwargs.get("default_args", {}) + operator_kwargs = {} + operator_args: set[str] = set() + for clazz in CloudRunExecuteJobOperator.__mro__: + operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) + if clazz == BaseOperator: + break + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + base_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in base_operator_args: - base_kwargs[arg_key] = arg_value - base_kwargs["task_id"] = kwargs["task_id"] - CloudRunExecuteJobOperator.__init__(self, **base_kwargs) + for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + AbstractDbtBase.__init__(self, **base_kwargs) + CloudRunExecuteJobOperator.__init__(self, **operator_kwargs) def build_and_run_cmd( self, diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 0ac7f909d3..bbc1563599 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -29,6 
+29,11 @@ DbtTestMixin, ) +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + DBT_NO_TESTS_MSG = "Nothing to do" DBT_WARN_MSG = "WARN" @@ -64,20 +69,38 @@ class DbtKubernetesBaseOperator(AbstractDbtBase, KubernetesPodOperator): # type def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None: self.profile_config = profile_config - super().__init__(**kwargs) + # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit # Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class. 
- base_operator_args = set(inspect.signature(KubernetesPodOperator.__init__).parameters.keys()) + default_args = kwargs.get("default_args", {}) + operator_kwargs = {} + operator_args: set[str] = set() + for clazz in KubernetesPodOperator.__mro__: + operator_args.update(inspect.signature(clazz.__init__).parameters.keys()) + if clazz == BaseOperator: + break + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + base_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in base_operator_args: - base_kwargs[arg_key] = arg_value - base_kwargs["task_id"] = kwargs["task_id"] - KubernetesPodOperator.__init__(self, **base_kwargs) + for arg in {*inspect.signature(AbstractDbtBase.__init__).parameters.keys()}: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + + AbstractDbtBase.__init__(self, **base_kwargs) + KubernetesPodOperator.__init__(self, **operator_kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: env_vars_dict: dict[str, str] = dict() diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index ba987f2e4a..5f76f02c4a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -17,11 +17,6 @@ import jinja2 from airflow import DAG from airflow.exceptions import AirflowException, AirflowSkipException - -try: # Airflow 3 - from airflow.sdk.bases.operator import BaseOperator -except ImportError: # Airflow 2 - from airflow.models import BaseOperator from airflow.models.taskinstance import TaskInstance if TYPE_CHECKING: # pragma: no cover @@ -49,6 +44,11 @@ remote_target_path_conn_id, ) +try: + from airflow.sdk.bases.operator import BaseOperator # Airflow 3 +except ImportError: + from airflow.models import BaseOperator # Airflow 2 + try: from airflow.datasets import Dataset from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor @@ -766,19 +766,31 @@ 
def __init__(self, *args: Any, **kwargs: Any) -> None: # Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class. - abstract_dbt_local_base_kwargs = {} - base_operator_kwargs = {} - abstract_dbt_local_base_args_keys = ( - inspect.getfullargspec(AbstractDbtBase.__init__).args - + inspect.getfullargspec(AbstractDbtLocalBase.__init__).args - ) - base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys()) - for arg_key, arg_value in kwargs.items(): - if arg_key in abstract_dbt_local_base_args_keys: - abstract_dbt_local_base_kwargs[arg_key] = arg_value - if arg_key in base_operator_args: - base_operator_kwargs[arg_key] = arg_value - AbstractDbtLocalBase.__init__(self, **abstract_dbt_local_base_kwargs) + base_kwargs = {} + operator_kwargs = {} + operator_args = {*inspect.signature(BaseOperator.__init__).parameters.keys()} + + default_args = kwargs.get("default_args", {}) + + for arg in operator_args: + try: + operator_kwargs[arg] = kwargs[arg] + except KeyError: + pass + + for arg in { + *inspect.getfullargspec(AbstractDbtBase.__init__).args, + *inspect.getfullargspec(AbstractDbtLocalBase.__init__).args, + }: + try: + base_kwargs[arg] = kwargs[arg] + except KeyError: + try: + base_kwargs[arg] = default_args[arg] + except KeyError: + pass + + AbstractDbtLocalBase.__init__(self, **base_kwargs) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: if ( kwargs.get("emit_datasets", True) @@ -791,12 +803,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str") dag_id = kwargs.get("dag") task_group_id = kwargs.get("task_group") - base_operator_kwargs["outlets"] = [ + 
operator_kwargs["outlets"] = [ DatasetAlias(name=get_dataset_alias_name(dag_id, task_group_id, self.task_id)) ] # type: ignore - if "task_id" in base_operator_kwargs: - base_operator_kwargs.pop("task_id") - BaseOperator.__init__(self, task_id=self.task_id, **base_operator_kwargs) + if "task_id" in operator_kwargs: + operator_kwargs.pop("task_id") + BaseOperator.__init__(self, task_id=self.task_id, **operator_kwargs) class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator): diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 27ddd399db..7ae7edd63f 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -3,20 +3,25 @@ import pytest from airflow import __version__ as airflow_version -from airflow.models import TaskInstance +from airflow.models import DAG, TaskInstance from airflow.utils.context import Context, context_merge from packaging import version from pendulum import datetime +from cosmos import ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig +from cosmos.airflow.task_group import DbtTaskGroup from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, DbtCloneKubernetesOperator, + DbtKubernetesBaseOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, + DbtRunOperationKubernetesOperator, DbtSeedKubernetesOperator, DbtSourceKubernetesOperator, DbtTestKubernetesOperator, ) +from cosmos.profiles import PostgresUserPasswordProfileMapping try: from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction @@ -462,3 +467,73 @@ def test_operator_execute_with_flags(operator_class, kwargs, expected_cmd): pod_args = get_or_create_pod.call_args.kwargs["pod_request_obj"].to_dict()["spec"]["containers"][0]["args"] assert expected_cmd == pod_args + + +DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" +DBT_PROJECT_NAME = "jaffle_shop" + + +@pytest.mark.integration +def test_kubernetes_task_group(): + profile_config = 
ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), + ) + + group_id = "test_group" + image = "test_image" + with DAG( + "test-id-dbt-compile", + start_date=datetime(2022, 1, 1), + default_args={ + "image": image, + }, + ) as dag: + task_group = DbtTaskGroup( + group_id=group_id, + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.KUBERNETES, + ), + profile_config=profile_config, + ) + + assert all(t.task_id.startswith(f"{group_id}.") for t in task_group) + assert len(dag.tasks) == len([t for t in task_group]) + assert any(t for t in task_group if isinstance(t, DbtKubernetesBaseOperator)) + assert all(t.image == image for t in (t for t in task_group if isinstance(t, DbtKubernetesBaseOperator))) + + +@pytest.mark.integration +def test_kubernetes_default_args(): + profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), + ) + + image = "test_image" + with DAG( + "test-id-dbt-compile", + start_date=datetime(2022, 1, 1), + default_args={"project_dir": DBT_ROOT_PATH / "jaffle_shop", "image": image, "profile_config": profile_config}, + ): + dbt_run_operation = DbtRunOperationKubernetesOperator( + task_id="run_macro_command", + macro_name="macro", + ) + + assert dbt_run_operation.image == image + assert dbt_run_operation.project_dir == DBT_ROOT_PATH / "jaffle_shop" + assert dbt_run_operation.profile_config.target_name == profile_config.target_name From 6213e23bb46f4a4ec02155e093ac6a65eda62313 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Tue, 29 Apr 2025 11:48:18 +0200 Subject: [PATCH 09/10] Allow 
multiple callbacks (#1693) During some experimentation with [callbacks](https://astronomer.github.io/astronomer-cosmos/configuration/callbacks.html#example-using-callbacks-with-a-single-operator) I have noticed that we can only use a single callback at a time, which can be very limiting if I want to add custom callbacks together with the default ones provided by cosmos. This PR is very simple and aims to allow multiple callbacks to be passed as a list of functions that will then be executed sequentially. From f7b12fba1aae01c6daeb204f56f4c77cb734c334 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 29 Apr 2025 15:55:59 +0530 Subject: [PATCH 10/10] Revert type change --- cosmos/operators/virtualenv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 78df804e8a..9b61fa399d 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -76,14 +76,14 @@ def __init__( py_requirements: list[str] | None = None, pip_install_options: list[str] | None = None, py_system_site_packages: bool = False, - virtualenv_dir: str | None = None, + virtualenv_dir: Path | None = None, is_virtualenv_dir_temporary: bool = False, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.pip_install_options = pip_install_options or [] self.py_system_site_packages = py_system_site_packages - self.virtualenv_dir = Path(virtualenv_dir) if virtualenv_dir else None + self.virtualenv_dir = virtualenv_dir if self.virtualenv_dir: self.virtualenv_dir.mkdir(parents=True, exist_ok=True) self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary