Skip to content

Fix ExecutionMode.AIRFLOW_ASYNC TaskGroup XCom issue#2088

Merged
tatiana merged 4 commits into
mainfrom
fix-async-xcom-taskgroup
Nov 7, 2025
Merged

Fix ExecutionMode.AIRFLOW_ASYNC TaskGroup XCom issue#2088
tatiana merged 4 commits into
mainfrom
fix-async-xcom-taskgroup

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Nov 7, 2025

When attempting to use ExecutionMode.AIRFLOW_ASYNC with TaskGroup in Cosmos 1.11.0, it started failing due to our handling of XCom. The main issue was that we were not setting the relative setup task ID in the consumer tasks, and they were trying to consume the XCom from a task that did not exist.

Example of DAG:

from airflow.models import DAG

try:
    from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
    from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"}
    ),
)

with DAG(
    dag_id="simple_dag_async_taskgroup",
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
):
    pre_dbt = EmptyOperator(task_id="pre_dbt")

    first_dbt_task_group = DbtTaskGroup(
        group_id="first_dbt_task_group",
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.AIRFLOW_ASYNC,
            async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"],
        ),
        render_config=RenderConfig(select=["*customers*"], exclude=["path:seeds"]),
        project_config=ProjectConfig(DBT_PROJECT_PATH),
        profile_config=profile_config,
        operator_args={
            "location": "US",
            "install_deps": True,
            "full_refresh": True,
        },
    )

    pre_dbt >> first_dbt_task_group
Image

Error:

Traceback (most recent call last):
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 416, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/cosmos/operators/_asynchronous/bigquery.py", line 187, in execute
    sql_query = self.get_sql_from_xcom(context)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/cosmos/operators/_asynchronous/bigquery.py", line 141, in get_sql_from_xcom
    compressed_b64_sql = base64.b64decode(compressed_b64_sql)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/base64.py", line 83, in b64decode
    s = _bytes_from_decode_data(s)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/base64.py", line 45, in _bytes_from_decode_data
    raise TypeError("argument should be a bytes-like object or ASCII "
TypeError: argument should be a bytes-like object or ASCII string, not 'NoneType'

Example of successful run after applying this PR:
Screenshot 2025-11-07 at 14 17 44

Closes: #2054

@tatiana tatiana added this to the Cosmos 1.11.1 milestone Nov 7, 2025
@tatiana tatiana marked this pull request as ready for review November 7, 2025 14:36
Copilot AI review requested due to automatic review settings November 7, 2025 14:36
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes an issue where ExecutionMode.AIRFLOW_ASYNC fails when used with TaskGroup in Cosmos due to incorrect XCom task ID references. The fix ensures consumer tasks correctly reference the setup task for XCom data retrieval by setting the relative producer task ID.

Key changes:

  • Sets producer_task_id attribute on consumer tasks to reference the correct setup task
  • Handles both individual tasks and TaskGroups when establishing XCom dependencies
  • Uses the dynamically set producer_task_id instead of hardcoded task ID for XCom retrieval

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description
dev/dags/simple_dag_async.py Adds example DAG demonstrating TaskGroup usage with AIRFLOW_ASYNC mode
cosmos/operators/_asynchronous/bigquery.py Adds producer_task_id attribute and uses it for XCom retrieval
cosmos/airflow/graph.py Sets producer_task_id on consumer tasks and handles TaskGroup children

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_asynchronous/bigquery.py Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented Nov 7, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.80%. Comparing base (a95c81b) to head (3bddd23).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2088   +/-   ##
=======================================
  Coverage   97.80%   97.80%           
=======================================
  Files          91       91           
  Lines        5871     5876    +5     
=======================================
+ Hits         5742     5747    +5     
  Misses        129      129           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@tatiana tatiana merged commit 3d2c992 into main Nov 7, 2025
81 checks passed
@tatiana tatiana deleted the fix-async-xcom-taskgroup branch November 7, 2025 16:18
tatiana added a commit that referenced this pull request Nov 7, 2025
When attempting to use `ExecutionMode.AIRFLOW_ASYNC` with `TaskGroup` in
Cosmos 1.11.0, it started failing due to our handling of XCom. The main
issue was that we were not setting the relative setup task ID in the
consumer tasks, and they were trying to consume the XCom from a task
that did not exist.

Example of DAG:
```
from airflow.models import DAG

try:
    from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
    from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"}
    ),
)

with DAG(
    dag_id="simple_dag_async_taskgroup",
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
):
    pre_dbt = EmptyOperator(task_id="pre_dbt")

    first_dbt_task_group = DbtTaskGroup(
        group_id="first_dbt_task_group",
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.AIRFLOW_ASYNC,
            async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"],
        ),
        render_config=RenderConfig(select=["*customers*"], exclude=["path:seeds"]),
        project_config=ProjectConfig(DBT_PROJECT_PATH),
        profile_config=profile_config,
        operator_args={
            "location": "US",
            "install_deps": True,
            "full_refresh": True,
        },
    )

    pre_dbt >> first_dbt_task_group
```

<img width="1624" height="1056" alt="Image"
src="https://github.com/user-attachments/assets/6e38464f-8fe6-4f6a-8ebc-762e8f3d65f4"
/>

Error:
```
Traceback (most recent call last):
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/venv-af31/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 416, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/cosmos/operators/_asynchronous/bigquery.py", line 187, in execute
    sql_query = self.get_sql_from_xcom(context)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/cosmos/operators/_asynchronous/bigquery.py", line 141, in get_sql_from_xcom
    compressed_b64_sql = base64.b64decode(compressed_b64_sql)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/base64.py", line 83, in b64decode
    s = _bytes_from_decode_data(s)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/base64.py", line 45, in _bytes_from_decode_data
    raise TypeError("argument should be a bytes-like object or ASCII "
TypeError: argument should be a bytes-like object or ASCII string, not 'NoneType'
```

Example of successful run after applying this PR:
<img width="1624" height="1056" alt="Screenshot 2025-11-07 at 14 17 44"
src="https://github.com/user-attachments/assets/9ae39c4d-0cbb-45c2-a895-4381bbcac815"
/>

Closes: #2054
tatiana added a commit that referenced this pull request Nov 12, 2025
Bug fixes

* Fix ``ExecutionMode.WATCHER`` deadlock in Airflow 3.0 & 3.1 by
@tatiana in #2087
* Fix ``ExecutionMode.AIRFLOW_ASYNC`` ``TaskGroup`` XCom issue by
@tatiana in #2088
* Guard watcher callback exceptions to avoid hanging producer tasks by
@pankajkoti in #2101
* Fix SQL templated field rendering for dynamically mapped tasks in
Airflow 2 by @tatiana in #2119

Enhancements

* Remove usage of contextmanager in plugins for accessing connections in
Airflow >= 3.1.2 by @pankajkoti in #2073

Docs

* Improve ``ExecutionMode.AIRFLOW_ASYNC`` docs by @tatiana in #2103
* Add note about experimenting threads count for the Watcher Execution
mode by @pankajkoti in #2083
* Fix minor documentation formatting issue by @dnskrv in #2098
* Correct example YAML key from ``operator_args`` to ``operator_kwargs``
by @jx2lee in #2091

Others

* Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3
by @pankajkoti in #2076
* pre-commit autoupdate in #2078, #2104

related:
astronomer/oss-integrations-private#272
@tatiana tatiana mentioned this pull request Nov 12, 2025
tatiana added a commit that referenced this pull request Nov 13, 2025
Bug fixes

* Fix ``ExecutionMode.WATCHER`` deadlock in Airflow 3.0 & 3.1 by
@tatiana in #2087
* Fix ``ExecutionMode.AIRFLOW_ASYNC`` ``TaskGroup`` XCom issue by
@tatiana in #2088
* Guard watcher callback exceptions to avoid hanging producer tasks by
@pankajkoti in #2101
* Fix SQL templated field rendering for dynamically mapped tasks in
Airflow 2 by @tatiana in #2119

Enhancements

* Remove usage of contextmanager in plugins for accessing connections in
Airflow >= 3.1.2 by @pankajkoti in #2073

Docs

* Improve ``ExecutionMode.AIRFLOW_ASYNC`` docs by @tatiana in #2103
* Add note about experimenting threads count for the Watcher Execution
mode by @pankajkoti in #2083
* Fix minor documentation formatting issue by @dnskrv in #2098
* Correct example YAML key from ``operator_args`` to ``operator_kwargs``
by @jx2lee in #2091

Others

* Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3
by @pankajkoti in #2076
* pre-commit autoupdate in #2078, #2104

Closes: astronomer/oss-integrations-private#272
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Make ExecutionMode.AIRFLOW_ASYNC work with TaskGroups

3 participants