From 47f764c4be28ef074288805e4592791c7d030fb7 Mon Sep 17 00:00:00 2001 From: Ichiro Takami Date: Thu, 16 Apr 2026 14:28:03 +0100 Subject: [PATCH 1/2] address feedback --- cosmos/operators/local.py | 12 ++++++-- tests/operators/test_local.py | 55 +++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 26abf77e9f..4fed25075a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -711,7 +711,10 @@ def run_command( # noqa: C901 return result def calculate_openlineage_events_completes( - self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path, dbt_command_line: list[str] + self, + env: dict[str, str | os.PathLike[Any] | bytes], + project_dir: Path, + dbt_command_line: list[str] | None = None, ) -> None: """ Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt @@ -728,14 +731,17 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) - openlineage_processor = DbtLocalArtifactProcessor( + processor_kwargs: dict[str, Any] = dict( producer=OPENLINEAGE_PRODUCER, job_namespace=settings.LINEAGE_NAMESPACE, project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, - dbt_command_line=dbt_command_line, ) + sig = inspect.signature(DbtLocalArtifactProcessor.__init__) + if "dbt_command_line" in sig.parameters and dbt_command_line is not None: + processor_kwargs["dbt_command_line"] = dbt_command_line + openlineage_processor = DbtLocalArtifactProcessor(**processor_kwargs) # Do not raise exception if a command is unsupported, following the openlineage-dbt processor: # https://github.com/OpenLineage/OpenLineage/blob/bdcaf828ebc117e0e5ffc5fab44ff8886eb7836b/integration/common/openlineage/common/provider/dbt/processor.py#L141 openlineage_processor.should_raise_on_unsupported_command = False diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 9d48510683..0489157a7b 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1241,6 +1241,61 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo assert instance.parse.called assert "Unable to parse OpenLineage events" in caplog.text +@patch("cosmos.operators.local.DbtLocalBaseOperator._handle_post_execution") +@patch("cosmos.operators.local.DbtLocalBaseOperator.handle_exception") +@patch("cosmos.operators.local.DbtLocalArtifactProcessor") +@patch("cosmos.operators.local.is_openlineage_common_available", True) +@patch("cosmos.config.ProfileConfig.ensure_profile") +@patch("cosmos.operators.local.DbtLocalBaseOperator.invoke_dbt") +@patch("cosmos.operators.local.DbtLocalBaseOperator._clone_project") +@patch("cosmos.operators.local.tempfile.TemporaryDirectory") +def test_run_command_passes_full_cmd_with_profiles_dir_to_openlineage_processor( + mock_tmp_dir, + mock_clone_project, + mock_invoke_dbt, + mock_ensure_profile, + mock_processor, + mock_handle_exception, + mock_handle_post_execution, + tmp_path, +): + """Tests that run_command forwards the full dbt CLI (including --profiles-dir) to DbtLocalArtifactProcessor.""" + profile_path = tmp_path / "profiles" / "profiles.yml" + mock_tmp_dir.return_value.__enter__.return_value = str(tmp_path) + mock_ensure_profile.return_value.__enter__.return_value = (profile_path, {}) + mock_invoke_dbt.return_value = MagicMock() + + mock_processor_instance = MagicMock() + mock_processor_instance.parse.return_value = MagicMock(completes=[]) + mock_processor.return_value = mock_processor_instance + + # Simulate DbtLocalArtifactProcessor.__init__ accepting dbt_command_line + mock_sig = MagicMock() + mock_sig.parameters = {"dbt_command_line": MagicMock()} + + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + emit_datasets=False, + install_deps=False, + ) + + with patch("cosmos.operators.local.inspect.signature", return_value=mock_sig): + operator.run_command( + cmd=["dbt", "cmd"], + env={}, + context={"run_id": "test_run_id", "task_instance": MagicMock()}, + ) + + mock_processor.assert_called_once() + call_kwargs = mock_processor.call_args.kwargs + assert "dbt_command_line" in call_kwargs + assert "--profiles-dir" in call_kwargs["dbt_command_line"] + # Verify the base command is also present (full_cmd = cmd + flags) + assert "dbt" in call_kwargs["dbt_command_line"] + assert "cmd" in call_kwargs["dbt_command_line"] + @pytest.mark.parametrize( "operator_class,expected_template", From 0817a2f4d59799eb1adecb182b7ceff879e572ef Mon Sep 17 00:00:00 2001 From: Ichiro Takami Date: Thu, 16 Apr 2026 14:36:18 +0100 Subject: [PATCH 2/2] precommit updates --- tests/operators/test_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 0489157a7b..459b54ecc0 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1241,6 +1241,7 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo assert instance.parse.called assert "Unable to parse OpenLineage events" in caplog.text + @patch("cosmos.operators.local.DbtLocalBaseOperator._handle_post_execution") @patch("cosmos.operators.local.DbtLocalBaseOperator.handle_exception") @patch("cosmos.operators.local.DbtLocalArtifactProcessor")