Skip to content

Emit asset events in async execution mode#2184

Merged
tatiana merged 25 commits into
mainfrom
issue-2141
Dec 18, 2025
Merged

Emit asset events in async execution mode#2184
tatiana merged 25 commits into
mainfrom
issue-2141

Conversation

@pankajastro
Copy link
Copy Markdown
Contributor

@pankajastro pankajastro commented Dec 5, 2025

closes: #2141

This PR implements asset event emission for BigQuery async execution mode, addressing issue #2141. The changes enable Airflow to track dataset lineage when using async operators by registering output datasets after task execution.

Key changes:

  • Added asset/dataset event registration in execute() and execute_complete() methods
  • Implemented helper methods _register_event() to generate BigQuery asset URIs and register them
Screenshot 2025-12-09 at 2 00 59 PM

@pankajastro pankajastro changed the title Emits asset events in async execution mode Emit asset events in async execution mode Dec 5, 2025
@netlify
Copy link
Copy Markdown

netlify Bot commented Dec 9, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 76a8253
🔍 Latest deploy log https://app.netlify.com/projects/sunny-pastelito-5ecb04/deploys/693926c10eca2100083dbd22

@codecov
Copy link
Copy Markdown

codecov Bot commented Dec 9, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.86%. Comparing base (37e796a) to head (572774b).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2184   +/-   ##
=======================================
  Coverage   97.85%   97.86%           
=======================================
  Files          94       94           
  Lines        6066     6083   +17     
=======================================
+ Hits         5936     5953   +17     
  Misses        130      130           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pankajastro pankajastro marked this pull request as ready for review December 9, 2025 19:19
Copilot AI review requested due to automatic review settings December 9, 2025 19:19
@pankajastro pankajastro marked this pull request as draft December 9, 2025 19:19
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements asset event emission for BigQuery async execution mode, addressing issue #2141. The changes enable Airflow to track dataset lineage when using async operators by registering output datasets after task execution.

Key changes:

  • Added asset/dataset event registration in execute() and execute_complete() methods
  • Implemented helper methods _get_asset_uri() and _register_event() to generate BigQuery asset URIs and register them
  • Added Airflow 2/3 compatibility for Asset/Dataset imports

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
cosmos/operators/_asynchronous/bigquery.py Adds asset event emission with new _get_asset_uri() and _register_event() methods, including Airflow 2/3 compatible Asset imports
tests/operators/_asynchronous/test_bigquery.py Disables dataset emission in existing tests to prevent failures due to uninitialized gcp_project/dataset attributes
Comments suppressed due to low confidence (4)

cosmos/operators/_asynchronous/bigquery.py:265

  • The register_dataset() call passes an empty list for inlets (input datasets). While this may be intentional because OpenLineage events are not available in async execution mode (when enable_setup_async_task is True), adding a comment would clarify this design decision:
def _register_event(self, context: Context) -> None:
    output = [Asset(uri=self._get_asset_uri())]
    # Inlets (input datasets) are not available in async execution mode
    # because OpenLineage events are not calculated when build_and_run_cmd is not called
    self.register_dataset([], output, context)

cosmos/operators/_asynchronous/bigquery.py:213

  • The _register_event() method is called in execute() when emit_datasets is True, but when settings.enable_setup_async_task is False, the _store_template_fields() method returns early (line 216-218) without setting self.gcp_project and self.dataset. This means _get_asset_uri() will use the empty string values initialized in __init__ (lines 129-130), resulting in invalid asset URIs like bigquery:///unknown_model or bigquery://..unknown_model.

Consider checking if gcp_project and dataset are set before calling _register_event(), or skip asset registration when enable_setup_async_task is False:

if self.emit_datasets and settings.enable_setup_async_task:
    self._register_event(context)
        if not settings.enable_setup_async_task:
            self.log.info("SQL cannot be made available, skipping registration of compiled_sql template field")

cosmos/operators/_asynchronous/bigquery.py:12

  • The logger is imported at the top of the file but is never used. Since logging is being done via self.log (inherited from the base class), consider removing the unused logger import and declaration if it's not needed, or use it for module-level logging if applicable.
from cosmos.operators.base import _sanitize_xcom_key

try:

cosmos/operators/_asynchronous/bigquery.py:265

  • The new helper methods _get_asset_uri() and _register_event() lack docstrings. Adding documentation would improve code maintainability and help other developers understand their purpose and behavior.

Consider adding docstrings:

def _get_asset_uri(self) -> str:
    """
    Generate the BigQuery asset URI for dataset registration.
    
    Returns a URI in the format:
    - Airflow 3: bigquery://{project}/{dataset}/{unique_id}
    - Airflow 2: bigquery://{project}.{dataset}.{unique_id}
    """
    ...

def _register_event(self, context: Context) -> None:
    """
    Register the asset/dataset event for this task execution.
    
    Args:
        context: The Airflow task execution context.
    """
    ...
        if AIRFLOW_VERSION.major >= 3:
            return f"bigquery://{self.gcp_project}/{self.dataset}/{unique_id}"
        else:
            return f"bigquery://{self.gcp_project}.{self.dataset}.{unique_id}"

    def _register_event(self, context: Context) -> None:
        output = [Asset(uri=self._get_asset_uri())]
        self.register_dataset([], output, context)


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_asynchronous/bigquery.py
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_asynchronous/bigquery.py Outdated
Comment thread cosmos/operators/_asynchronous/bigquery.py Outdated
Comment thread cosmos/operators/_asynchronous/bigquery.py Outdated
Comment thread cosmos/operators/_asynchronous/bigquery.py
@pankajastro pankajastro marked this pull request as ready for review December 9, 2025 20:00
@tatiana tatiana merged commit 9ba7395 into main Dec 18, 2025
84 of 85 checks passed
@tatiana tatiana deleted the issue-2141 branch December 18, 2025 11:57
@pankajkoti pankajkoti mentioned this pull request Dec 18, 2025
pankajkoti added a commit that referenced this pull request Dec 18, 2025
Breaking changes

* Introduced in the PR #2080. The following functions are expected to be
used internally only to Cosmos, so we hope these won't impact end-users,
but we are documenting the changes just in case:
- ``generate_task_or_group`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` now expects the ``node_converters`` argument
* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165

Features

* Support applying ``node_converter`` at a task level instead of task
group level by @anyapriya in #1759
* Allow overriding ``DbtProducerWatcherOperator`` parameters via
``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133
* Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by
@pankajastro in #2084
* Support real-time consumer updates when using
``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by
@pankajastro in #2152
* Update telemetry to v3 format with query parameters by @pankajkoti in
#2192
* Add initial set of telemetry task listener metrics for Cosmos
operators by @pankajkoti in #2195

Enhancements

* Unify Airflow version handling into ``constants.py`` by @tatiana in
#2089
* Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in
#2080
* Force watcher producer retries to zero by @pankajkoti in #2114
* Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the
producer fails using Airflow context by @pankajkoti in #2126
* ``ExecutonMode.WATCHER``: fetch producer status asynchronously from
the Airflow runtime so deferrable sensors fail immediately when the
producer task fails by @pankajkoti in #2144
* Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log
parser by @tatiana in #2183
* Replace map_index with is_mapped_task boolean in task telemetry
metrics by @pankajkoti in #2210
* Collect cosmos profile metrics in task telemetry metrics by
@pankajastro in #2198
* Remove unnecessary information from telemetry by @tatiana in #2211

Bug fixes

* Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by
@pankajkoti in #2124
* Remove empty test tasks when all tests are detached by @anyapriya in
#2010
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by
@michal-mrazek in #2127
* Add databricks oauth mock profile by @fjmacagno in #2164
* Register listeners in Airflow 3 plugin implementation by @pankajastro
in #2187
* Fix resolution of ``packages-install-path`` when it uses ``env_var``
by @tatiana in #2194
* Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include
``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis
in #2209
* Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro
in #2184
* Remove dag_run_id from telemetry tests by @tatiana in #2213

Docs

* Document dataset-event limitation when using
``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143
* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana  in #2139
* Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro
in #2115
* Fix reStructuredText formatting by @dnskr in #2132
* Add docs for ``setup_operator_args`` param by @pankajastro in #2136
* Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #2153
* Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti
in #2167
* Update PRIVACY_NOTICE.rst by @tatiana in #2212

Others

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165
* Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140
* Enable tests for Python 3.13 by @pankajastro in #2154
* Add Python 3.12 to CI integration tests matrix by @pankajastro in
#2168
* Retry flaky Telemetry success test to stabilise CI by @pankajkoti in
#2138
* Drop unused producer state xcom handling in ``ExecutionMode.WATCHER``
by @pankajkoti in #2145
* Remove unused Python3.9 uses from Github action CI by @pankajastro in
#2117
* Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in
#2150
* Refactor: Use shared airflow version constant by @pankajkoti in #2157
* Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in
#2172
* Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170
* Add targeted ``type: ignore`` for untyped decorators to fix ``mypy``
errors by @pankajastro in #2174
* Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by
@pankajastro in #2175
* Refactor to reuse ``load_method_from_module`` from
``_utils/importer.py`` by @pankajastro in #2176
* Remove try except block for cache import and unused python_version
variable by @pankajastro in #2186
* Unpin Airflow to satisfy GitHub Security tab requirements by
@pankajastro in #2171
* Update Python version for ``pyupgrade`` in ``pre-commit`` config by
@pankajastro in #2190
* Add cooldown config in ``dependabot`` config by @pankajastro in #2189
* Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in
#2196
* Remove empty variables emission from telemetry metrics by @pankajkoti
in #2197
* Reformat documented comments for historical URL formats by @pankajkoti
in #2199
* Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot
in #2135
* Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by
@dependabot in #2147
* Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by
@dependabot in #2156
* Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot
in #2155
* Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot
in #2178
* Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by
@dependabot in #2208
* pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173,
#2191, #2202

closes:
astronomer/oss-integrations-private#275
@tatiana tatiana added this to the Cosmos 1.12.0 milestone Dec 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Dataset events not emitted in ExecutionMode.AIRFLOW_ASYNC, preventing downstream DAG triggers

3 participants