From 5dd3ba3d2491dbe9afb4bdc4d513a8f7825c4159 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 6 Jan 2026 13:42:10 +0000 Subject: [PATCH 1/6] Compress telemetry metadata to reduce serialized DAG size Implement gzip compression + base64 encoding for telemetry metadata stored in dag.params. This reduces the size of serialized DAGs in Airflow's database. Changes: - Add _compress_telemetry_metadata() and _decompress_telemetry_metadata() to cosmos/telemetry.py - Update converter to compress metadata before storing in dag.params - Update dag_run_listener to decompress metadata when reading - Catch specific exceptions during decompression (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError) - Add size comparison logging - Update tests to verify compression --- cosmos/converter.py | 8 ++++++-- cosmos/listeners/dag_run_listener.py | 18 ++++++++++++++--- cosmos/telemetry.py | 29 ++++++++++++++++++++++++++++ tests/test_converter.py | 9 ++++++++- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 5fc50587ff..0fd44b43bc 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -33,6 +33,7 @@ # TODO: Move _get_profile_config_attribute at common place from cosmos.listeners.task_instance_listener import _get_profile_config_attribute from cosmos.log import get_logger +from cosmos.telemetry import _compress_telemetry_metadata from cosmos.versioning import _create_folder_version_hash logger = get_logger(__name__) @@ -437,5 +438,8 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 # Store metadata in dag.params which is preserved during serialization # Using a key that's unlikely to conflict with user params - dag.params["__cosmos_telemetry_metadata__"] = Param(default=metadata, const=metadata) - logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}") + compressed_metadata = _compress_telemetry_metadata(metadata) + dag.params["__cosmos_telemetry_metadata__"] = Param(default=compressed_metadata, const=compressed_metadata) + logger.debug( + f"Stored compressed Cosmos telemetry metadata in DAG {dag.dag_id} params (original size: {len(str(metadata))} bytes, compressed: {len(compressed_metadata)} bytes)" + ) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 5d8ca2ed49..4decefef37 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,6 +1,9 @@ from __future__ import annotations +import binascii +import gzip import hashlib +import json from typing import TYPE_CHECKING, Any from airflow.listeners import hookimpl @@ -12,6 +15,7 @@ from cosmos import telemetry from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION from cosmos.log import get_logger +from cosmos.telemetry import _decompress_telemetry_metadata AIRFLOW_VERSION_MAJOR = AIRFLOW_VERSION.major @@ -63,9 +67,17 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: Returns the metadata dictionary stored by the converter in dag.params, or an empty dict if not present. """ - # Metadata is stored in dag.params to survive serialization - metadata = dag.params.get("__cosmos_telemetry_metadata__", {}) - return metadata if isinstance(metadata, dict) else {} + # Metadata is stored as compressed string in dag.params to survive serialization + compressed_metadata = dag.params.get("__cosmos_telemetry_metadata__") + + if not compressed_metadata: + return {} + + try: + return _decompress_telemetry_metadata(compressed_metadata) + except (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError) as e: + logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}") + return {} @hookimpl diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 21654495da..fd67bc4079 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -1,6 +1,10 @@ from __future__ import annotations +import gzip +import json import platform +from base64 import b64decode, b64encode +from typing import Any from urllib import parse from urllib.parse import urlencode @@ -88,3 +92,28 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, else: logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False + + +def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str: + """ + Compress and encode telemetry metadata to reduce serialized DAG size. + + :param metadata: Telemetry metadata dictionary + :returns: Base64-encoded gzip-compressed JSON string + """ + json_bytes = json.dumps(metadata).encode("utf-8") + compressed = gzip.compress(json_bytes, compresslevel=9) + return b64encode(compressed).decode("ascii") + + +def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]: + """ + Decompress and decode telemetry metadata. + + :param compressed_data: Base64-encoded gzip-compressed JSON string + :returns: Original metadata dictionary + """ + compressed_bytes = b64decode(compressed_data.encode("ascii")) + json_bytes = gzip.decompress(compressed_bytes) + result: dict[str, Any] = json.loads(json_bytes.decode("utf-8")) + return result diff --git a/tests/test_converter.py b/tests/test_converter.py index 4a94a52037..773691674b 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -12,6 +12,7 @@ from cosmos.dbt.graph import DbtGraph, DbtNode from cosmos.exceptions import CosmosValueError from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping +from cosmos.telemetry import _decompress_telemetry_metadata SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" @@ -1149,7 +1150,13 @@ def test_telemetry_metadata_storage(mock_load_dbt_graph): # Verify metadata is stored in dag.params assert "__cosmos_telemetry_metadata__" in dag.params - metadata = dag.params["__cosmos_telemetry_metadata__"] + compressed_metadata = dag.params["__cosmos_telemetry_metadata__"] + + # Verify it's compressed (should be a string) + assert isinstance(compressed_metadata, str) + + # Decompress to verify the contents + metadata = _decompress_telemetry_metadata(compressed_metadata) # Verify expected metadata keys are present assert "used_automatic_load_mode" in metadata From ab40a121868a9935e6c5be346ed303fbe20df324 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 6 Jan 2026 14:32:31 +0000 Subject: [PATCH 2/6] Fix non-deterministic compression causing Param validation errors Set mtime=0 in gzip.compress() to make compression deterministic. Without this, gzip includes a timestamp in the header, causing the compressed output to differ slightly each time even with identical input. This broke Airflow's Param const validation which requires the value to remain exactly the same. --- cosmos/telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index fd67bc4079..a60ca5ee2a 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -102,7 +102,7 @@ def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str: :returns: Base64-encoded gzip-compressed JSON string """ json_bytes = json.dumps(metadata).encode("utf-8") - compressed = gzip.compress(json_bytes, compresslevel=9) + compressed = gzip.compress(json_bytes, compresslevel=9, mtime=0) return b64encode(compressed).decode("ascii") From 48d1a88e804b021045e0c79f35131ae3e639dd5b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 6 Jan 2026 15:31:25 +0000 Subject: [PATCH 3/6] Add test coverage for telemetry metadata decompression error handling Test that get_cosmos_telemetry_metadata gracefully handles invalid compressed data and missing metadata by returning empty dict instead of raising exceptions. --- tests/listeners/test_dag_run_listener.py | 27 +++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 5da38ffb68..64e98c4759 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -14,7 +14,12 @@ from cosmos.airflow.task_group import DbtTaskGroup from cosmos.config import ExecutionConfig, RenderConfig from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior -from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks +from cosmos.listeners.dag_run_listener import ( + get_cosmos_telemetry_metadata, + on_dag_run_failed, + on_dag_run_success, + total_cosmos_tasks, +) from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -82,6 +87,26 @@ def test_not_cosmos_dag(): assert total_cosmos_tasks(dag) == 0 +def test_get_cosmos_telemetry_metadata_with_invalid_data(): + """Test that get_cosmos_telemetry_metadata handles invalid compressed data gracefully.""" + with DAG("test-dag", start_date=datetime(2022, 1, 1)) as dag: + # Set invalid base64 data that will fail decompression + dag.params["__cosmos_telemetry_metadata__"] = "invalid_base64_data!" + + # Should return empty dict instead of raising exception + result = get_cosmos_telemetry_metadata(dag) + assert result == {} + + +def test_get_cosmos_telemetry_metadata_with_no_metadata(): + """Test that get_cosmos_telemetry_metadata returns empty dict when no metadata present.""" + with DAG("test-dag", start_date=datetime(2022, 1, 1)) as dag: + pass + + result = get_cosmos_telemetry_metadata(dag) + assert result == {} + + def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun: if AIRFLOW_VERSION < Version("3.0"): # Airflow 2 and 3.0 From 24d32801260363596319e0d7cad7f4580ea277a7 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 6 Jan 2026 16:25:44 +0000 Subject: [PATCH 4/6] Document mtime=0 and add deterministic compression tests Add explanation in docstring that mtime=0 ensures deterministic compression output, preventing Airflow Param validation errors. Add tests to verify compression is deterministic and roundtrip works correctly. --- cosmos/telemetry.py | 5 +++++ tests/test_telemetry.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index a60ca5ee2a..c6da4894ce 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -98,6 +98,11 @@ def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str: """ Compress and encode telemetry metadata to reduce serialized DAG size. + Uses mtime=0 in gzip compression to ensure deterministic output regardless of when + the compression occurs. This prevents spurious Airflow Param validation errors that + would occur if the same metadata compressed at different times produced different + base64 strings. + :param metadata: Telemetry metadata dictionary :returns: Base64-encoded gzip-compressed JSON string """ diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index a85435ae93..e65fb7c46f 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -7,6 +7,45 @@ from cosmos import telemetry +def test_compress_telemetry_metadata_is_deterministic(): + """Test that compressing the same metadata multiple times produces identical output.""" + metadata = { + "cosmos_version": "1.8.0", + "airflow_version": "2.10.1", + "python_version": "3.11", + "execution_mode": "local", + "install_deps": True, + } + + # Compress the same metadata multiple times + compressed_1 = telemetry._compress_telemetry_metadata(metadata) + compressed_2 = telemetry._compress_telemetry_metadata(metadata) + compressed_3 = telemetry._compress_telemetry_metadata(metadata) + + # All compressed outputs should be identical (deterministic) + assert compressed_1 == compressed_2 + assert compressed_2 == compressed_3 + + +def test_compress_decompress_telemetry_metadata_roundtrip(): + """Test that metadata can be compressed and decompressed correctly.""" + original_metadata = { + "cosmos_version": "1.8.0", + "airflow_version": "2.10.1", + "python_version": "3.11", + "execution_mode": "local", + "install_deps": False, + "dbt_command": "run", + } + + # Compress and decompress + compressed = telemetry._compress_telemetry_metadata(original_metadata) + decompressed = telemetry._decompress_telemetry_metadata(compressed) + + # Should get back the original metadata + assert decompressed == original_metadata + + def test_should_emit_is_true_by_default(): assert telemetry.should_emit() From 4d3274defa9e800c3af54ebb5c88622b6f406f74 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 6 Jan 2026 21:56:27 +0530 Subject: [PATCH 5/6] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/listeners/dag_run_listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 4decefef37..c772e96193 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -75,7 +75,7 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: try: return _decompress_telemetry_metadata(compressed_metadata) - except (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError) as e: + except (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError, UnicodeDecodeError) as e: logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}") return {} From 6471c31f6e1145fbc219decde7cfa77e468cf0f0 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 7 Jan 2026 14:58:52 +0000 Subject: [PATCH 6/6] Refactor telemetry compression from gzip to zlib --- cosmos/listeners/dag_run_listener.py | 4 ++-- cosmos/telemetry.py | 15 +++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index c772e96193..0cab34d098 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,9 +1,9 @@ from __future__ import annotations import binascii -import gzip import hashlib import json +import zlib from typing import TYPE_CHECKING, Any from airflow.listeners import hookimpl @@ -75,7 +75,7 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: try: return _decompress_telemetry_metadata(compressed_metadata) - except (binascii.Error, gzip.BadGzipFile, json.JSONDecodeError, EOFError, UnicodeDecodeError) as e: + except (binascii.Error, zlib.error, json.JSONDecodeError, UnicodeDecodeError) as e: logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}") return {} diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index c6da4894ce..87c53570ed 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -1,8 +1,8 @@ from __future__ import annotations -import gzip import json import platform +import zlib from base64 import b64decode, b64encode from typing import Any from urllib import parse @@ -98,16 +98,11 @@ def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str: """ Compress and encode telemetry metadata to reduce serialized DAG size. - Uses mtime=0 in gzip compression to ensure deterministic output regardless of when - the compression occurs. This prevents spurious Airflow Param validation errors that - would occur if the same metadata compressed at different times produced different - base64 strings. - :param metadata: Telemetry metadata dictionary - :returns: Base64-encoded gzip-compressed JSON string + :returns: Base64-encoded zlib-compressed JSON string """ json_bytes = json.dumps(metadata).encode("utf-8") - compressed = gzip.compress(json_bytes, compresslevel=9, mtime=0) + compressed = zlib.compress(json_bytes, level=9) return b64encode(compressed).decode("ascii") @@ -115,10 +110,10 @@ def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]: """ Decompress and decode telemetry metadata. - :param compressed_data: Base64-encoded gzip-compressed JSON string + :param compressed_data: Base64-encoded zlib-compressed JSON string :returns: Original metadata dictionary """ compressed_bytes = b64decode(compressed_data.encode("ascii")) - json_bytes = gzip.decompress(compressed_bytes) + json_bytes = zlib.decompress(compressed_bytes) result: dict[str, Any] = json.loads(json_bytes.decode("utf-8")) return result