From e721abc97167104708d3b1e8000c9838bffc7891 Mon Sep 17 00:00:00 2001 From: Ichiro Takami Date: Wed, 15 Apr 2026 10:58:17 +0100 Subject: [PATCH 1/4] add dbt_command_line argument in calculate_openlineage_events_completes --- cosmos/operators/local.py | 5 +++-- tests/operators/test_local.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b77c112099..26abf77e9f 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -690,7 +690,7 @@ def run_command( # noqa: C901 self.handle_exception(result) return result if is_openlineage_common_available: - self.calculate_openlineage_events_completes(env, tmp_dir_path) + self.calculate_openlineage_events_completes(env, tmp_dir_path, full_cmd) if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: # Airflow 3 does not support associating 'openlineage_events_completes' with task_instance, # in that case we're storing as self.openlineage_events_completes @@ -711,7 +711,7 @@ def run_command( # noqa: C901 return result def calculate_openlineage_events_completes( - self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path + self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path, dbt_command_line: list[str] ) -> None: """ Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt @@ -734,6 +734,7 @@ def calculate_openlineage_events_completes( project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, + dbt_command_line=dbt_command_line, ) # Do not raise exception if a command is unsupported, following the openlineage-dbt processor: # https://github.com/OpenLineage/OpenLineage/blob/bdcaf828ebc117e0e5ffc5fab44ff8886eb7836b/integration/common/openlineage/common/provider/dbt/processor.py#L141 diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 0fa59adee4..485d89ef1e 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1234,7 +1234,7 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo should_store_compiled_sql=False, ) - dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR) + dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR, dbt_command_line=dbt_base_operator.base_cmd + dbt_base_operator.dbt_cmd_flags) assert instance.parse.called assert "Unable to parse OpenLineage events" in caplog.text From 7a658c2e82622e864e9b33535fed092bff564a09 Mon Sep 17 00:00:00 2001 From: Ichiro Takami Date: Wed, 15 Apr 2026 11:28:12 +0100 Subject: [PATCH 2/4] run precommit --- tests/operators/test_local.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 485d89ef1e..9d48510683 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1234,7 +1234,9 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo should_store_compiled_sql=False, ) - dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR, dbt_command_line=dbt_base_operator.base_cmd + dbt_base_operator.dbt_cmd_flags) + dbt_base_operator.calculate_openlineage_events_completes( + env={}, project_dir=DBT_PROJ_DIR, dbt_command_line=dbt_base_operator.base_cmd + dbt_base_operator.dbt_cmd_flags + ) assert instance.parse.called assert "Unable to parse OpenLineage events" in caplog.text From 6fec990ac6be5b93bfb251e453c6d732de9f55fb Mon Sep 17 00:00:00 2001 From: ichirotakami <138510975+ichirotakami@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:42:16 +0100 Subject: [PATCH 3/4] address feedback (#1) --- cosmos/operators/local.py | 12 ++++++-- tests/operators/test_local.py | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 26abf77e9f..4fed25075a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -711,7 +711,10 @@ def run_command( # noqa: C901 return result def calculate_openlineage_events_completes( - self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path, dbt_command_line: list[str] + self, + env: dict[str, str | os.PathLike[Any] | bytes], + project_dir: Path, + dbt_command_line: list[str] | None = None, ) -> None: """ Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt @@ -728,14 +731,17 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) - openlineage_processor = DbtLocalArtifactProcessor( + processor_kwargs: dict[str, Any] = dict( producer=OPENLINEAGE_PRODUCER, job_namespace=settings.LINEAGE_NAMESPACE, project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, - dbt_command_line=dbt_command_line, ) + sig = inspect.signature(DbtLocalArtifactProcessor.__init__) + if "dbt_command_line" in sig.parameters and dbt_command_line is not None: + processor_kwargs["dbt_command_line"] = dbt_command_line + openlineage_processor = DbtLocalArtifactProcessor(**processor_kwargs) # Do not raise exception if a command is unsupported, following the openlineage-dbt processor: # https://github.com/OpenLineage/OpenLineage/blob/bdcaf828ebc117e0e5ffc5fab44ff8886eb7836b/integration/common/openlineage/common/provider/dbt/processor.py#L141 openlineage_processor.should_raise_on_unsupported_command = False diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 9d48510683..459b54ecc0 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1242,6 +1242,62 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo assert "Unable to parse OpenLineage events" in caplog.text +@patch("cosmos.operators.local.DbtLocalBaseOperator._handle_post_execution") +@patch("cosmos.operators.local.DbtLocalBaseOperator.handle_exception") +@patch("cosmos.operators.local.DbtLocalArtifactProcessor") +@patch("cosmos.operators.local.is_openlineage_common_available", True) +@patch("cosmos.config.ProfileConfig.ensure_profile") +@patch("cosmos.operators.local.DbtLocalBaseOperator.invoke_dbt") +@patch("cosmos.operators.local.DbtLocalBaseOperator._clone_project") +@patch("cosmos.operators.local.tempfile.TemporaryDirectory") +def test_run_command_passes_full_cmd_with_profiles_dir_to_openlineage_processor( + mock_tmp_dir, + mock_clone_project, + mock_invoke_dbt, + mock_ensure_profile, + mock_processor, + mock_handle_exception, + mock_handle_post_execution, + tmp_path, +): + """Tests that run_command forwards the full dbt CLI (including --profiles-dir) to DbtLocalArtifactProcessor.""" + profile_path = tmp_path / "profiles" / "profiles.yml" + mock_tmp_dir.return_value.__enter__.return_value = str(tmp_path) + mock_ensure_profile.return_value.__enter__.return_value = (profile_path, {}) + mock_invoke_dbt.return_value = MagicMock() + + mock_processor_instance = MagicMock() + mock_processor_instance.parse.return_value = MagicMock(completes=[]) + mock_processor.return_value = mock_processor_instance + + # Simulate DbtLocalArtifactProcessor.__init__ accepting dbt_command_line + mock_sig = MagicMock() + mock_sig.parameters = {"dbt_command_line": MagicMock()} + + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + emit_datasets=False, + install_deps=False, + ) + + with patch("cosmos.operators.local.inspect.signature", return_value=mock_sig): + operator.run_command( + cmd=["dbt", "cmd"], + env={}, + context={"run_id": "test_run_id", "task_instance": MagicMock()}, + ) + + mock_processor.assert_called_once() + call_kwargs = mock_processor.call_args.kwargs + assert "dbt_command_line" in call_kwargs + assert "--profiles-dir" in call_kwargs["dbt_command_line"] + # Verify the base command is also present (full_cmd = cmd + flags) + assert "dbt" in call_kwargs["dbt_command_line"] + assert "cmd" in call_kwargs["dbt_command_line"] + + @pytest.mark.parametrize( "operator_class,expected_template", [ From 476ffe85ee9a0ee4a8c88d0b39ac97e582920406 Mon Sep 17 00:00:00 2001 From: Ichiro Takami Date: Fri, 17 Apr 2026 09:57:56 +0100 Subject: [PATCH 4/4] add invocation_mode --- tests/operators/test_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 459b54ecc0..23647f8451 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1280,6 +1280,7 @@ def test_run_command_passes_full_cmd_with_profiles_dir_to_openlineage_processor( project_dir="my/dir", emit_datasets=False, install_deps=False, + invocation_mode=InvocationMode.SUBPROCESS, ) with patch("cosmos.operators.local.inspect.signature", return_value=mock_sig):