Compress telemetry metadata to reduce serialized DAG size #2252

Merged: pankajkoti merged 6 commits into main from compress-telemetry-metadata on Jan 7, 2026

@pankajkoti pankajkoti commented Jan 6, 2026

Implement gzip compression + base64 encoding for telemetry metadata stored in dag.params. This reduces the size of serialized DAGs in Airflow's database.

Changes:

  • Add _compress_telemetry_metadata() and _decompress_telemetry_metadata() to cosmos/telemetry.py
  • Update converter to compress metadata before storing in dag.params
  • Update dag_run_listener to decompress metadata when reading
  • Catch specific exceptions during decompression (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError)
  • Add size comparison logging
  • Update tests to verify compression

related: #2223
closes: https://github.com/astronomer/oss-integrations-private/issues/300
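The compress/decompress pair described above can be sketched roughly as follows. This is a minimal illustration and not the code from cosmos/telemetry.py: the function names `compress_metadata` and `decompress_metadata` and the sample payload are hypothetical, and the real helpers may differ in signature and detail.

```python
import base64
import gzip
import json


def compress_metadata(metadata: dict) -> str:
    """JSON-encode, gzip (level 9), then base64-encode into an ASCII string."""
    raw = json.dumps(metadata).encode("utf-8")
    # mtime=0 keeps a wall-clock timestamp out of the gzip header.
    compressed = gzip.compress(raw, compresslevel=9, mtime=0)
    return base64.b64encode(compressed).decode("ascii")


def decompress_metadata(blob: str) -> dict:
    """Reverse of compress_metadata: base64-decode, gunzip, then parse JSON."""
    return json.loads(gzip.decompress(base64.b64decode(blob)).decode("utf-8"))


# Hypothetical payload; the real telemetry keys are not shown in this PR.
sample = {"load_mode": "dbt_manifest", "operators": ["DbtRunLocalOperator"] * 40}
blob = compress_metadata(sample)
original_size = len(json.dumps(sample))

# Size comparison, similar in spirit to the logging this PR adds.
print(f"original={original_size} bytes, compressed={len(blob)} bytes")
```

Base64 adds roughly a third on top of the gzip output, so the saving depends on how repetitive the metadata is; very small payloads may not shrink at all.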

netlify Bot commented Jan 6, 2026

Deploy Preview for astronomer-cosmos canceled.

🔨 Latest commit: 6471c31
🔍 Latest deploy log: https://app.netlify.com/projects/astronomer-cosmos/deploys/695e75b686912600080c5f14

codecov Bot commented Jan 6, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.03%. Comparing base (4e7bc91) to head (6471c31).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2252   +/-   ##
=======================================
  Coverage   98.03%   98.03%           
=======================================
  Files          98       98           
  Lines        6352     6377   +25     
=======================================
+ Hits         6227     6252   +25     
  Misses        125      125           

@pankajkoti pankajkoti marked this pull request as ready for review January 6, 2026 16:06
Copilot AI review requested due to automatic review settings January 6, 2026 16:06

Copilot AI left a comment

Pull request overview

This PR implements gzip compression with base64 encoding for telemetry metadata to reduce the size of serialized DAGs in Airflow's database. The compression is applied when storing metadata in dag.params and decompressed when reading it back.

Key changes:

  • Added compression/decompression utility functions using gzip (level 9) and base64 encoding
  • Updated the converter to compress metadata before storage with size comparison logging
  • Modified the dag_run_listener to safely decompress metadata with proper exception handling
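As a rough illustration of the "safe decompression" point, the sketch below catches the four exception types named in the PR description and falls back to an empty dict. The helper name `safe_decompress` is hypothetical; the actual `get_cosmos_telemetry_metadata()` in cosmos/listeners/dag_run_listener.py reads from the DAG run and may be structured differently.

```python
import base64
import binascii
import gzip
import json


def safe_decompress(blob):
    """Return the decoded metadata, or {} if the value is missing or unreadable."""
    if not blob:
        return {}
    try:
        return json.loads(gzip.decompress(base64.b64decode(blob)))
    except (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError):
        # binascii.Error: malformed base64 (e.g. bad padding)
        # gzip.BadGzipFile: decoded bytes are not a gzip stream
        # EOFError: gzip stream is truncated
        # json.JSONDecodeError: decompressed bytes are not valid JSON
        return {}


valid = base64.b64encode(
    gzip.compress(b'{"invocation_mode": "subprocess"}', mtime=0)
).decode("ascii")
bad_base64 = "abc"
not_gzip = base64.b64encode(b"plain text").decode("ascii")
truncated = base64.b64encode(gzip.compress(b'{"a": 1}')[:-4]).decode("ascii")
not_json = base64.b64encode(gzip.compress(b"not json")).decode("ascii")

for candidate in (valid, bad_base64, not_gzip, truncated, not_json, None):
    print(safe_decompress(candidate))
```

Catching only these specific exceptions, rather than a bare `Exception`, keeps unrelated bugs visible while still preventing malformed metadata from breaking the listener.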

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| cosmos/telemetry.py | Adds _compress_telemetry_metadata() and _decompress_telemetry_metadata() helper functions |
| cosmos/converter.py | Compresses metadata before storing in dag.params and logs size comparison |
| cosmos/listeners/dag_run_listener.py | Updates get_cosmos_telemetry_metadata() to decompress metadata with error handling |
| tests/test_converter.py | Verifies compression by checking metadata type and decompressing for content validation |
| tests/listeners/test_dag_run_listener.py | Adds tests for handling invalid compressed data and missing metadata cases |

Comment thread cosmos/telemetry.py Outdated
Comment thread cosmos/listeners/dag_run_listener.py Outdated
@pankajkoti pankajkoti commented

Screenshot showing the compressed, encoded data in the conf:
[Image: Screenshot 2026-01-06 at 10 06 09 PM]

Comment thread cosmos/telemetry.py Outdated

@tatiana tatiana left a comment

Thanks for improving this so quickly, @pankajkoti! I left a minor comment, and I'm happy with the implementation.

pankajkoti and others added 6 commits January 7, 2026 20:33
Implement gzip compression + base64 encoding for telemetry metadata stored in dag.params. This reduces the size of serialized DAGs in Airflow's database.

Changes:
- Add _compress_telemetry_metadata() and _decompress_telemetry_metadata() to cosmos/telemetry.py
- Update converter to compress metadata before storing in dag.params
- Update dag_run_listener to decompress metadata when reading
- Catch specific exceptions during decompression (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError)
- Add size comparison logging
- Update tests to verify compression

Set mtime=0 in gzip.compress() to make compression deterministic. Without this, gzip includes a timestamp in the header, so the compressed output differs slightly on each call even with identical input. This broke Airflow's Param const validation, which requires the value to remain exactly the same.

Test that get_cosmos_telemetry_metadata gracefully handles invalid compressed data and missing metadata by returning an empty dict instead of raising exceptions.

Add an explanation in the docstring that mtime=0 ensures deterministic compression output, preventing Airflow Param validation errors. Add tests to verify that compression is deterministic and that the roundtrip works correctly.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
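
The effect of mtime on the gzip output can be checked with the standard library alone. The sketch below is an illustration, not the test added in this PR: it pins mtime=0 for two calls, then passes two different explicit mtime values to stand in for calls made at different times. The payload is hypothetical.

```python
import gzip

payload = b'{"load_mode": "dbt_manifest"}'

# With mtime pinned to 0, repeated calls produce byte-identical output.
pinned_a = gzip.compress(payload, compresslevel=9, mtime=0)
pinned_b = gzip.compress(payload, compresslevel=9, mtime=0)

# Bytes 4-7 of a gzip header hold the modification time (little-endian).
# Two explicit values simulate compressing the same input at different times.
stamped_1 = gzip.compress(payload, compresslevel=9, mtime=1)
stamped_2 = gzip.compress(payload, compresslevel=9, mtime=2)

print("pinned outputs identical:", pinned_a == pinned_b)
print("pinned mtime field:", pinned_a[4:8])
print("stamped outputs identical:", stamped_1 == stamped_2)
print("same content after decompression:",
      gzip.decompress(stamped_1) == gzip.decompress(stamped_2))
```

The stamped outputs decompress to the same bytes but differ as strings once base64-encoded, which is the kind of drift a const-style equality check would reject.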
@pankajkoti pankajkoti force-pushed the compress-telemetry-metadata branch from f5e0d4f to 6471c31 on January 7, 2026 15:03
@pankajkoti pankajkoti merged commit c251c36 into main Jan 7, 2026
90 checks passed
@pankajkoti pankajkoti deleted the compress-telemetry-metadata branch January 7, 2026 15:37
@tatiana tatiana modified the milestones: Cosmos 2.0.0, Cosmos 1.13.0 Jan 7, 2026
@pankajastro pankajastro mentioned this pull request Jan 29, 2026
tatiana added a commit that referenced this pull request Jan 30, 2026
Features

* Support cross-referencing models across dbt projects using dbt-loom by
@pankajkoti in #2271
* Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by
@YourRoyalLinus in #2261
* Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with
``KubernetesPodOperator`` by @tatiana in #2207
* Add support for StarRocks profile mapping by @kurkim0661 in #2256
* Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275
* Support defining custom callbacks alongside the ``WATCHER_KUBERNETES``
callback by @johnhoran in #2307

Enhancements

* Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in
#2077
* Leverage Airflow ``::group::`` to group logs associated with DAG
parsing by @tatiana in #2235
* Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in
#2245
* Restore plain text output when using ``ExecutionMode.WATCHER`` by
@tiovader in #2241

Bug Fixes

* Fix running empty models or ephemeral nodes in
``ExecutionMode.WATCHER`` by @tatiana in #2279
* Improve watcher producer task priority in scheduling and the UI by
@tatiana in #2237
* Fix typos and formatting issues in documentation by @pankajkoti in
#2259
* Allow watcher producer retries without erroring by @tatiana in #2283
* Fix ``TestBehavior.AFTER_ALL`` missing project_name information
when loading a project from a manifest file by @tuantran0910 in #2242
* Fix duplicate log lines in watcher subprocess execution and format
timestamps by @pankajkoti in #2301

Docs

* Add Watcher Kubernetes documentation by @tatiana in #2303
* Document newly added telemetry metrics in the privacy notice by
@pankajkoti in #2249
* Add compatibility policy document by @pankajastro in #2251
* Improve watcher documentation related to dbt threads by @tatiana in
#2273
* Fix link in watcher execution mode documentation by @jedcunningham in
#2277
* Update Apache Airflow minimum compatibility policy by @tatiana in
#2285
* Clarify Cosmos runtime support until "End of Basic Support" by
@jedcunningham in #2286
* Update watcher docs by @tatiana in #2298
* Update watcher kubernetes documentation by @tatiana in #2306

Others

* Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in
#2177
* Add dbt Core 1.11 to the test matrix by @tatiana in #2230
* Add integration tests using InvocationMode.SUBPROCESS and validate
output by @tatiana in #2287
* Fix main branch failing tests by @tatiana in #2296
* Update pre-commit hooks to the latest versions by @jedcunningham in
#2289
* Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290
* Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and
#2284
* Add Scarf metrics to understand Cosmos feature usage patterns
  - Add telemetry tracking for dbt docs plugin usage by @pankajkoti in
    #2240
  - Add DAG run telemetry metrics for load mode, invocation, and
    render_config parameters by @pankajkoti in #2223
  - Collect profile metrics for DAG runs by @pankajastro in #2228
  - Compress telemetry metadata to reduce serialized DAG size by
    @pankajkoti in #2252
  - Skip storing telemetry metadata when emission is disabled by
    @pankajkoti in #2278
  - Hide telemetry metadata parameters from the Airflow trigger UI by
    @pankajkoti in #2247

closes:
astronomer/oss-integrations-private#317

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>