From b66f49b07dbfdc8afbba29cf4c58c37bd596ed1b Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 13:23:04 +0100 Subject: [PATCH 1/9] Create dictionary and set using literals --- cosmos/dbt/parser/project.py | 2 +- cosmos/operators/airflow_async.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index 6339a68d8f..633c96618c 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -411,7 +411,7 @@ def _extract_model_tests( type=DbtModelType.DBT_TEST, path=path, dbt_vars=self.dbt_vars, - config=DbtModelConfig(upstream_models=set({model_name})), + config=DbtModelConfig(upstream_models={model_name}), ) tests[test_model.name] = test_model return tests diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index c0cb382b60..8273aa6327 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -61,10 +61,9 @@ def __init__( non_async_args |= set(inspect.signature(DbtLocalBaseOperator.__init__).parameters.keys()) non_async_args |= set(inspect.signature(AbstractDbtLocalBase.__init__).parameters.keys()) - dbt_kwargs = {} - - # Extract full_refresh from kwargs if present - dbt_kwargs["full_refresh"] = kwargs.pop("full_refresh", False) + dbt_kwargs = { + "full_refresh": kwargs.pop("full_refresh", False) + } for arg_key, arg_value in kwargs.items(): if arg_key == "task_id": From 1687dcfc274c7d842480bb80019fb370c8a21eeb Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 13:38:33 +0100 Subject: [PATCH 2/9] Remove unused transform_host methods --- cosmos/profiles/databricks/token.py | 4 ---- cosmos/profiles/trino/base.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/cosmos/profiles/databricks/token.py b/cosmos/profiles/databricks/token.py index 78d97eb77c..55229f8769 100644 --- a/cosmos/profiles/databricks/token.py +++ b/cosmos/profiles/databricks/token.py @@ -45,7 +45,3 @@ def profile(self) -> dict[str, Any | None]: # token should always get set as env var "token": self.get_env_var_format("token"), } - - def transform_host(self, host: str) -> str: - """Removes the https:// prefix.""" - return host.replace("https://", "") diff --git a/cosmos/profiles/trino/base.py b/cosmos/profiles/trino/base.py index 5554d340e2..9ae7ec64b3 100644 --- a/cosmos/profiles/trino/base.py +++ b/cosmos/profiles/trino/base.py @@ -46,7 +46,3 @@ def mock_profile(self) -> dict[str, Any]: mock_profile = super().mock_profile mock_profile["port"] = 99999 return mock_profile - - def transform_host(self, host: str) -> str: - """Replaces http:// or https:// with nothing.""" - return host.replace("http://", "").replace("https://", "") From 22a365d5ea7032b52fcac240b91d41b5b92034ab Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 14:02:13 +0100 Subject: [PATCH 3/9] Remove redundant parentheses --- dev/dags/basic_cosmos_task_group.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 175ca64458..8ed2151c3e 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -49,7 +49,10 @@ customers = DbtTaskGroup( group_id="customers", - project_config=ProjectConfig((DBT_PROJECT_PATH).as_posix(), dbt_vars={"var": "2"}), + project_config=ProjectConfig( + dbt_project_path=DBT_PROJECT_PATH.as_posix(), + dbt_vars={"var": "2"}, + ), render_config=RenderConfig( select=["path:seeds/raw_customers.csv"], enable_mock_profile=False, @@ -64,9 +67,7 @@ orders = DbtTaskGroup( group_id="orders", - project_config=ProjectConfig( - (DBT_PROJECT_PATH).as_posix(), - ), + project_config=ProjectConfig(DBT_PROJECT_PATH.as_posix()), render_config=RenderConfig( select=["path:seeds/raw_orders.csv"], enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping From a5e6a56eaf9c7e402247416860b453b86b2d5edd Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 14:07:52 +0100 Subject: [PATCH 4/9] Remove unreachable code --- cosmos/config.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index dd13c37040..163fd6731a 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -331,8 +331,6 @@ def get_profile_type(self) -> str: target_type = profile["outputs"][self.target_name]["type"] return str(target_type) - return "undefined" - def _get_profile_path(self, use_mock_values: bool = False) -> Path: """ Handle the profile caching mechanism. From 96a95c35a2d94c6e8247469b3b8f0a4ad432c9ba Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 15:24:18 +0100 Subject: [PATCH 5/9] Use equality operators to compare with None --- tests/dbt/test_graph.py | 2 +- tests/operators/test_virtualenv.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index ed061c3bd5..e5437587df 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1939,7 +1939,7 @@ def test_profile_created_correctly_with_profile_mapping( profile_config=profile_config, ) - assert dbt_graph.load_via_dbt_ls() == None + assert dbt_graph.load_via_dbt_ls() is None @patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index a9f28dadd6..77116e06a8 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -125,7 +125,7 @@ def test_run_command_without_virtualenv_dir( invocation_mode=InvocationMode.SUBPROCESS, vars={"variable": "value"}, ) - assert venv_operator.virtualenv_dir == None + assert venv_operator.virtualenv_dir is None venv_operator.run_command( cmd=["fake-dbt", "do-something"], env={}, context={"task_instance": MagicMock(), "run_id": "test_run_id"} ) From 2c58d5d37dffa4580396403012c6fc3c7fe93368 Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 16:58:21 +0100 Subject: [PATCH 6/9] Fix incorrect docstrings --- cosmos/converter.py | 8 ++++---- cosmos/core/airflow.py | 2 ++ cosmos/hooks/subprocess.py | 2 ++ cosmos/operators/_watcher/base.py | 2 ++ tests/utils.py | 6 +++--- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index c06a0db106..0f0f705f6f 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -97,11 +97,11 @@ def validate_arguments( Validate that mutually exclusive selectors filters have not been given. Validate deprecated arguments. - :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') - :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') - :param profile_config: ProfileConfig Object + :param render_config: Render configuration + :param profile_config: Profile configuration :param task_args: Arguments to be used to instantiate an Airflow Task - :param execution_mode: the current execution mode + :param execution_config: Execution configuration + :param project_config: Project configuration """ for field in ("tags", "paths"): select_items = retrieve_by_label(render_config.select, field) diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 646e1ceb26..cd5be49adc 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -27,6 +27,8 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) Get the Airflow Operator class for a Task. :param task: The Task to get the Operator for + :param dag: The DAG to get the Operator for + :param task_group: The TaskGroup to get the Operator for :return: The Operator class :rtype: BaseOperator diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 8d016c887f..2f632d7682 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -59,6 +59,8 @@ def run_command( :param output_encoding: encoding to use for decoding stdout :param cwd: Working directory to run the command in. If None (default), the command is run in a temporary directory. + :param process_log_line: A callable to process log line + :return: :class:`namedtuple` containing: ``exit_code`` ``output``: the last line from stderr or stdout diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 4161dcf282..6b416cf9b4 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -106,6 +106,8 @@ def store_dbt_resource_status_from_log( extracts node status information, and pushes it to XCom for consumption by downstream watcher sensors. + :param line: A single line from dbt JSON logs. + :param extra_kwargs: Additional keywords arguments. :param tests_per_model: Mapping of model unique_id to list of test unique_ids associated with that model, as built by DbtGraph.update_node_dependency(). Empty dict when no tests exist. diff --git a/tests/utils.py b/tests/utils.py index 7c30e02666..c9a5440b70 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -169,12 +169,12 @@ def test_old_dag( def add_logger_if_needed(dag: DAG, ti: TaskInstance): """ - Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead + Add a formatted logger to the task instance so all logs are surfaced to the command line instead of into a task file. Since this is a local test run, it is much better for the user to see logs in the command line, rather than needing to search for a log file. - Args: - ti: The taskinstance that will receive a logger + :param dag: The DAG that will receive a logger + :param ti: The task instance that will receive a logger """ logging_format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s") handler = logging.StreamHandler(sys.stdout) From 7a6eea023ccf71245433fbedc14dcff7500a8cb4 Mon Sep 17 00:00:00 2001 From: dnskr Date: Sat, 7 Mar 2026 19:09:28 +0100 Subject: [PATCH 7/9] Fix mutable default list argument --- cosmos/dbt/parser/output.py | 4 +++- cosmos/dbt/runner.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cosmos/dbt/parser/output.py b/cosmos/dbt/parser/output.py index 0882fbe5a3..936f379c9e 100644 --- a/cosmos/dbt/parser/output.py +++ b/cosmos/dbt/parser/output.py @@ -123,7 +123,7 @@ def clean_line(line: str) -> str: details="Use the `cosmos.dbt.runner.extract_message_by_status` instead.", ) # type: ignore[untyped-decorator] def extract_dbt_runner_issues( - result: dbtRunnerResult, status_levels: list[str] = ["warn"] + result: dbtRunnerResult, status_levels: list[str] | None = None ) -> tuple[list[str], list[str]]: # type: ignore[misc] """ Extracts messages from the dbt runner result and returns them as a formatted string. @@ -136,6 +136,8 @@ def extract_dbt_runner_issues( :return: two lists of strings, the first one containing the node names and the second one containing the node result message. """ + status_levels = ["warn"] if status_levels is None else status_levels + node_names = [] node_results = [] diff --git a/cosmos/dbt/runner.py b/cosmos/dbt/runner.py index be4f8f9e55..9d27bca4a5 100644 --- a/cosmos/dbt/runner.py +++ b/cosmos/dbt/runner.py @@ -108,7 +108,7 @@ def run_command( def extract_message_by_status( - result: dbtRunnerResult, status_levels: list[str] = ["warn"] + result: dbtRunnerResult, status_levels: list[str] | None = None ) -> tuple[list[str], list[str]]: """ Extracts messages from the dbt runner result and returns them as a formatted string. @@ -121,6 +121,8 @@ def extract_message_by_status( :return: two lists of strings, the first one containing the node names and the second one containing the node result message. """ + status_levels = ["warn"] if status_levels is None else status_levels + node_names = [] node_results = [] From b296cb807791128af3c3cce0dd735e51c7eccf46 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 7 Mar 2026 18:21:29 +0000 Subject: [PATCH 8/9] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/airflow_async.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index 8273aa6327..c49f97b066 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -61,9 +61,7 @@ def __init__( non_async_args |= set(inspect.signature(DbtLocalBaseOperator.__init__).parameters.keys()) non_async_args |= set(inspect.signature(AbstractDbtLocalBase.__init__).parameters.keys()) - dbt_kwargs = { - "full_refresh": kwargs.pop("full_refresh", False) - } + dbt_kwargs = {"full_refresh": kwargs.pop("full_refresh", False)} for arg_key, arg_value in kwargs.items(): if arg_key == "task_id": From 4943da0008d73a8475be3fb9b85e47440c730ffa Mon Sep 17 00:00:00 2001 From: dnskr Date: Tue, 10 Mar 2026 00:31:27 +0100 Subject: [PATCH 9/9] Revert "Remove unused transform_host methods" This reverts commit 1687dcfc274c7d842480bb80019fb370c8a21eeb. --- cosmos/profiles/databricks/token.py | 4 ++++ cosmos/profiles/trino/base.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/cosmos/profiles/databricks/token.py b/cosmos/profiles/databricks/token.py index 55229f8769..78d97eb77c 100644 --- a/cosmos/profiles/databricks/token.py +++ b/cosmos/profiles/databricks/token.py @@ -45,3 +45,7 @@ def profile(self) -> dict[str, Any | None]: # token should always get set as env var "token": self.get_env_var_format("token"), } + + def transform_host(self, host: str) -> str: + """Removes the https:// prefix.""" + return host.replace("https://", "") diff --git a/cosmos/profiles/trino/base.py b/cosmos/profiles/trino/base.py index 9ae7ec64b3..5554d340e2 100644 --- a/cosmos/profiles/trino/base.py +++ b/cosmos/profiles/trino/base.py @@ -46,3 +46,7 @@ def mock_profile(self) -> dict[str, Any]: mock_profile = super().mock_profile mock_profile["port"] = 99999 return mock_profile + + def transform_host(self, host: str) -> str: + """Replaces http:// or https:// with nothing.""" + return host.replace("http://", "").replace("https://", "")