Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ def run_command( # noqa: C901
self.handle_exception(result)
return result
if is_openlineage_common_available:
self.calculate_openlineage_events_completes(env, tmp_dir_path)
self.calculate_openlineage_events_completes(env, tmp_dir_path, full_cmd)
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
# Airflow 3 does not support associating 'openlineage_events_completes' with task_instance,
# in that case we're storing as self.openlineage_events_completes
Expand All @@ -711,7 +711,10 @@ def run_command( # noqa: C901
return result

def calculate_openlineage_events_completes(
self, env: dict[str, str | os.PathLike[Any] | bytes], project_dir: Path
self,
env: dict[str, str | os.PathLike[Any] | bytes],
project_dir: Path,
dbt_command_line: list[str] | None = None,
) -> None:
Comment thread
ichirotakami marked this conversation as resolved.
"""
Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt
Expand All @@ -728,13 +731,17 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

openlineage_processor = DbtLocalArtifactProcessor(
processor_kwargs: dict[str, Any] = dict(
producer=OPENLINEAGE_PRODUCER,
job_namespace=settings.LINEAGE_NAMESPACE,
project_dir=project_dir,
profile_name=self.profile_config.profile_name,
target=self.profile_config.target_name,
)
sig = inspect.signature(DbtLocalArtifactProcessor.__init__)
if "dbt_command_line" in sig.parameters and dbt_command_line is not None:
processor_kwargs["dbt_command_line"] = dbt_command_line
openlineage_processor = DbtLocalArtifactProcessor(**processor_kwargs)
# Do not raise exception if a command is unsupported, following the openlineage-dbt processor:
# https://github.com/OpenLineage/OpenLineage/blob/bdcaf828ebc117e0e5ffc5fab44ff8886eb7836b/integration/common/openlineage/common/provider/dbt/processor.py#L141
openlineage_processor.should_raise_on_unsupported_command = False
Expand Down
61 changes: 60 additions & 1 deletion tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,12 +1234,71 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo
should_store_compiled_sql=False,
)

dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR)
dbt_base_operator.calculate_openlineage_events_completes(
env={}, project_dir=DBT_PROJ_DIR, dbt_command_line=dbt_base_operator.base_cmd + dbt_base_operator.dbt_cmd_flags
)
Comment thread
ichirotakami marked this conversation as resolved.

assert instance.parse.called
assert "Unable to parse OpenLineage events" in caplog.text


@patch("cosmos.operators.local.DbtLocalBaseOperator._handle_post_execution")
@patch("cosmos.operators.local.DbtLocalBaseOperator.handle_exception")
@patch("cosmos.operators.local.DbtLocalArtifactProcessor")
@patch("cosmos.operators.local.is_openlineage_common_available", True)
@patch("cosmos.config.ProfileConfig.ensure_profile")
@patch("cosmos.operators.local.DbtLocalBaseOperator.invoke_dbt")
@patch("cosmos.operators.local.DbtLocalBaseOperator._clone_project")
@patch("cosmos.operators.local.tempfile.TemporaryDirectory")
def test_run_command_passes_full_cmd_with_profiles_dir_to_openlineage_processor(
mock_tmp_dir,
mock_clone_project,
mock_invoke_dbt,
mock_ensure_profile,
mock_processor,
mock_handle_exception,
mock_handle_post_execution,
tmp_path,
):
"""Tests that run_command forwards the full dbt CLI (including --profiles-dir) to DbtLocalArtifactProcessor."""
profile_path = tmp_path / "profiles" / "profiles.yml"
mock_tmp_dir.return_value.__enter__.return_value = str(tmp_path)
mock_ensure_profile.return_value.__enter__.return_value = (profile_path, {})
mock_invoke_dbt.return_value = MagicMock()

mock_processor_instance = MagicMock()
mock_processor_instance.parse.return_value = MagicMock(completes=[])
mock_processor.return_value = mock_processor_instance

# Simulate DbtLocalArtifactProcessor.__init__ accepting dbt_command_line
mock_sig = MagicMock()
mock_sig.parameters = {"dbt_command_line": MagicMock()}

operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
emit_datasets=False,
install_deps=False,
invocation_mode=InvocationMode.SUBPROCESS,
)

with patch("cosmos.operators.local.inspect.signature", return_value=mock_sig):
operator.run_command(
cmd=["dbt", "cmd"],
env={},
context={"run_id": "test_run_id", "task_instance": MagicMock()},
)

mock_processor.assert_called_once()
call_kwargs = mock_processor.call_args.kwargs
assert "dbt_command_line" in call_kwargs
assert "--profiles-dir" in call_kwargs["dbt_command_line"]
# Verify the base command is also present (full_cmd = cmd + flags)
assert "dbt" in call_kwargs["dbt_command_line"]
assert "cmd" in call_kwargs["dbt_command_line"]


@pytest.mark.parametrize(
"operator_class,expected_template",
[
Expand Down
Loading