diff --git a/cosmos/converter.py b/cosmos/converter.py
index 5fc50587ff..0fd44b43bc 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -33,6 +33,7 @@
 # TODO: Move _get_profile_config_attribute at common place
 from cosmos.listeners.task_instance_listener import _get_profile_config_attribute
 from cosmos.log import get_logger
+from cosmos.telemetry import _compress_telemetry_metadata
 from cosmos.versioning import _create_folder_version_hash
 
 logger = get_logger(__name__)
@@ -437,5 +438,8 @@ def _store_cosmos_telemetry_metadata_on_dag(  # noqa: C901
 
     # Store metadata in dag.params which is preserved during serialization
     # Using a key that's unlikely to conflict with user params
-    dag.params["__cosmos_telemetry_metadata__"] = Param(default=metadata, const=metadata)
-    logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}")
+    compressed_metadata = _compress_telemetry_metadata(metadata)
+    dag.params["__cosmos_telemetry_metadata__"] = Param(default=compressed_metadata, const=compressed_metadata)
+    logger.debug(
+        f"Stored compressed Cosmos telemetry metadata in DAG {dag.dag_id} params (original size: {len(str(metadata))} bytes, compressed: {len(compressed_metadata)} bytes)"
+    )
diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py
index 5d8ca2ed49..0cab34d098 100644
--- a/cosmos/listeners/dag_run_listener.py
+++ b/cosmos/listeners/dag_run_listener.py
@@ -1,6 +1,9 @@
 from __future__ import annotations
 
+import binascii
 import hashlib
+import json
+import zlib
 from typing import TYPE_CHECKING, Any
 
 from airflow.listeners import hookimpl
@@ -12,6 +15,7 @@
 from cosmos import telemetry
 from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION
 from cosmos.log import get_logger
+from cosmos.telemetry import _decompress_telemetry_metadata
 
 AIRFLOW_VERSION_MAJOR = AIRFLOW_VERSION.major
 
@@ -63,9 +67,24 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]:
 
     Returns the metadata dictionary stored by the converter in dag.params, or an empty dict if not present.
     """
-    # Metadata is stored in dag.params to survive serialization
-    metadata = dag.params.get("__cosmos_telemetry_metadata__", {})
-    return metadata if isinstance(metadata, dict) else {}
+    # Metadata is stored as compressed string in dag.params to survive serialization
+    stored_metadata = dag.params.get("__cosmos_telemetry_metadata__")
+
+    if not stored_metadata:
+        return {}
+
+    # DAGs serialized before compression was introduced still hold the plain dict
+    if isinstance(stored_metadata, dict):
+        return stored_metadata
+
+    try:
+        metadata = _decompress_telemetry_metadata(stored_metadata)
+    except (binascii.Error, zlib.error, json.JSONDecodeError, UnicodeError, AttributeError) as e:
+        logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}")
+        return {}
+
+    # A payload that decodes to something other than a JSON object is not usable metadata
+    return metadata if isinstance(metadata, dict) else {}
 
 
 @hookimpl
diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py
index 21654495da..87c53570ed 100644
--- a/cosmos/telemetry.py
+++ b/cosmos/telemetry.py
@@ -1,6 +1,10 @@
 from __future__ import annotations
 
+import json
 import platform
+import zlib
+from base64 import b64decode, b64encode
+from typing import Any
 from urllib import parse
 from urllib.parse import urlencode
 
@@ -88,3 +92,30 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str,
     else:
         logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
         return False
+
+
+def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str:
+    """
+    Compress and encode telemetry metadata to reduce serialized DAG size.
+
+    :param metadata: Telemetry metadata dictionary
+    :returns: Base64-encoded zlib-compressed JSON string
+    """
+    # Compact separators drop the padding whitespace json.dumps emits by default
+    json_bytes = json.dumps(metadata, separators=(",", ":")).encode("utf-8")
+    compressed = zlib.compress(json_bytes, level=9)
+    return b64encode(compressed).decode("ascii")
+
+
+def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]:
+    """
+    Decompress and decode telemetry metadata.
+
+    :param compressed_data: Base64-encoded zlib-compressed JSON string
+    :returns: Original metadata dictionary
+    :raises binascii.Error, zlib.error, ValueError: If the payload is not valid base64, zlib or JSON data
+    """
+    compressed_bytes = b64decode(compressed_data.encode("ascii"))
+    json_bytes = zlib.decompress(compressed_bytes)
+    result: dict[str, Any] = json.loads(json_bytes.decode("utf-8"))
+    return result
diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py
index 5da38ffb68..64e98c4759 100644
--- a/tests/listeners/test_dag_run_listener.py
+++ b/tests/listeners/test_dag_run_listener.py
@@ -14,7 +14,12 @@
 from cosmos.airflow.task_group import DbtTaskGroup
 from cosmos.config import ExecutionConfig, RenderConfig
 from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior
-from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks
+from cosmos.listeners.dag_run_listener import (
+    get_cosmos_telemetry_metadata,
+    on_dag_run_failed,
+    on_dag_run_success,
+    total_cosmos_tasks,
+)
 from cosmos.profiles import PostgresUserPasswordProfileMapping
 
 DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt"
@@ -82,6 +87,35 @@ def test_not_cosmos_dag():
     assert total_cosmos_tasks(dag) == 0
 
 
+def test_get_cosmos_telemetry_metadata_with_invalid_data():
+    """Test that get_cosmos_telemetry_metadata handles invalid compressed data gracefully."""
+    with DAG("test-dag", start_date=datetime(2022, 1, 1)) as dag:
+        # Set invalid base64 data that will fail decompression
+        dag.params["__cosmos_telemetry_metadata__"] = "invalid_base64_data!"
+
+    # Should return empty dict instead of raising exception
+    result = get_cosmos_telemetry_metadata(dag)
+    assert result == {}
+
+
+def test_get_cosmos_telemetry_metadata_with_no_metadata():
+    """Test that get_cosmos_telemetry_metadata returns empty dict when no metadata present."""
+    with DAG("test-dag", start_date=datetime(2022, 1, 1)) as dag:
+        pass
+
+    result = get_cosmos_telemetry_metadata(dag)
+    assert result == {}
+
+
+def test_get_cosmos_telemetry_metadata_with_legacy_uncompressed_metadata():
+    """Test that get_cosmos_telemetry_metadata still returns metadata stored as a plain dict."""
+    with DAG("test-dag", start_date=datetime(2022, 1, 1)) as dag:
+        dag.params["__cosmos_telemetry_metadata__"] = {"used_automatic_load_mode": True}
+
+    result = get_cosmos_telemetry_metadata(dag)
+    assert result == {"used_automatic_load_mode": True}
+
+
 def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun:
     if AIRFLOW_VERSION < Version("3.0"):
         # Airflow 2 and 3.0
diff --git a/tests/test_converter.py b/tests/test_converter.py
index 4a94a52037..773691674b 100644
--- a/tests/test_converter.py
+++ b/tests/test_converter.py
@@ -12,6 +12,7 @@
 from cosmos.dbt.graph import DbtGraph, DbtNode
 from cosmos.exceptions import CosmosValueError
 from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping
+from cosmos.telemetry import _decompress_telemetry_metadata
 
 SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
 SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
@@ -1149,7 +1150,13 @@ def test_telemetry_metadata_storage(mock_load_dbt_graph):
 
     # Verify metadata is stored in dag.params
     assert "__cosmos_telemetry_metadata__" in dag.params
-    metadata = dag.params["__cosmos_telemetry_metadata__"]
+    compressed_metadata = dag.params["__cosmos_telemetry_metadata__"]
+
+    # Verify it's compressed (should be a string)
+    assert isinstance(compressed_metadata, str)
+
+    # Decompress to verify the contents
+    metadata = _decompress_telemetry_metadata(compressed_metadata)
 
     # Verify expected metadata keys are present
     assert "used_automatic_load_mode" in metadata
diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py
index a85435ae93..e65fb7c46f 100644
--- a/tests/test_telemetry.py
+++ b/tests/test_telemetry.py
@@ -7,6 +7,45 @@
 from cosmos import telemetry
 
 
+def test_compress_telemetry_metadata_is_deterministic():
+    """Test that compressing the same metadata multiple times produces identical output."""
+    metadata = {
+        "cosmos_version": "1.8.0",
+        "airflow_version": "2.10.1",
+        "python_version": "3.11",
+        "execution_mode": "local",
+        "install_deps": True,
+    }
+
+    # Compress the same metadata multiple times
+    compressed_1 = telemetry._compress_telemetry_metadata(metadata)
+    compressed_2 = telemetry._compress_telemetry_metadata(metadata)
+    compressed_3 = telemetry._compress_telemetry_metadata(metadata)
+
+    # All compressed outputs should be identical (deterministic)
+    assert compressed_1 == compressed_2
+    assert compressed_2 == compressed_3
+
+
+def test_compress_decompress_telemetry_metadata_roundtrip():
+    """Test that metadata can be compressed and decompressed correctly."""
+    original_metadata = {
+        "cosmos_version": "1.8.0",
+        "airflow_version": "2.10.1",
+        "python_version": "3.11",
+        "execution_mode": "local",
+        "install_deps": False,
+        "dbt_command": "run",
+    }
+
+    # Compress and decompress
+    compressed = telemetry._compress_telemetry_metadata(original_metadata)
+    decompressed = telemetry._decompress_telemetry_metadata(compressed)
+
+    # Should get back the original metadata
+    assert decompressed == original_metadata
+
+
 def test_should_emit_is_true_by_default():
     assert telemetry.should_emit()
 