feat: Add GCP_GKE and WATCHER_GCP_GKE execution modes#2488
vricciardulli wants to merge 40 commits into
Add two new execution modes enabling dbt execution on GKE clusters:
- GCP_GKE: standard mode using GKEStartPodOperator
- WATCHER_GCP_GKE: watcher mode with producer/consumer pattern

Includes all operators, watcher callbacks, graph.py watcher mappings, and comprehensive unit/integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Adds new execution modes so Cosmos can run dbt tasks on self-managed GKE clusters (e.g., Cloud Composer 3 on custom GKE) by using GKEStartPodOperator and a watcher variant.
Changes:
- Introduce `ExecutionMode.GCP_GKE` and `ExecutionMode.WATCHER_GCP_GKE`.
- Add GKE-backed dbt operators (`cosmos/operators/gcp_gke.py`) and watcher counterparts (`cosmos/operators/watcher_gcp_gke.py`).
- Wire new watcher mode into graph-building logic and add unit/integration tests.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `cosmos/constants.py` | Adds the two new execution modes. |
| `cosmos/airflow/graph.py` | Ensures watcher graph building and AFTER_ALL test behavior work with WATCHER_GCP_GKE. |
| `cosmos/operators/gcp_gke.py` | Implements dbt operators backed by GKEStartPodOperator. |
| `cosmos/operators/watcher_gcp_gke.py` | Implements watcher producer/consumer behavior for GKE. |
| `tests/operators/test_gcp_gke.py` | Unit tests for GCP GKE operator command construction and execute path. |
| `tests/operators/test_watcher_gcp_gke_unit.py` | Unit tests for watcher GCP GKE retry/callback behavior. |
| `tests/operators/test_watcher_gcp_gke_integration.py` | Integration test for DbtDag with WATCHER_GCP_GKE. |
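
The relationship between the four modes named in this PR can be sketched roughly as follows. This is a minimal, self-contained illustration and not the Cosmos implementation: the enum string values, the `WATCHER_MODES` set, and the helper names `is_watcher_mode` and `pod_operator_name` are assumptions made up for the example. Only the mode names and the operator class names come from the PR text.

```python
from enum import Enum


class ExecutionMode(Enum):
    """Stand-in enum; the string values are assumed, not taken from Cosmos."""

    KUBERNETES = "kubernetes"
    WATCHER_KUBERNETES = "watcher_kubernetes"
    GCP_GKE = "gcp_gke"
    WATCHER_GCP_GKE = "watcher_gcp_gke"


# Hypothetical grouping: modes that use the producer/consumer watcher pattern.
WATCHER_MODES = frozenset(
    {ExecutionMode.WATCHER_KUBERNETES, ExecutionMode.WATCHER_GCP_GKE}
)

# Pod operator each mode is described as building on (names only, no imports).
_POD_OPERATOR_BY_MODE = {
    ExecutionMode.KUBERNETES: "KubernetesPodOperator",
    ExecutionMode.WATCHER_KUBERNETES: "KubernetesPodOperator",
    ExecutionMode.GCP_GKE: "GKEStartPodOperator",
    ExecutionMode.WATCHER_GCP_GKE: "GKEStartPodOperator",
}


def is_watcher_mode(mode: ExecutionMode) -> bool:
    """Return True when graph building should take the watcher code path."""
    return mode in WATCHER_MODES


def pod_operator_name(mode: ExecutionMode) -> str:
    """Return the name of the pod operator class backing the given mode."""
    return _POD_OPERATOR_BY_MODE[mode]
```

Checking membership in one shared set, rather than comparing against a single watcher mode, is one way a new watcher variant could be picked up by existing graph-building branches.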
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
    # Get the logs from the pod
    logs = []
    for log in task.pod_manager.read_pod_logs(pod, "base"):  # type: ignore[attr-defined]
Type checker error:
`error: "BaseOperator" has no attribute "pod_manager"  [attr-defined]`
I was not getting this error before the refactoring.
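
One possible alternative to the `# type: ignore[attr-defined]` is to narrow the type before touching `pod_manager`. The sketch below shows the idea with a runtime-checkable `Protocol`. All classes here (`FakePodManager`, `FakeBaseOperator`, `FakePodOperator`, `HasPodManager`) and the `collect_logs` helper are hypothetical stand-ins, not Airflow or Cosmos classes, so it only demonstrates the typing technique.

```python
from typing import Iterable, List, Protocol, runtime_checkable


class FakePodManager:
    """Hypothetical stand-in for a pod manager that yields raw log lines."""

    def read_pod_logs(self, pod: str, container_name: str) -> Iterable[bytes]:
        return [f"{pod}/{container_name}: line 1".encode(), b"line 2"]


@runtime_checkable
class HasPodManager(Protocol):
    """Structural type: anything that exposes a ``pod_manager`` attribute."""

    pod_manager: FakePodManager


class FakeBaseOperator:
    """Hypothetical base operator; it deliberately has no ``pod_manager``."""


class FakePodOperator(FakeBaseOperator):
    """Hypothetical pod operator that does carry a ``pod_manager``."""

    def __init__(self) -> None:
        self.pod_manager = FakePodManager()


def collect_logs(task: FakeBaseOperator, pod: str) -> List[str]:
    """Read the "base" container logs, narrowing the operator type first."""
    if not isinstance(task, HasPodManager):
        raise TypeError(f"{type(task).__name__} has no pod_manager")
    # After the isinstance check, a type checker knows pod_manager exists.
    return [log.decode() for log in task.pod_manager.read_pod_logs(pod, "base")]
```

Compared with an ignore comment, the check fails loudly at runtime if a task without a pod manager is ever passed in.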
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
    .. versionadded:: 1.13.0

    The ``ExecutionMode.WATCHER_KUBERNETES`` combines the **speed of the** :ref:`watcher-execution-mode` **with the isolation of** :ref:`kubernetes`.

    A GCP GKE variant is also available as ``ExecutionMode.WATCHER_GCP_GKE``, which uses
    ``GKEStartPodOperator`` instead of ``KubernetesPodOperator``. See :ref:`watcher-gcp-gke` below.

The page-level `.. versionadded:: 1.13.0` now applies to the newly documented `ExecutionMode.WATCHER_GCP_GKE` as well, but this mode is introduced in this PR (later than 1.13.0). Consider adding a separate `.. versionadded:: <new version>` note for `WATCHER_GCP_GKE` (or rewording the existing directive) to avoid implying it has been available since 1.13.0.
Thank you so much for these improvements, @vricciardulli! They look really valuable and will definitely enhance the project. We've had a heavier workload than expected over the past few weeks, so we haven't been able to review everything immediately. We'll be reviewing this PR thoroughly over the coming month and are considering including these changes in the upcoming 1.15.0 release. Really appreciate your patience and contribution! In the meantime, could you please address Copilot's feedback and also rebase onto the latest changes on main?
Codecov Report
❌ Patch coverage is

Additional details and impacted files

    @@            Coverage Diff             @@
    ##             main    #2488      +/-   ##
    ==========================================
    - Coverage   97.92%   97.89%   -0.03%
    ==========================================
      Files         103      106       +3
      Lines        7312     7460     +148
    ==========================================
    + Hits         7160     7303     +143
    - Misses        152      157       +5

@tatiana I'll rebase and clean up once #2543 is merged to main.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
cosmos/operators/kubernetes.py:253
- `DbtDocsCloudKubernetesOperator.build_and_run_cmd` builds the shell command from `self.arguments` only. After the refactor that splits the executable into `self.cmds` + `self.arguments`, this drops the `dbt` executable entirely (e.g., produces `docs generate ...`), which will fail at runtime. Build the command string from `self.cmds + self.arguments` (or otherwise ensure `dbt` is included) before overriding `self.cmds`/`self.arguments` for the bash wrapper.
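
The problem described above can be reproduced outside Cosmos with a few plain functions. This is a minimal sketch of the suggested fix, not the operator's actual code: `build_shell_cmd_buggy`, `build_shell_cmd`, `wrap_in_bash`, and the `post_cmd` parameter are hypothetical names, and the bash wrapper shape is an assumption.

```python
import shlex
from typing import List, Tuple


def build_shell_cmd_buggy(cmds: List[str], arguments: List[str]) -> str:
    """Join only the arguments, which silently drops the executable."""
    return shlex.join(arguments)


def build_shell_cmd(cmds: List[str], arguments: List[str]) -> str:
    """Join executable and arguments so the executable is preserved."""
    return shlex.join(cmds + arguments)


def wrap_in_bash(
    cmds: List[str], arguments: List[str], post_cmd: str
) -> Tuple[List[str], List[str]]:
    """Build the full command string first, then return the bash wrapper.

    The string is computed from the original cmds/arguments before they are
    replaced, mirroring the order recommended in the review comment.
    """
    shell_cmd = build_shell_cmd(cmds, arguments)
    return ["/bin/bash", "-c"], [f"{shell_cmd} && {post_cmd}"]
```

With `cmds=["dbt"]` and `arguments=["docs", "generate"]`, the buggy variant yields a string without `dbt`, while the fixed variant keeps it.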
    from cosmos.log import get_logger
    from cosmos.operators._watcher.xcom import (
        _backup_xcom_to_variable,
        _delete_xcom_backup_variable,
        _init_xcom_backup,
        _restore_xcom_from_variable,
    )

    if TYPE_CHECKING:  # pragma: no cover
        from pendulum import DateTime

    if TYPE_CHECKING:  # pragma: no cover
        try:
            from airflow.sdk.definitions.context import Context
        except ImportError:
            from airflow.utils.context import Context  # type: ignore[attr-defined]
    try:
        # apache-airflow-providers-cncf-kubernetes >= 7.14.0
        from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
    except ImportError:

        class KubernetesPodOperatorCallback:  # type: ignore[no-redef]
            """Mock fallback for older versions. Should not be used in practice."""

            pass


    from cosmos.dbt.parser.output import extract_log_issues
    from cosmos.operators._watcher.base import store_dbt_resource_status_from_log
    from cosmos.operators.base import AbstractDbtBase

    try:
        from airflow.sdk.bases.operator import BaseOperator  # Airflow 3
    except ImportError:
        from airflow.models import BaseOperator  # Airflow 2

    logger = get_logger(__name__)
Kept because it was also here before the refactoring.
Description
Added `GCP_GKE` and `WATCHER_GCP_GKE` execution modes.
These are the same as the `KUBERNETES` and `WATCHER_KUBERNETES` modes, but use GKE operators (i.e. `GKEStartPodOperator`).
This enables a user to use Cosmos in Cloud Composer 3 with a self-managed custom GKE cluster.
Tests:
Implementation details:
`cosmos/operators/_k8s_common.py`.
Docs:
Co-Authored-By: Claude Opus 4.6 (1M context) noreply@anthropic.com
Related Issue(s)
closes #2487
closes #2379
Breaking Change?
There should not be a breaking change.
Checklist