Enable inlets and outlets using DBT Fusion on Airflow 3 #2561
Pull request overview
This PR fixes missing Airflow 3 inlets/outlets when running dbt-fusion via DbtRunOperationLocalOperator by ensuring OpenLineage’s DbtLocalArtifactProcessor can locate --profiles-dir (since dbt-fusion doesn’t write profiles_dir into run_results.json).
Changes:
- Pass the full dbt CLI command (`full_cmd`) into `calculate_openlineage_events_completes` and forward it as `dbt_command_line` to `DbtLocalArtifactProcessor`.
- Update the unit test that calls `calculate_openlineage_events_completes` to match the new method signature.
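The forwarding described in the list above can be sketched roughly as follows. This is a simplified illustration, not the Cosmos implementation: `FakeArtifactProcessor` is a hypothetical stand-in for `DbtLocalArtifactProcessor`, and the function returns the processor only so the example can be inspected (the real method returns `None`). The default of `None` keeps existing callers working.

```python
from __future__ import annotations

from pathlib import Path


class FakeArtifactProcessor:
    """Hypothetical stand-in for DbtLocalArtifactProcessor (not the real class)."""

    def __init__(self, project_dir: Path, dbt_command_line: list[str] | None = None) -> None:
        self.project_dir = project_dir
        # Copy the list so later changes made by the caller do not leak in.
        self.dbt_command_line = list(dbt_command_line or [])


def calculate_openlineage_events_completes(
    project_dir: Path,
    dbt_command_line: list[str] | None = None,
) -> FakeArtifactProcessor:
    """Simplified sketch: forward the dbt command line to the processor."""
    return FakeArtifactProcessor(project_dir=project_dir, dbt_command_line=dbt_command_line)


# Assumed example command; the paths are placeholders.
full_cmd = ["dbt", "run", "--profiles-dir", "/tmp/profiles"]
processor = calculate_openlineage_events_completes(Path("/tmp/project"), dbt_command_line=full_cmd)

# Callers that do not pass the new argument still work.
legacy = calculate_openlineage_events_completes(Path("/tmp/project"))
```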
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `cosmos/operators/local.py` | Threads the dbt command line into the OpenLineage artifact processor so profiles discovery works with dbt-fusion on Airflow 3. |
| `tests/operators/test_local.py` | Updates a unit test invocation to include the new `dbt_command_line` argument. |
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@           Coverage Diff           @@
##             main    #2561   +/-   ##
=======================================
  Coverage   98.04%   98.05%
=======================================
  Files         103      103
  Lines        7589     7593    +4
=======================================
+ Hits         7441     7445    +4
  Misses        148      148
tatiana
left a comment
Hi @ichirotakami, thank you very much for fixing this issue!
Could you please address the feedback in these comments:
#2561 (comment)
#2561 (comment)
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.
Hi @tatiana, thank you for your response. I have attempted to address the feedback with a new commit. Please let me know if you think I need to change something.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
cosmos/operators/local.py:727
- The `calculate_openlineage_events_completes` docstring is now missing any mention of the new optional `dbt_command_line` parameter, and it still states it "Return[s] a list of RunEvents" even though the method returns `None` and mutates `self.openlineage_events_completes`. Please update the docstring to reflect the current behavior and document how `dbt_command_line` is used (e.g., to help OpenLineage locate `--profiles-dir` for dbt-fusion runs).
def calculate_openlineage_events_completes(
    self,
    env: dict[str, str | os.PathLike[Any] | bytes],
    project_dir: Path,
    dbt_command_line: list[str] | None = None,
) -> None:
    """
    Use openlineage-integration-common to extract lineage events from the artifacts generated after running the dbt
    command. Relies on the following files:
    * profiles
    * {project_dir}/target/manifest.json
    * {project_dir}/target/run_results.json
    Return a list of RunEvents
    """
tatiana
left a comment
Thank you very much, @ichirotakami, it looks very good. The context you gave in the issue was also amazing; it really helped in understanding the underlying issue.
Bug Fixes
* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559
* Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by @pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577

Docs
* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604

Others
* Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

closes: astronomer/oss-integrations-private#381
Description
This PR enables inlets and outlets on Airflow 3 with dbt Fusion when using `DbtRunOperationLocalOperator`. Because dbt-fusion does not write `profiles_dir` into run_results.json, the PR passes `dbt_command_line` to `calculate_openlineage_events_completes`, which lets `DbtLocalArtifactProcessor()` find `--profiles-dir` from the command line instead.
Please see #2560 for more details about the issue.
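To illustrate why having the command line available helps, here is a minimal sketch of recovering `--profiles-dir` from a dbt argument list. `find_profiles_dir` is a hypothetical helper written for this example; it is not the OpenLineage implementation, and it assumes the flag appears either as `--profiles-dir PATH` or as `--profiles-dir=PATH`.

```python
from __future__ import annotations

from pathlib import Path


def find_profiles_dir(dbt_command_line: list[str] | None) -> Path | None:
    """Hypothetical helper: return the --profiles-dir value, or None if absent."""
    if not dbt_command_line:
        return None
    for index, token in enumerate(dbt_command_line):
        # Form 1: "--profiles-dir=/some/path"
        if token.startswith("--profiles-dir="):
            return Path(token.split("=", 1)[1])
        # Form 2: "--profiles-dir /some/path" (value is the next token)
        if token == "--profiles-dir" and index + 1 < len(dbt_command_line):
            return Path(dbt_command_line[index + 1])
    return None
```

When the flag is missing, the helper returns `None`, which mirrors the situation described above where nothing in run_results.json points at the profiles directory.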
Related Issue(s)
closes: #2560
Breaking Change?
Checklist