
Fix ExecutionMode.WATCHER deadlock in Airflow 3.0 & 3.1 #2087

Merged
tatiana merged 2 commits into main from fix-deadlock on Nov 7, 2025

Collaborator

@tatiana tatiana commented Nov 6, 2025

Airflow 3.0 and 3.1 are currently not thread-safe when running xcom_push, which leads to deadlocks.

The problem description and troubleshooting are detailed in ticket #2057.

Unfortunately, since this issue is not deterministic, I was unable to create an automated test to reproduce and prevent regressions. That said, given that the producer node hangs indefinitely until the task times out, and given that we were no longer able to reproduce the issue after the current changes, I firmly believe we should proceed with this change.

Closes: #2057

Co-authored-by: Ash Berlin-Taylor <ash@astronomer.io>

@tatiana tatiana marked this pull request as ready for review November 6, 2025 13:31
Copilot AI review requested due to automatic review settings November 6, 2025 13:31
Contributor

Copilot AI left a comment

Pull Request Overview

This PR addresses a thread-safety issue in Airflow 3.0 and 3.1 where xcom_push operations can cause deadlocks when running in ExecutionMode.WATCHER mode with multi-threaded dbt operations.

Key Changes:

  • Introduces a thread-safe wrapper function safe_xcom_push that uses a lock to serialize XCom operations
  • Replaces all direct xcom_push calls in the watcher operator with the thread-safe wrapper
  • Adds necessary imports for threading and Airflow 3.x task instance types

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/watcher.py
@codecov

codecov Bot commented Nov 6, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.80%. Comparing base (a95c81b) to head (b8e9de1).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2087   +/-   ##
=======================================
  Coverage   97.80%   97.80%           
=======================================
  Files          91       91           
  Lines        5871     5873    +2     
=======================================
+ Hits         5742     5744    +2     
  Misses        129      129           

Comment thread cosmos/operators/watcher.py Outdated
@tatiana tatiana added this to the Cosmos 1.11.1 milestone Nov 6, 2025
Contributor

@pankajastro pankajastro left a comment

Looks good! It might be nice to add a regression test to make sure this issue doesn’t come back in the future.

@tatiana
Collaborator Author

tatiana commented Nov 7, 2025

> Looks good! It would be beneficial to add a regression test to ensure this issue doesn’t recur in the future.

I fully agree, @pankajastro. I discussed this with @ashb, but since the deadlock only happens intermittently, and I couldn't reproduce it with Airflow dag.test() or airflow dags test, I couldn't find an easy way to accomplish this. If we test against Astro in the future, we could run the tests multiple times and have a way to prevent this from regressing.
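
One partial option, sketched below, is a unit-level check that the wrapper never lets two pushes overlap. It does not reproduce the Airflow deadlock itself; it would only catch the lock being removed by accident. Everything here except the `safe_xcom_push` signature is hypothetical: the lock name `_xcom_lock`, the `OverlapDetectingTaskInstance` stub, and the `run_concurrent_pushes` helper.

```python
import threading
import time
from typing import Any

_xcom_lock = threading.Lock()  # assumed name for the shared lock


def safe_xcom_push(task_instance: Any, key: str, value: Any) -> None:
    """Stand-in for the wrapper under test: one push at a time."""
    with _xcom_lock:
        task_instance.xcom_push(key=key, value=value)


class OverlapDetectingTaskInstance:
    """Hypothetical stub that records how many pushes run at the same time."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._active = 0
        self.max_active = 0
        self.calls = 0

    def xcom_push(self, key: str, value: Any) -> None:
        with self._guard:
            self._active += 1
            self.max_active = max(self.max_active, self._active)
        time.sleep(0.001)  # widen the window in which an overlap could occur
        with self._guard:
            self._active -= 1
            self.calls += 1


def run_concurrent_pushes(n_threads: int = 16) -> OverlapDetectingTaskInstance:
    """Push from many threads at once and return the stub for inspection."""
    ti = OverlapDetectingTaskInstance()
    threads = [
        threading.Thread(target=safe_xcom_push, args=(ti, f"key_{i}", i))
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return ti


ti = run_concurrent_pushes()
```

A test would then assert that `ti.max_active` is 1 and that `ti.calls` equals the number of threads.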

@tatiana tatiana merged commit 137350a into main Nov 7, 2025
81 checks passed
@tatiana tatiana deleted the fix-deadlock branch November 7, 2025 15:21

def safe_xcom_push(task_instance: TaskInstance, key: str, value: Any) -> None:
    """
    Safely set an XCom value in a thread-safe manner in Airflow 3.0 and 3.1.
Contributor

Do we need to mention specific Airflow versions here? We are not altering the behavior based on the Airflow version.

Collaborator Author

I think we should mention that we're using this because of the bug in these versions, but it is also harmless to use with other versions. It simplifies our codebase not to have to handle this differently based on the Airflow version. If Airflow fixes this, we can then add the version check.

tatiana added a commit that referenced this pull request Nov 7, 2025
@tatiana tatiana mentioned this pull request Nov 7, 2025
tatiana added a commit that referenced this pull request Nov 12, 2025
Bug fixes

* Fix ``ExecutionMode.WATCHER`` deadlock in Airflow 3.0 & 3.1 by
@tatiana in #2087
* Fix ``ExecutionMode.AIRFLOW_ASYNC`` ``TaskGroup`` XCom issue by
@tatiana in #2088
* Guard watcher callback exceptions to avoid hanging producer tasks by
@pankajkoti in #2101
* Fix SQL templated field rendering for dynamically mapped tasks in
Airflow 2 by @tatiana in #2119

Enhancements

* Remove usage of contextmanager in plugins for accessing connections in
Airflow >= 3.1.2 by @pankajkoti in #2073

Docs

* Improve ``ExecutionMode.AIRFLOW_ASYNC`` docs by @tatiana in #2103
* Add note about experimenting threads count for the Watcher Execution
mode by @pankajkoti in #2083
* Fix minor documentation formatting issue by @dnskrv in #2098
* Correct example YAML key from ``operator_args`` to ``operator_kwargs``
by @jx2lee in #2091

Others

* Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3
by @pankajkoti in #2076
* pre-commit autoupdate in #2078, #2104

related:
astronomer/oss-integrations-private#272
@tatiana tatiana mentioned this pull request Nov 12, 2025
tatiana added a commit that referenced this pull request Nov 13, 2025
Closes: astronomer/oss-integrations-private#272
Development

Successfully merging this pull request may close these issues.

[Bug] dbt Command hanging in the producer task with ExecutionMode.WATCHER

4 participants