Introduce the concept of an interceptor in Cosmos tasks#2419
Conversation
Sometimes, users may need to run commands before the actual execution of the dbt command in Cosmos tasks.
This behaviour is similar to the `callback` feature - but instead of happening after the dbt command execution, it happens before.
This feature is only defined within the `AbstractDbtLocalBase`, so it will work for execution modes that are not containerised and run in the Airflow worker node, such as LOCAL, VIRTUALENV, WATCHER and AIRFLOW_ASYNC.
Other execution modes (Kubernetes, Docker, etc.) do not run interceptors - for now - unless they override `build_and_run_cmd`.
Usage example:
```
def my_interceptor(context, operator):
# Modify vars (e.g. add execution date)
if operator.vars is None:
operator.vars = {}
operator.vars["run_date"] = str(context["data_interval_start"].date())
# Modify env
if operator.env is None:
operator.env = {}
operator.env["MY_VAR"] = "value"
DbtBuildLocalOperator(
task_id="build",
...,
interceptors=[my_interceptor],
)
```
interceptors, similar to callbacks, but run before the commandinterceptor in Cosmos tasks
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2419 +/- ##
=======================================
Coverage 97.95% 97.96%
=======================================
Files 102 102
Lines 7009 7019 +10
=======================================
+ Hits 6866 6876 +10
Misses 143 143 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds a new “interceptor” hook to Cosmos dbt operators so users can run callables before the dbt command is built/executed, enabling runtime injection/modification of vars and environment variables across execution modes.
Changes:
- Add
interceptorsto the base operator API and invoke them prior to building dbt commands. - Wire interceptor invocation into container-based execution modes (Kubernetes, Docker, ECS, ACI, Cloud Run Job) and local execution.
- Add unit tests across execution modes and document the new
operator_argsoption.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/operators/base.py |
Introduces interceptors param and base invoke_interceptors() helper. |
cosmos/operators/local.py |
Invokes interceptors before build_cmd() for local execution. |
cosmos/operators/kubernetes.py |
Invokes interceptors before building kube args / executing pod. |
cosmos/operators/docker.py |
Invokes interceptors before building docker command / executing container. |
cosmos/operators/aws_ecs.py |
Invokes interceptors before building ECS overrides/command. |
cosmos/operators/azure_container_instance.py |
Invokes interceptors before building ACI command. |
cosmos/operators/gcp_cloud_run_job.py |
Invokes interceptors before building Cloud Run Job command. |
docs/configuration/operator-args.rst |
Documents the new interceptors operator arg + example usage. |
tests/operators/test_local.py |
Adds unit test asserting interceptors run and affect vars/env for local. |
tests/operators/test_virtualenv.py |
Adds unit test for virtualenv operators (inherited behavior). |
tests/operators/test_kubernetes.py |
Adds unit test for interceptor behavior in Kubernetes mode. |
tests/operators/test_docker.py |
Adds unit test for interceptor behavior in Docker mode. |
tests/operators/test_aws_ecs.py |
Adds unit test for interceptor behavior in ECS mode. |
tests/operators/test_azure_container_instance.py |
Adds unit test for interceptor behavior in ACI mode. |
tests/operators/test_gcp_cloud_run_job.py |
Adds unit test for interceptor behavior in Cloud Run Job mode. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
LGTM. Have a minor question inline.
And maybe you have thought of this already and have your rationale, but wondering if the name injectors could suit better instead of interceptors for the use case we're tackling here?
|
Thanks a lot for the review, @pankajkoti !
Interceptor typically refers to a mechanism that observes, modifies, or blocks an operation at a well-defined point in its lifecycle. It often implies hooking into a control flow to inspect or change something - exactly what this PR does (it lets users run code before dbt commands). Interceptors are common in middleware, request pipelines, event systems, and AOP (aspect-oriented programming). Injectors, on the other hand, usually suggest adding or providing something (usually a dependency). In a dependency injection system, injectors provide objects with what they need; they typically don't observe or intervene in behaviour. The term focuses more on the insertion of dependencies than on the control of execution. I think in this particular feature, interceptor is a better fit because:
In contrast, injector would imply that the primary purpose is to insert something (like environment variables or config), which is part of what the new hook does, but not its defining behaviour. We could evolve the current version of the interceptor to, depending on its output, not even run the dbt command. At the moment, users could accomplish this by raising an exception, for instance. I'm curious about the community's feedback and how they'll use it. |
I see the association with dependency injection, which does come up in framework discussions. I am reading that injection as a term can be used more broadly across domains, for example, in Code injection, Command injection, Script injection, SQL injection, Chaos injection, fault injection (in testing), environment variable injection, feature flag injection, sidecar injection in Kubernetes, or trace context (trace ID) injection in observability, all of which influence runtime behaviour. Even dependency injection itself ultimately changes behaviour by supplying different implementations. I support |
Sometimes users may need to run commands before the dbt command is
executed in Cosmos tasks.
This behaviour is similar to the `callback` feature, but instead of
happening after dbt command execution, it happens before.
A use case is to calculate dbt or environment variables at runtime and
use them during dbt command execution.
Example:
```
def my_interceptor(context, operator):
# Modify vars (e.g. add execution date)
if operator.vars is None:
operator.vars = {}
operator.vars["run_date"] = str(context["data_interval_start"].date())
# Modify env
if operator.env is None:
operator.env = {}
operator.env["MY_VAR"] = "value"
# Example of usage in an operator
dbt_build = DbtBuildLocalOperator(
...,
interceptors=[my_interceptor],
)
# Example of usage in a DbtTaskGroup
dbt_task_group = DbtTaskGroup(
...,
operator_args={"interceptors": [my_interceptor]},
)
```
The current implementation works with all existing execution modes:
- Local
- Virtualenv
- Docker
- Kubernetes
- AWS EKS
- AWS ECS
- GCP Cloud Run Job
- Azure Container Instance
- Async (e.g. BigQuery)
- Watcher
- Watcher Kubernetes
1.14.0 (2026-04-07) --------------------- Breaking Changes * Drop support for Airflow versions earlier than **2.9** by @jedcunningham in #2288 * Fix inclusion of package models and selection/exclusion behavior by @pankajkoti in #2357 * ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a plain string. Any custom code that reads these internal XCom keys directly will need to be updated by @pankajkoti in #2507 Features * Add cluster policy support for ``ExecutionMode.WATCHER`` sensor retries by @astro-anand in #2293 * Add debug mode to track memory utilization by @tatiana in #2327 * Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by @pankajastro in #2375 * Introduce interceptors for Cosmos tasks by @tatiana in #2419 * Add config to allow disabling dag versioning by @pankajkoti in #2470 * Implement TaskGroups by models folder by @maximilianoarcieri and @tatiana in #1566, #2469, and #2420 * feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447 * Add source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro and @tatiana in #2467 * Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and its interface and implementation can change in the future. * Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472 Enhancements * Add watcher mode support for dbt test node states by @michal-mrazek in #2318 * Rename watcher-mode sensor retry queue and reuse it for producer tasks by @pankajastro in #2331 * Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters by @pankajkoti in #2335 * Improve dbt Fusion support and related tests by @tatiana in #2356 * Default Snowflake profile mappings to four threads by @tatiana in #2374 * Attempt to remove Pydantic as a dependency by @tatiana in #2377 * Log dbt-core and adapter versions in watcher consumer tasks by @pankajastro in #2412 * Log model errors in watcher consumer on dbt node failure by @pankajastro in #2431 * Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task by @pankajastro in #2471 * Remove duplicate debug log in watcher subprocess path by @tatiana in #2494 * Simplify and unify WATCHER implementation regardless of InvocationMode by @tatiana in #2498 * Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531 Bug Fixes * Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and ``RenderConfig.selector`` by @YourRoyalLinus in #2316 * Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in ``ExecutionMode.WATCHER`` by @pankajkoti in #2319 * Fix select/exclude type mismatch by @tatiana in #2364 * Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro in #2365 * Set correct queue priority for watcher producer tasks by @pankajastro in #2372 * Preserve ``extra_context`` for watcher consumer task instances by @pankajkoti in #2381 * Respect ``deferrable=False`` from ``operator_args`` on watcher consumer sensors by @pankajkoti in #2384 * Fix watcher queue precedence and add documentation by @pankajastro in #2391 * Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by @pankajkoti in #2440 * Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param by @pankajkoti in #2466 * Remove timeout override from Cosmos watcher sensors by @tatiana and @claude in #2478 * Remove forced ``retries=0`` from watcher producer operators by @tatiana in #2479 * RFC: Add patch for newer versions of amazon provider when running dbt on EKS by @aoelvp94 in #2481 * Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor tasks by @tatiana in #2503 * Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in #2511 * Move dataset emission for ``ExecutionMode.WATCHER`` from producer to consumer sensors by @pankajkoti in #2507 Docs * Document cluster policy configuration for ``ExecutionMode.WATCHER`` sensor tasks by @pankajastro in #2315 * Remove outdated docs for the dbt docs plugin with Airflow 3 by @pankajastro in #2353 * Make Watcher DBT Execution Queue heading clickable by @pankajastro in #2354 * Update ``ExecutionMode.WATCHER`` documentation regarding test node implementation by @jroachgolf84 in #2355 * Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in #2369 * Add documentation for including/excluding nodes based on FQN by @pankajastro in #2371 * Update watcher execution mode documentation by @tatiana in #2380 * Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in #2383 * Fix miscellaneous Sphinx warnings by @pankajastro in #2395 * Improve contributing documentation by @lzdanski in #2397 * Add **Get Started in 5 Minutes** guide by @lzdanski in #2398 * Add Sphinx redirects package for documentation redirects by @lzdanski in #2407 * Restructure **Getting Started** and **Guides** sections by @lzdanski in #2418 * Add open-source quickstart by @lzdanski in #2439 * Fix documentation redirects by @lzdanski in #2442 * Restructure and refactor reference documentation by @lzdanski in #2443 * Add execution modes decision documentation by @lzdanski in #2444 * Add **Core Concepts** page to Getting Started by @lzdanski in #2448 * Add guide: *How Cosmos Works* by @lzdanski in #2449 * Update **Getting Started** overview and index pages by @lzdanski in #2452 * Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453 * Fix miscellaneous documentation links by @lzdanski in #2454 * Add Mermaid diagrams and execution mode diagrams by @lzdanski and @tatiana in #2459 * Add documentation for memory optimization options by @pankajastro in #2340 * Fix typo in watcher execution mode docs by @evanvolgas in #2485 * Fix minor documentation issues by @evanvolgas in #2489 * Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER by @tatiana in #2491 * docs: unify RST header styles across documentation by @jigangz in #2473 * docs: fix env var for rich logging by @vricciardulli in #2514 * docs: update dbt project path example for Airflow 3 Astro compatibility by @yeoreums in #2512 * Document missing Cosmos Airflow config settings in cosmos-conf.rst by @tatiana in #2515 * Split security-privacy policy doc and add dependency cooldown by @pankajkoti in #2519 * Add performance optimization and troubleshooting docs by @pankajkoti in #2521 * Update copyright year to 2026 by @tayloramurphy in #2527 * docs: Updating "Project Policies" to "Policies" in menu bar by @jroachgolf84 in #2526 Others * Fix tests after removing support for Airflow versions earlier than 2.9 by @tatiana in #2321 * Enable listener tests for Airflow 3.1 by @pankajastro in #2348 * Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in integration tests by @pankajkoti in #2352 * Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana in #2359 * Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv in #2360 * Improve PyPI tagging by @tatiana in #2363 * Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by @tatiana in #2373 * Fix Zizmor check by @tatiana in #2376 * Remove ``methodtools`` dependency by @tatiana in #2378 * Improve comments on #2389 by @evanvolgas in #2394 * Refactor ``load_from_dbt_manifest`` to reduce code complexity by @pankajkoti in #2399 * Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity by @pankajkoti in #2400 * Improve issue templates by @tatiana in #2401 * Avoid running tests when only docs change by @tatiana in #2402 * Add ``no-reload`` target for serving docs locally by @pankajkoti in #2405 * Fix test hash checks on macOS by @tatiana in #2406 * Attempt deterministic dbt project copy in test fixtures by @pankajkoti in #2409 * Pin ``virtualenv <21`` due to hatch incompatibility in CI by @pankajkoti in #2410 * Revert virtualenv pin for hatch installation in CI by @pankajkoti in #2426 * Add version comments for commit SHA pinned GitHub Actions by @pankajkoti in #2436 * Fix ``hatch run docs:build`` issues by @tatiana in #2437 * Minor code improvements by @dnskr in #2446 * Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451, #2468, #2495, and #2516 * Add file to support Claude understanding the Cosmos repository by @tatiana in #2458 * Dependency updates by @dependabot in #2368, #2425, #2435, #2465, #2475, #2504, #2518, and #2528 * Isolate Scarf telemetry integration test into its own CI job by @pankajkoti and @claude in #2477 * ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums in #2506 * Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509 * Extend skipping tests in CI for more non-code file changes by @pankajkoti in #2510 * Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti in #2517 * Enforce zero warnings policy for documentation by @dnskr in #2513 Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Sometimes users may need to run commands before the dbt command is executed in Cosmos tasks.
This behaviour is similar to the
callbackfeature, but instead of happening after dbt command execution, it happens before.A use case is to calculate dbt or environment variables at runtime and use them during dbt command execution.
Example:
The current implementation works with all existing execution modes: