Skip to content

Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task#2471

Merged
tatiana merged 8 commits into
mainfrom
only_track_relevant_event
Mar 19, 2026
Merged

Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task#2471
tatiana merged 8 commits into
mainfrom
only_track_relevant_event

Conversation

@pankajastro
Copy link
Copy Markdown
Contributor

@pankajastro pankajastro commented Mar 17, 2026

This PR aims to reduce XCom load in the watcher flow by limiting which dbt log/events are persisted for consumer-side logging and by removing the per-event XCom read used for deduplication.

Changes:

  • Introduces an allowlist of dbt event names and only pushes per-node dbt events to XCom when the event is allowlisted.
  • Removes the “skip duplicate event” XCom read-before-write behavior.

@pankajastro pankajastro changed the title Fix: Reduce the xcom read-write for track node state and error in cos… Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task Mar 17, 2026
@pankajastro pankajastro marked this pull request as ready for review March 17, 2026 16:04
Copilot AI review requested due to automatic review settings March 17, 2026 16:04
@pankajastro pankajastro requested review from a team and corsettigyg as code owners March 17, 2026 16:04
@pankajastro pankajastro marked this pull request as draft March 17, 2026 16:04
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to reduce XCom load in the watcher flow by limiting which dbt log/events are persisted for consumer-side logging and by removing the per-event XCom read used for deduplication.

Changes:

  • Introduces an allowlist of dbt event names and only pushes per-node dbt events to XCom when the event is allowlisted.
  • Removes the “skip duplicate event” XCom read-before-write behavior.
  • Updates unit tests for _process_dbt_log_event to align with allowlist-based behavior and to skip pushes when unique_id is missing.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
cosmos/operators/_watcher/base.py Adds _DBT_EVENTS_TYPE allowlist filtering and removes dedup XCom reads in _process_dbt_log_event.
tests/operators/_watcher/test_watcher_base.py Updates tests to validate allowlist-based pushing and skipping when unique_id is absent.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread tests/operators/_watcher/test_watcher_base.py
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 17, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.04%. Comparing base (c137be7) to head (3b7a9f1).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2471   +/-   ##
=======================================
  Coverage   98.04%   98.04%           
=======================================
  Files         103      103           
  Lines        7223     7223           
=======================================
  Hits         7082     7082           
  Misses        141      141           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pankajastro pankajastro added this to the Cosmos 1.14.0 milestone Mar 17, 2026
@pankajastro pankajastro self-assigned this Mar 17, 2026
@pankajastro pankajastro marked this pull request as ready for review March 17, 2026 17:53
Copilot AI review requested due to automatic review settings March 17, 2026 17:53
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Updates watcher log processing to only emit per-node dbt events for a fixed allowlist of dbt event types, and aligns unit tests with the new filtering behavior.

Changes:

  • Add _DBT_EVENTS_TYPE allowlist and gate _process_dbt_log_event pushes on info.name.
  • Remove prior “sensitive words” logic and the duplicate-event XCom suppression.
  • Update watcher and subprocess tests to validate allowlist behavior and revised XCom push counts.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
cosmos/operators/_watcher/base.py Adds dbt event allowlist filtering for XCom-pushed log events; removes prior duplicate-event suppression.
tests/operators/_watcher/test_watcher_base.py Updates unit tests to assert allowlist-based XCom pushing and no-push when unique_id is missing.
tests/hooks/test_subprocess.py Adjusts subprocess log parsing tests to expect only the status XCom push in terminal states.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is looking good to me, but I have some questions in line for my understanding and curiosity. Are we confident that with a higher number of threads, it won't cause an issue?

I am happy to get this in once the above question and in-line questions are answered, and if we get another pair of eyes for review here.

Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py Outdated
Copilot AI review requested due to automatic review settings March 18, 2026 16:58
@pankajastro pankajastro force-pushed the only_track_relevant_event branch from 21c6769 to 947b048 Compare March 18, 2026 16:58
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR tightens dbt watcher log handling by introducing an explicit allowlist of dbt event types to decide which dbt log events are pushed into XCom, and updates unit tests to match the new behavior.

Changes:

  • Add allowlisted dbt event type sets (error/failure + node status) and filter _process_dbt_log_event pushes accordingly.
  • Update watcher-base tests to validate allowlist behavior and skip pushing when unique_id is missing.
  • Update subprocess watcher tests to reflect the reduced number of XCom pushes per processed log line.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
cosmos/operators/_watcher/base.py Introduces dbt event allowlists and filters which log events get pushed to XCom.
tests/operators/_watcher/test_watcher_base.py Updates tests to validate allowlist behavior and unique_id guard.
tests/hooks/test_subprocess.py Adjusts expectations for XCom push count based on updated event filtering.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/_watcher/test_watcher_base.py Outdated
Comment thread cosmos/operators/_watcher/base.py
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings March 18, 2026 17:12
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request tightens dbt watcher log-event handling by introducing an explicit allowlist of dbt event types to persist to XCom, and updates unit tests to reflect the new filtering behavior.

Changes:

  • Add dbt event-type allowlist (error/failure events + node lifecycle events) and filter _process_dbt_log_event() pushes accordingly.
  • Remove XCom deduplication logic for dbt events and adjust watcher-base tests.
  • Update subprocess hook tests to align with the revised XCom push behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
cosmos/operators/_watcher/base.py Introduces _DBT_EVENT_ALLOWLIST and filters dbt events before pushing to XCom.
tests/operators/_watcher/test_watcher_base.py Updates tests to validate allowlist-based event pushing and new skip conditions.
tests/hooks/test_subprocess.py Adjusts expectations around safe_xcom_push call counts/behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/_watcher/test_watcher_base.py Outdated
Comment thread cosmos/operators/_watcher/base.py Outdated
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings March 18, 2026 17:19
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refines WATCHER producer-side dbt log handling by switching from message substring matching to an explicit dbt event-name allowlist, and updates tests to reflect the new behavior.

Changes:

  • Introduces _DBT_EVENT_ALLOWLIST and filters _process_dbt_log_event() to only push selected dbt events to XCom.
  • Removes the prior “sensitive words” logic and the XCom de-duplication check in _process_dbt_log_event().
  • Updates watcher/subprocess tests to validate allowlisted event pushing and revised XCom push expectations.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
cosmos/operators/_watcher/base.py Adds allowlist-based filtering for dbt events before pushing _dbt_event XCom payloads.
tests/operators/_watcher/test_watcher_base.py Updates unit tests to assert allowlist-driven _dbt_event pushing and skipping when unique_id is missing.
tests/hooks/test_subprocess.py Updates subprocess log parsing test expectations for XCom pushes.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/_watcher/base.py
Comment thread tests/hooks/test_subprocess.py
@tatiana tatiana merged commit c13cc16 into main Mar 19, 2026
75 checks passed
@tatiana tatiana deleted the only_track_relevant_event branch March 19, 2026 11:05
@pankajastro pankajastro mentioned this pull request Mar 20, 2026
pankajastro added a commit that referenced this pull request Apr 7, 2026
1.14.0 (2026-04-07)
---------------------

Breaking Changes

* Drop support for Airflow versions earlier than **2.9** by
@jedcunningham in #2288
* Fix inclusion of package models and selection/exclusion behavior by
@pankajkoti in #2357
* ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now
a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a
plain string. Any custom code that reads these internal XCom keys
directly will need to be updated by @pankajkoti in #2507

Features

* Add cluster policy support for ``ExecutionMode.WATCHER`` sensor
retries by @astro-anand in #2293
* Add debug mode to track memory utilization by @tatiana in #2327
* Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by
@pankajastro in #2375
* Introduce interceptors for Cosmos tasks by @tatiana in #2419
* Add config to allow disabling dag versioning by @pankajkoti in #2470
* Implement TaskGroups by models folder by @maximilianoarcieri and
@tatiana in #1566, #2469, and #2420
* feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447
* Add source freshness aware execution for ``ExecutionMode.WATCHER`` by
@pankajastro and @tatiana in #2467

* Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and
its interface and implementation can change in the future.
* Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472

Enhancements

* Add watcher mode support for dbt test node states by @michal-mrazek in
#2318
* Rename watcher-mode sensor retry queue and reuse it for producer tasks
by @pankajastro in #2331
* Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters
by @pankajkoti in #2335
* Improve dbt Fusion support and related tests by @tatiana in #2356
* Default Snowflake profile mappings to four threads by @tatiana in
#2374
* Attempt to remove Pydantic as a dependency by @tatiana in #2377
* Log dbt-core and adapter versions in watcher consumer tasks by
@pankajastro in #2412
* Log model errors in watcher consumer on dbt node failure by
@pankajastro in #2431
* Reduce XCom read/write for tracking node state and errors in
ConsumerWatcher task by @pankajastro in #2471
* Remove duplicate debug log in watcher subprocess path by @tatiana in
#2494
* Simplify and unify WATCHER implementation regardless of InvocationMode
by @tatiana in #2498
* Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531

Bug Fixes

* Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and
``RenderConfig.selector`` by @YourRoyalLinus in #2316
* Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in
``ExecutionMode.WATCHER`` by @pankajkoti in #2319
* Fix select/exclude type mismatch by @tatiana in #2364
* Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro
in #2365
* Set correct queue priority for watcher producer tasks by @pankajastro
in #2372
* Preserve ``extra_context`` for watcher consumer task instances by
@pankajkoti in #2381
* Respect ``deferrable=False`` from ``operator_args`` on watcher
consumer sensors by @pankajkoti in #2384
* Fix watcher queue precedence and add documentation by @pankajastro in
#2391
* Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by
@pankajkoti in #2440
* Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param
by @pankajkoti in #2466
* Remove timeout override from Cosmos watcher sensors by @tatiana and
@claude in #2478
* Remove forced ``retries=0`` from watcher producer operators by
@tatiana in #2479
* RFC: Add patch for newer versions of amazon provider when running dbt
on EKS by @aoelvp94 in #2481
* Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor
tasks by @tatiana in #2503
* Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude
ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in
#2511
* Move dataset emission for ``ExecutionMode.WATCHER`` from producer to
consumer sensors by @pankajkoti in #2507

Docs

* Document cluster policy configuration for ``ExecutionMode.WATCHER``
sensor tasks by @pankajastro in #2315
* Remove outdated docs for the dbt docs plugin with Airflow 3 by
@pankajastro in #2353
* Make Watcher DBT Execution Queue heading clickable by @pankajastro in
#2354
* Update ``ExecutionMode.WATCHER`` documentation regarding test node
implementation by @jroachgolf84 in #2355
* Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in
#2369
* Add documentation for including/excluding nodes based on FQN by
@pankajastro in #2371
* Update watcher execution mode documentation by @tatiana in #2380
* Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in
#2383
* Fix miscellaneous Sphinx warnings by @pankajastro in #2395
* Improve contributing documentation by @lzdanski in #2397
* Add **Get Started in 5 Minutes** guide by @lzdanski in #2398
* Add Sphinx redirects package for documentation redirects by @lzdanski
in #2407
* Restructure **Getting Started** and **Guides** sections by @lzdanski
in #2418
* Add open-source quickstart by @lzdanski in #2439
* Fix documentation redirects by @lzdanski in #2442
* Restructure and refactor reference documentation by @lzdanski in #2443
* Add execution modes decision documentation by @lzdanski in #2444
* Add **Core Concepts** page to Getting Started by @lzdanski in #2448
* Add guide: *How Cosmos Works* by @lzdanski in #2449
* Update **Getting Started** overview and index pages by @lzdanski in
#2452
* Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453
* Fix miscellaneous documentation links by @lzdanski in #2454
* Add Mermaid diagrams and execution mode diagrams by @lzdanski and
@tatiana in #2459
* Add documentation for memory optimization options by @pankajastro in
#2340
* Fix typo in watcher execution mode docs by @evanvolgas in #2485
* Fix minor documentation issues by @evanvolgas in #2489
* Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER
by @tatiana in #2491
* docs: unify RST header styles across documentation by @jigangz in
#2473
* docs: fix env var for rich logging by @vricciardulli in #2514
* docs: update dbt project path example for Airflow 3 Astro
compatibility by @yeoreums in #2512
* Document missing Cosmos Airflow config settings in cosmos-conf.rst by
@tatiana in #2515
* Split security-privacy policy doc and add dependency cooldown by
@pankajkoti in #2519
* Add performance optimization and troubleshooting docs by @pankajkoti
in #2521
* Update copyright year to 2026 by @tayloramurphy in #2527
* docs: Updating "Project Policies" to "Policies" in menu bar by
@jroachgolf84 in #2526

Others

* Fix tests after removing support for Airflow versions earlier than 2.9
by @tatiana in #2321
* Enable listener tests for Airflow 3.1 by @pankajastro in #2348
* Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in
integration tests by @pankajkoti in #2352
* Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana
in #2359
* Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv
in #2360
* Improve PyPI tagging by @tatiana in #2363
* Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by
@tatiana in #2373
* Fix Zizmor check by @tatiana in #2376
* Remove ``methodtools`` dependency by @tatiana in #2378
* Improve comments on #2389 by @evanvolgas in #2394
* Refactor ``load_from_dbt_manifest`` to reduce code complexity by
@pankajkoti in #2399
* Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity
by @pankajkoti in #2400
* Improve issue templates by @tatiana in #2401
* Avoid running tests when only docs change by @tatiana in #2402
* Add ``no-reload`` target for serving docs locally by @pankajkoti in
#2405
* Fix test hash checks on macOS by @tatiana in #2406
* Attempt deterministic dbt project copy in test fixtures by @pankajkoti
in #2409
* Pin ``virtualenv <21`` due to hatch incompatibility in CI by
@pankajkoti in #2410
* Revert virtualenv pin for hatch installation in CI by @pankajkoti in
#2426
* Add version comments for commit SHA pinned GitHub Actions by
@pankajkoti in #2436
* Fix ``hatch run docs:build`` issues by @tatiana in #2437
* Minor code improvements by @dnskr in #2446
* Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451,
#2468, #2495, and #2516
* Add file to support Claude understanding the Cosmos repository by
@tatiana in #2458
* Dependency updates by @dependabot in #2368, #2425, #2435, #2465,
#2475, #2504, #2518, and #2528
* Isolate Scarf telemetry integration test into its own CI job by
@pankajkoti and @claude in #2477
* ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums
in #2506
* Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509
* Extend skipping tests in CI for more non-code file changes by
@pankajkoti in #2510
* Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti
in #2517
* Enforce zero warnings policy for documentation by @dnskr in #2513

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
@tatiana tatiana added the roadmap:P1 BOSS roadmap-committed work (priority P1) label Jun 6, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

roadmap:P1 BOSS roadmap-committed work (priority P1)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants