Skip to content

Add telemetry task listener metrics for Cosmos operators#2195

Merged
pankajkoti merged 15 commits into
mainfrom
telemetry-task-metrics
Dec 18, 2025
Merged

Add telemetry task listener metrics for Cosmos operators#2195
pankajkoti merged 15 commits into
mainfrom
telemetry-task-metrics

Conversation

@pankajkoti
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti commented Dec 12, 2025

The PR introduces the Airflow task-instance listener and emits task-level telemetry for Cosmos operators (operator name, invocation mode, execution mode, dbt command, install_deps flag, has_callback, subclass detection)

related: #2110
related: #2192

@netlify
Copy link
Copy Markdown

netlify Bot commented Dec 12, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit a9f551c
🔍 Latest deploy log https://app.netlify.com/projects/sunny-pastelito-5ecb04/deploys/693bd6b27463b00008a56075

@netlify
Copy link
Copy Markdown

netlify Bot commented Dec 12, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 3bb82c2
🔍 Latest deploy log https://app.netlify.com/projects/sunny-pastelito-5ecb04/deploys/693c43744233b90008a458fc

@netlify
Copy link
Copy Markdown

netlify Bot commented Dec 15, 2025

Deploy Preview for astronomer-cosmos canceled.

Name Link
🔨 Latest commit f07ea0e
🔍 Latest deploy log https://app.netlify.com/projects/astronomer-cosmos/deploys/6942c1b2be255900082e00ef

@codecov
Copy link
Copy Markdown

codecov Bot commented Dec 15, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.88%. Comparing base (fda9630) to head (f07ea0e).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2195      +/-   ##
==========================================
+ Coverage   97.85%   97.88%   +0.03%     
==========================================
  Files          94       95       +1     
  Lines        6066     6154      +88     
==========================================
+ Hits         5936     6024      +88     
  Misses        130      130              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pankajkoti pankajkoti changed the title Emit telemetry task metrics Add telemetry task listener metrics for Cosmos operators Dec 16, 2025
@pankajkoti pankajkoti marked this pull request as ready for review December 16, 2025 15:06
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds telemetry collection for Cosmos task instances by implementing an Airflow task instance listener. The listener captures task-level metrics including operator name, invocation mode, execution mode, dbt command, and callback presence when Cosmos operators complete.

Key changes:

  • Implemented task instance lifecycle hooks that emit telemetry on success/failure
  • Added comprehensive test coverage for metric extraction logic
  • Registered the listener in both Airflow 2 and Airflow 3 plugins

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

File Description
cosmos/listeners/task_instance_listener.py Core implementation of task instance listener with metric extraction helpers
cosmos/plugin/airflow2.py Registered task_instance_listener in Airflow 2 plugin
cosmos/plugin/airflow3.py Registered task_instance_listener in Airflow 3 plugin
tests/listeners/test_task_instance_listener.py Comprehensive test suite covering metric extraction and listener behavior

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/listeners/test_task_instance_listener.py
Copy link
Copy Markdown
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

Comment thread cosmos/listeners/task_instance_listener.py Outdated
Comment thread cosmos/listeners/task_instance_listener.py Outdated
@pankajkoti pankajkoti force-pushed the telemetry-task-metrics branch from 50f2c5a to f07ea0e Compare December 17, 2025 14:44
@tatiana tatiana added this to the Cosmos 1.12.0 milestone Dec 18, 2025
Comment thread cosmos/listeners/task_instance_listener.py
Comment thread cosmos/listeners/task_instance_listener.py
Comment thread cosmos/listeners/task_instance_listener.py
"is_cosmos_operator_subclass": _is_cosmos_subclass(task_instance),
"invocation_mode": _invocation_mode(task_instance),
"execution_mode": _execution_mode_from_task(task_instance),
"map_index": task_instance.map_index,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the map_indexand why do we have it here?

Copy link
Copy Markdown
Contributor Author

@pankajkoti pankajkoti Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map_index is the one that gets attached to task instances when using dynaming task mapping. I thought it would be nice to capture that since it's already available on the task instance.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajkoti I do not believe we should add what the map_index is, but I think we should add if it were a mapped task (boolean). And we need to make this change consistent in https://github.com/astronomer/ap-vendor/pull/1102/changes#r2631088199

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a follow-up PR to address this suggestion: #2210

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pankajkoti , thanks for implementing this! I left minor comments in line.

The significant risk with this change is that our telemetry logic fails, causing the task to fail. We may need to add some exception handling there if issues surface, but it is a good starting point.

It would be great to have follow up tickets for the work we did not include here yet, and assign to the next milestone

@pankajkoti
Copy link
Copy Markdown
Contributor Author

I am proceeding with the merge, having addressed the questions inline. Please let me know if you'd like me to address something further @tatiana

cc: @pankajastro

@pankajkoti pankajkoti merged commit ec7c6b0 into main Dec 18, 2025
88 checks passed
@pankajkoti pankajkoti deleted the telemetry-task-metrics branch December 18, 2025 13:18
pankajkoti added a commit that referenced this pull request Dec 18, 2025
…2210)

Change task telemetry to emit `is_mapped_task` boolean instead of raw
`map_index` integer value for clearer intent

related: #2195 
related: #2110
@pankajkoti pankajkoti mentioned this pull request Dec 18, 2025
pankajkoti added a commit that referenced this pull request Dec 18, 2025
Breaking changes

* Introduced in the PR #2080. The following functions are expected to be
used internally only to Cosmos, so we hope these won't impact end-users,
but we are documenting the changes just in case:
- ``generate_task_or_group`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` now expects the ``node_converters`` argument
* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165

Features

* Support applying ``node_converter`` at a task level instead of task
group level by @anyapriya in #1759
* Allow overriding ``DbtProducerWatcherOperator`` parameters via
``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133
* Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by
@pankajastro in #2084
* Support real-time consumer updates when using
``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by
@pankajastro in #2152
* Update telemetry to v3 format with query parameters by @pankajkoti in
#2192
* Add initial set of telemetry task listener metrics for Cosmos
operators by @pankajkoti in #2195

Enhancements

* Unify Airflow version handling into ``constants.py`` by @tatiana in
#2089
* Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in
#2080
* Force watcher producer retries to zero by @pankajkoti in #2114
* Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the
producer fails using Airflow context by @pankajkoti in #2126
* ``ExecutonMode.WATCHER``: fetch producer status asynchronously from
the Airflow runtime so deferrable sensors fail immediately when the
producer task fails by @pankajkoti in #2144
* Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log
parser by @tatiana in #2183
* Replace map_index with is_mapped_task boolean in task telemetry
metrics by @pankajkoti in #2210
* Collect cosmos profile metrics in task telemetry metrics by
@pankajastro in #2198
* Remove unnecessary information from telemetry by @tatiana in #2211

Bug fixes

* Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by
@pankajkoti in #2124
* Remove empty test tasks when all tests are detached by @anyapriya in
#2010
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by
@michal-mrazek in #2127
* Add databricks oauth mock profile by @fjmacagno in #2164
* Register listeners in Airflow 3 plugin implementation by @pankajastro
in #2187
* Fix resolution of ``packages-install-path`` when it uses ``env_var``
by @tatiana in #2194
* Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include
``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis
in #2209
* Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro
in #2184
* Remove dag_run_id from telemetry tests by @tatiana in #2213

Docs

* Document dataset-event limitation when using
``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143
* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana  in #2139
* Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro
in #2115
* Fix reStructuredText formatting by @dnskr in #2132
* Add docs for ``setup_operator_args`` param by @pankajastro in #2136
* Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #2153
* Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti
in #2167
* Update PRIVACY_NOTICE.rst by @tatiana in #2212

Others

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165
* Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140
* Enable tests for Python 3.13 by @pankajastro in #2154
* Add Python 3.12 to CI integration tests matrix by @pankajastro in
#2168
* Retry flaky Telemetry success test to stabilise CI by @pankajkoti in
#2138
* Drop unused producer state xcom handling in ``ExecutionMode.WATCHER``
by @pankajkoti in #2145
* Remove unused Python3.9 uses from Github action CI by @pankajastro in
#2117
* Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in
#2150
* Refactor: Use shared airflow version constant by @pankajkoti in #2157
* Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in
#2172
* Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170
* Add targeted ``type: ignore`` for untyped decorators to fix ``mypy``
errors by @pankajastro in #2174
* Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by
@pankajastro in #2175
* Refactor to reuse ``load_method_from_module`` from
``_utils/importer.py`` by @pankajastro in #2176
* Remove try except block for cache import and unused python_version
variable by @pankajastro in #2186
* Unpin Airflow to satisfy GitHub Security tab requirements by
@pankajastro in #2171
* Update Python version for ``pyupgrade`` in ``pre-commit`` config by
@pankajastro in #2190
* Add cooldown config in ``dependabot`` config by @pankajastro in #2189
* Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in
#2196
* Remove empty variables emission from telemetry metrics by @pankajkoti
in #2197
* Reformat documented comments for historical URL formats by @pankajkoti
in #2199
* Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot
in #2135
* Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by
@dependabot in #2147
* Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by
@dependabot in #2156
* Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot
in #2155
* Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot
in #2178
* Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by
@dependabot in #2208
* pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173,
#2191, #2202

closes:
astronomer/oss-integrations-private#275
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants