Add Airflow 3 compatibility for BigQuery deferrable support (ExecutionMode.AIRFLOW_ASYNC) #1674

Merged
tatiana merged 4 commits into main from fix-async-bigquery-af3 on Apr 17, 2025

Conversation

@pankajkoti
Contributor

Changes

  • Remove the SQLAlchemy session dependency and calls to the DB ORM
  • Fix base class inheritance so objects initialise properly under Airflow 3
  • Add a symlink in the airflow3 dags for the simple_dag_async.py example DAG
  • Set environment variables in the airflow3 env so the async DAG does not use the setup task until we resolve the Virtualenv execution mode
  • Add dbt-bigquery and the Google provider to requirements.txt in the Airflow 3 setup, as required by the example DAG

closes: #1634

@netlify

netlify Bot commented Apr 16, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 143d902
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/67ffe69617f2a0000893df10

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Apr 16, 2025

Deploying astronomer-cosmos with Cloudflare Pages

Latest commit: 27a9681
Status: ✅  Deploy successful!
Preview URL: https://d471e8ff.astronomer-cosmos.pages.dev
Branch Preview URL: https://fix-async-bigquery-af3.astronomer-cosmos.pages.dev

@netlify

netlify Bot commented Apr 16, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 27a9681
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/6800de3742c0e400084c497b

@codecov

codecov Bot commented Apr 16, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 97.08%. Comparing base (5ae181e) to head (27a9681).
Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1674      +/-   ##
==========================================
- Coverage   97.09%   97.08%   -0.01%     
==========================================
  Files          80       80              
  Lines        5022     5015       -7     
==========================================
- Hits         4876     4869       -7     
  Misses        146      146              
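
As a quick check of the diff above, the two coverage percentages can be reproduced from the hits and lines counts. The sketch below assumes the report truncates percentages to two decimals rather than rounding them; that is an inference from the figures, not something the report states. The helper name `coverage_hundredths` is hypothetical.

```python
def coverage_hundredths(hits: int, lines: int) -> int:
    """Return coverage in hundredths of a percent, truncated (an assumption)."""
    return hits * 10_000 // lines


base = coverage_hundredths(4876, 5022)  # main
head = coverage_hundredths(4869, 5015)  # this PR

print(f"base {base / 100:.2f}%  head {head / 100:.2f}%  diff {(head - base) / 100:+.2f}%")
```

Both hits and lines drop by 7, so the number of misses stays at 146 while the ratio dips slightly, which is where the -0.01% comes from.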


@pankajkoti pankajkoti marked this pull request as ready for review April 17, 2025 08:20
Copilot AI review requested due to automatic review settings April 17, 2025 08:20
@dosubot dosubot Bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Apr 17, 2025
@dosubot dosubot Bot added area:dependencies Related to dependencies, like Python packages, library versions, etc area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc labels Apr 17, 2025
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds Airflow 3 compatibility for BigQuery deferrable support by removing the SQLAlchemy session dependency and updating the operator's inheritance, as well as adjusting the example DAG configuration.

  • Removed SQLAlchemy session dependency and related DB ORM calls
  • Updated base class initialization for DbtRunAirflowAsyncBigqueryOperator
  • Added a symlink and updated settings for the Airflow 3 asynchronous DAG example

Reviewed Changes

Copilot reviewed 4 out of 6 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| tests/operators/_asynchronous/test_bigquery.py | Updated test to patch _override_rtif instead of session-dependent RenderedTaskInstanceFields |
| scripts/airflow3/dags/simple_dag_async.py | Added a symlink to the example DAG in the dev directory |
| dev/dags/simple_dag_async.py | Modified path resolution, dataset reference, and schedule parameter for Airflow 3 |
| cosmos/operators/_asynchronous/bigquery.py | Removed session dependency and restructured inheritance for Airflow 3 compatibility |

Files not reviewed (2)
  • scripts/airflow3/env.sh: Language not supported
  • scripts/airflow3/requirements.txt: Language not supported
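
The test change described for `tests/operators/_asynchronous/test_bigquery.py` (patching `_override_rtif` rather than a session-dependent class) follows a common mocking pattern, sketched here with `unittest.mock.patch.object`. `FakeAsyncOperator` and its method bodies are hypothetical stand-ins, not the Cosmos operator; only the method name `_override_rtif` comes from the review summary.

```python
from unittest.mock import patch


class FakeAsyncOperator:
    """Hypothetical stand-in for an operator with a side-effecting helper."""

    def _override_rtif(self, context: dict) -> None:
        # Fails loudly so the demo proves the real body was patched out.
        raise RuntimeError("should be patched in unit tests")

    def execute(self, context: dict) -> str:
        self._override_rtif(context)
        return "done"


def run_with_patched_rtif() -> tuple[str, int]:
    # Patch the method on the class so no instance reaches the real body.
    with patch.object(FakeAsyncOperator, "_override_rtif") as mocked:
        result = FakeAsyncOperator().execute({"ti": "dummy"})
        return result, mocked.call_count


result, calls = run_with_patched_rtif()
```

Patching the operator's own method keeps the test independent of whatever storage layer sits behind it, which may be why this approach suits a change that removes database session usage.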
Comments suppressed due to low confidence (1)

cosmos/operators/_asynchronous/bigquery.py:110

  • Reassigning `__bases__` here overrides the previous assignment, which may introduce unintended behavior in the operator's inheritance. Consolidate the modifications into a single base class assignment for clarity.
DbtRunAirflowAsyncBigqueryOperator.__bases__ = (BigQueryInsertJobOperator, AbstractDbtLocalBase,)
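
The concern in this suppressed comment can be illustrated in isolation: assigning to a class's `__bases__` replaces the whole tuple, so only the last assignment takes effect. All class names below are hypothetical and unrelated to the Cosmos or Airflow classes.

```python
class Root:
    """Common ancestor of both demo bases."""


class JobBase(Root):
    def kind(self) -> str:
        return "job"


class DbtBase(Root):
    def kind(self) -> str:
        return "dbt"


class Operator(JobBase):
    pass


# First assignment: Operator now inherits only from DbtBase.
Operator.__bases__ = (DbtBase,)
after_first = [cls.__name__ for cls in Operator.__mro__]

# Second assignment replaces the first one entirely.
Operator.__bases__ = (JobBase, DbtBase)
after_second = [cls.__name__ for cls in Operator.__mro__]
```

After the second assignment, method lookup follows the new order, so `Operator().kind()` resolves to the `JobBase` implementation.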

Comment thread cosmos/operators/_asynchronous/bigquery.py
Collaborator

@tatiana tatiana left a comment

Excellent work, @pankajkoti !

Comment thread cosmos/operators/_asynchronous/bigquery.py
Comment thread scripts/airflow3/env.sh
Comment thread cosmos/operators/_asynchronous/bigquery.py
@dosubot dosubot Bot added the lgtm This PR has been approved by a maintainer label Apr 17, 2025
Comment thread dev/dags/simple_dag_async.py Outdated
Comment thread dev/dags/simple_dag_async.py Outdated
@tatiana tatiana merged commit 30b6222 into main Apr 17, 2025
@tatiana tatiana deleted the fix-async-bigquery-af3 branch April 17, 2025 10:58
tatiana pushed a commit that referenced this pull request Apr 17, 2025
…nMode.AIRFLOW_ASYNC) (#1674)

- Remove SQLAlchemy session dependency and calls to DB ORM
- Fix base class inheritance for proper object initialisation wrt
Airflow 3
- Add symlink in the airflow3 dags for the simple_dag_async.py example
DAG
- Set environment variables in airflow3 env to not use setup task for
the async DAG until we resolve the Virtualenv execution mode
- Add dbt-bigquery and Google provider to requirements.txt in Airflow 3
setup required for the example DAG


Closes: #1634
@tatiana tatiana mentioned this pull request Apr 17, 2025
@tatiana tatiana changed the title Add Airflow 3 compatibility for BiqQuery deferrable support (ExecutionMode.AIRFLOW_ASYNC) Add Airflow 3 compatibility for BiqQuery deferrable support (ExecutionMode.AIRFLOW_ASYNC) Apr 17, 2025
@tatiana tatiana added this to the Cosmos 1.10.0 milestone Apr 17, 2025
tatiana pushed a commit that referenced this pull request Apr 17, 2025
PR #1230 introduced the `original_jaffle_shop` project for
`ExecutionMode.AIRFLOW_ASYNC`, likely to support early customisations
for async execution. Since Cosmos 1.9, we’ve shifted to relying on dbt
to generate the SQL, making those customisations unnecessary. As this
project isn’t used in any other example DAG, it now seems redundant.
I’ve updated the test and example DAG to use `jaffle_shop` instead,
which is consistently used across most of our examples.

This is a follow-up PR from the discussion in
#1674 (comment)

related: #1677
tatiana pushed a commit that referenced this pull request Apr 17, 2025
Main is currently failing for AF3 after merging PR #1674, with the
following error:
```
 File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/airflow_async.py", line 8, in <module>
    from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/_asynchronous/__init__.py", line 12, in <module>
    from cosmos.operators.virtualenv import DbtRunVirtualenvOperator
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/operators/virtualenv.py", line 11, in <module>
    from airflow.utils.python_virtualenv import prepare_virtualenv
ModuleNotFoundError: No module named 'airflow.utils.python_virtualenv'
```

(as reported by @tatiana)

It looks like we need to update the import path for the
python_virtualenv module. I had the necessary fix locally but didn’t
commit it, as I didn’t realize it would break after removing the setup
task. However, since it’s a top-level import, it seems we do need to
include the fix after all.
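
A version-tolerant import is one common way to handle a module that moved between releases. The sketch below shows the generic pattern with a hypothetical helper, `import_first_available`, exercised against standard-library modules so it runs without Airflow installed. For the error above, the candidates would presumably be the Airflow 3 location of `prepare_virtualenv` followed by `airflow.utils.python_virtualenv`; the new path is not stated in this thread, so it is left out here.

```python
from importlib import import_module
from typing import Any, Sequence


def import_first_available(module_paths: Sequence[str], attr: str) -> Any:
    """Return `attr` from the first module in `module_paths` that provides it."""
    errors = []
    for path in module_paths:
        try:
            return getattr(import_module(path), attr)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{path}: {exc}")
    raise ImportError(f"could not import {attr!r}; tried: " + "; ".join(errors))


# Stand-in demo: the first path does not exist, so the second one is used.
dumps = import_first_available(["no_such_module_for_demo", "json"], "dumps")
```

In library code the same effect is usually written as a `try`/`except ImportError` around two plain import statements, which keeps the names visible to static analysis.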

Labels

  • area:dependencies Related to dependencies, like Python packages, library versions, etc
  • area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc
  • lgtm This PR has been approved by a maintainer
  • size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Validate Cosmos ExecutionMode.AIRFLOW_ASYNC operators with Airflow 3

4 participants