Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ def run_command( # noqa: C901
return result

def calculate_openlineage_events_completes(
self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path, dbt_command_line: list[str]
self,
env: dict[str, str | os.PathLike[Any] | bytes],
project_dir: Path,
dbt_command_line: list[str] | None = None,
) -> None:
"""
Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt
Expand All @@ -728,14 +731,17 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

openlineage_processor = DbtLocalArtifactProcessor(
processor_kwargs: dict[str, Any] = dict(
producer=OPENLINEAGE_PRODUCER,
job_namespace=settings.LINEAGE_NAMESPACE,
project_dir=project_dir,
profile_name=self.profile_config.profile_name,
target=self.profile_config.target_name,
dbt_command_line=dbt_command_line,
)
sig = inspect.signature(DbtLocalArtifactProcessor.__init__)
if "dbt_command_line" in sig.parameters and dbt_command_line is not None:
processor_kwargs["dbt_command_line"] = dbt_command_line
openlineage_processor = DbtLocalArtifactProcessor(**processor_kwargs)
# Do not raise exception if a command is unsupported, following the openlineage-dbt processor:
# https://github.com/OpenLineage/OpenLineage/blob/bdcaf828ebc117e0e5ffc5fab44ff8886eb7836b/integration/common/openlineage/common/provider/dbt/processor.py#L141
openlineage_processor.should_raise_on_unsupported_command = False
Expand Down
56 changes: 56 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,62 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo
assert "Unable to parse OpenLineage events" in caplog.text


@patch("cosmos.operators.local.DbtLocalBaseOperator._handle_post_execution")
@patch("cosmos.operators.local.DbtLocalBaseOperator.handle_exception")
@patch("cosmos.operators.local.DbtLocalArtifactProcessor")
@patch("cosmos.operators.local.is_openlineage_common_available", True)
@patch("cosmos.config.ProfileConfig.ensure_profile")
@patch("cosmos.operators.local.DbtLocalBaseOperator.invoke_dbt")
@patch("cosmos.operators.local.DbtLocalBaseOperator._clone_project")
@patch("cosmos.operators.local.tempfile.TemporaryDirectory")
def test_run_command_passes_full_cmd_with_profiles_dir_to_openlineage_processor(
mock_tmp_dir,
mock_clone_project,
mock_invoke_dbt,
mock_ensure_profile,
mock_processor,
mock_handle_exception,
mock_handle_post_execution,
tmp_path,
):
"""Tests that run_command forwards the full dbt CLI (including --profiles-dir) to DbtLocalArtifactProcessor."""
profile_path = tmp_path / "profiles" / "profiles.yml"
mock_tmp_dir.return_value.__enter__.return_value = str(tmp_path)
mock_ensure_profile.return_value.__enter__.return_value = (profile_path, {})
mock_invoke_dbt.return_value = MagicMock()

mock_processor_instance = MagicMock()
mock_processor_instance.parse.return_value = MagicMock(completes=[])
mock_processor.return_value = mock_processor_instance

# Simulate DbtLocalArtifactProcessor.__init__ accepting dbt_command_line
mock_sig = MagicMock()
mock_sig.parameters = {"dbt_command_line": MagicMock()}

operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
emit_datasets=False,
install_deps=False,
)

with patch("cosmos.operators.local.inspect.signature", return_value=mock_sig):
operator.run_command(
cmd=["dbt", "cmd"],
env={},
context={"run_id": "test_run_id", "task_instance": MagicMock()},
)

mock_processor.assert_called_once()
call_kwargs = mock_processor.call_args.kwargs
assert "dbt_command_line" in call_kwargs
assert "--profiles-dir" in call_kwargs["dbt_command_line"]
# Verify the base command is also present (full_cmd = cmd + flags)
assert "dbt" in call_kwargs["dbt_command_line"]
assert "cmd" in call_kwargs["dbt_command_line"]


@pytest.mark.parametrize(
"operator_class,expected_template",
[
Expand Down