Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# TODO: Move _get_profile_config_attribute to a common place
from cosmos.listeners.task_instance_listener import _get_profile_config_attribute
from cosmos.log import get_logger
from cosmos.telemetry import _compress_telemetry_metadata
from cosmos.versioning import _create_folder_version_hash

logger = get_logger(__name__)
Expand Down Expand Up @@ -437,5 +438,8 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901

# Store metadata in dag.params which is preserved during serialization
# Using a key that's unlikely to conflict with user params
dag.params["__cosmos_telemetry_metadata__"] = Param(default=metadata, const=metadata)
logger.debug(f"Stored Cosmos telemetry metadata in DAG {dag.dag_id} params: {metadata}")
compressed_metadata = _compress_telemetry_metadata(metadata)
dag.params["__cosmos_telemetry_metadata__"] = Param(default=compressed_metadata, const=compressed_metadata)
logger.debug(
f"Stored compressed Cosmos telemetry metadata in DAG {dag.dag_id} params (original size: {len(str(metadata))} bytes, compressed: {len(compressed_metadata)} bytes)"
)
18 changes: 15 additions & 3 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import binascii
import hashlib
import json
import zlib
from typing import TYPE_CHECKING, Any

from airflow.listeners import hookimpl
Expand All @@ -12,6 +15,7 @@
from cosmos import telemetry
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION
from cosmos.log import get_logger
from cosmos.telemetry import _decompress_telemetry_metadata

AIRFLOW_VERSION_MAJOR = AIRFLOW_VERSION.major

Expand Down Expand Up @@ -63,9 +67,17 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]:

Returns the metadata dictionary stored by the converter in dag.params, or an empty dict if not present.
"""
# Metadata is stored in dag.params to survive serialization
metadata = dag.params.get("__cosmos_telemetry_metadata__", {})
return metadata if isinstance(metadata, dict) else {}
# Metadata is stored as a compressed string in dag.params to survive serialization
compressed_metadata = dag.params.get("__cosmos_telemetry_metadata__")

if not compressed_metadata:
return {}

try:
return _decompress_telemetry_metadata(compressed_metadata)
except (binascii.Error, zlib.error, json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}")
return {}


@hookimpl
Expand Down
29 changes: 29 additions & 0 deletions cosmos/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from __future__ import annotations

import json
import platform
import zlib
from base64 import b64decode, b64encode
from typing import Any
from urllib import parse
from urllib.parse import urlencode

Expand Down Expand Up @@ -88,3 +92,28 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str,
else:
logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
return False


def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str:
    """
    Compress and encode telemetry metadata to reduce serialized DAG size.

    The metadata is first serialized to canonical JSON (keys sorted, no
    insignificant whitespace). Two dictionaries that compare equal therefore
    always produce the same string, regardless of key insertion order, and the
    payload handed to zlib is as small as possible.

    :param metadata: Telemetry metadata dictionary (must be JSON-serializable)
    :returns: Base64-encoded zlib-compressed JSON string
    :raises TypeError: If the metadata contains a value JSON cannot serialize
    """
    # Sorted keys + compact separators give a stable, minimal JSON document.
    canonical_json = json.dumps(metadata, sort_keys=True, separators=(",", ":"))
    compressed = zlib.compress(canonical_json.encode("utf-8"), level=9)
    # Base64 output is pure ASCII, so it can be stored as a plain string.
    return b64encode(compressed).decode("ascii")


def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]:
    """
    Decompress and decode telemetry metadata.

    :param compressed_data: Base64-encoded zlib-compressed JSON string
    :returns: Original metadata dictionary
    :raises binascii.Error: If the input is not valid ASCII base64 text
    :raises zlib.error: If the decoded bytes are not a valid zlib stream
    :raises UnicodeDecodeError: If the decompressed bytes are not valid UTF-8
    :raises json.JSONDecodeError: If the payload is not valid JSON, or is valid
        JSON that is not an object (dictionary)
    """
    # Imported locally so this function does not depend on a module-level import.
    import binascii

    try:
        encoded_bytes = compressed_data.encode("ascii")
    except UnicodeEncodeError as err:
        # Non-ASCII text can never be base64; report it as the same error type
        # that malformed base64 produces instead of leaking UnicodeEncodeError.
        raise binascii.Error("Telemetry metadata is not ASCII base64 text") from err

    json_text = zlib.decompress(b64decode(encoded_bytes)).decode("utf-8")
    result = json.loads(json_text)
    if not isinstance(result, dict):
        # json.loads accepts any JSON value; only an object honours the declared
        # return type, so anything else is treated as an undecodable document.
        raise json.JSONDecodeError("Telemetry metadata is not a JSON object", json_text, 0)
    return result
27 changes: 26 additions & 1 deletion tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.config import ExecutionConfig, RenderConfig
from cosmos.constants import AIRFLOW_VERSION, InvocationMode, LoadMode, SourceRenderingBehavior, TestBehavior
from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks
from cosmos.listeners.dag_run_listener import (
get_cosmos_telemetry_metadata,
on_dag_run_failed,
on_dag_run_success,
total_cosmos_tasks,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping

DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt"
Expand Down Expand Up @@ -82,6 +87,26 @@ def test_not_cosmos_dag():
assert total_cosmos_tasks(dag) == 0


def test_get_cosmos_telemetry_metadata_with_invalid_data():
    """Corrupted telemetry metadata on a DAG must yield an empty dict rather than an exception."""
    with DAG("test-dag", start_date=datetime(2022, 1, 1)) as broken_dag:
        # Store a value that is not decodable, so decompression cannot succeed
        broken_dag.params["__cosmos_telemetry_metadata__"] = "invalid_base64_data!"

    # The lookup is expected to absorb the failure and fall back to {}
    assert get_cosmos_telemetry_metadata(broken_dag) == {}


def test_get_cosmos_telemetry_metadata_with_no_metadata():
    """A DAG on which no telemetry metadata was stored must yield an empty dict."""
    with DAG("test-dag", start_date=datetime(2022, 1, 1)) as plain_dag:
        pass

    # Nothing was written to the DAG params, so the fallback value is expected
    assert get_cosmos_telemetry_metadata(plain_dag) == {}


def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun:
if AIRFLOW_VERSION < Version("3.0"):
# Airflow 2 and 3.0
Expand Down
9 changes: 8 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cosmos.dbt.graph import DbtGraph, DbtNode
from cosmos.exceptions import CosmosValueError
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping
from cosmos.telemetry import _decompress_telemetry_metadata

SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
Expand Down Expand Up @@ -1149,7 +1150,13 @@ def test_telemetry_metadata_storage(mock_load_dbt_graph):

# Verify metadata is stored in dag.params
assert "__cosmos_telemetry_metadata__" in dag.params
metadata = dag.params["__cosmos_telemetry_metadata__"]
compressed_metadata = dag.params["__cosmos_telemetry_metadata__"]

# Verify it's compressed (should be a string)
assert isinstance(compressed_metadata, str)

# Decompress to verify the contents
metadata = _decompress_telemetry_metadata(compressed_metadata)

# Verify expected metadata keys are present
assert "used_automatic_load_mode" in metadata
Expand Down
39 changes: 39 additions & 0 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,45 @@
from cosmos import telemetry


def test_compress_telemetry_metadata_is_deterministic():
    """Compressing one metadata dict repeatedly must always yield the same string."""
    sample = dict(
        cosmos_version="1.8.0",
        airflow_version="2.10.1",
        python_version="3.11",
        execution_mode="local",
        install_deps=True,
    )

    # Run the compression three times on the very same input
    outputs = [telemetry._compress_telemetry_metadata(sample) for _ in range(3)]

    # Every run must agree with the others
    assert outputs[0] == outputs[1]
    assert outputs[1] == outputs[2]


def test_compress_decompress_telemetry_metadata_roundtrip():
    """Metadata that is compressed and then decompressed must come back unchanged."""
    expected = dict(
        cosmos_version="1.8.0",
        airflow_version="2.10.1",
        python_version="3.11",
        execution_mode="local",
        install_deps=False,
        dbt_command="run",
    )

    # Feed the compressed form straight back into the decompressor
    restored = telemetry._decompress_telemetry_metadata(telemetry._compress_telemetry_metadata(expected))

    # The round trip must be lossless
    assert restored == expected


def test_should_emit_is_true_by_default():
    """With no configuration applied by this test, should_emit() must be truthy."""
    emit_enabled = telemetry.should_emit()
    assert emit_enabled

Expand Down