Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cosmos/plugin/cluster_policy.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING

from airflow.models.taskinstance import TaskInstance
from airflow.policies import hookimpl
from packaging.version import Version

from cosmos.constants import AIRFLOW_VERSION

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

log = getLogger(__name__)


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ dependencies = [
"types-python-dateutil",
"Werkzeug<3.0.0",
"pytest-asyncio",
# Needed by the regression test for issue #2620: the test exercises
# airflow.sentry's module-level ConfiguredSentry() path, which only
# runs when sentry_sdk is importable.
"sentry-sdk",
]
pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]

Expand Down
35 changes: 35 additions & 0 deletions tests/plugin/test_cluster_policy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
import subprocess
import sys
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -243,3 +246,35 @@ def test_queue_already_set_gets_overwritten(self):
task_instance_mutation_hook(task_instance)

assert task_instance.queue == "retry_queue"


def test_cluster_policy_load_does_not_break_airflow_sentry_init(tmp_path):
"""Regression for #2620: importing cosmos.plugin.cluster_policy must not
transitively load airflow.sentry. If it does, [sentry] before_send is
evaluated before the DAGs folder is on sys.path and Airflow startup
fails with AirflowConfigException.
"""
code = "import cosmos.plugin.cluster_policy # noqa: F401\n" "print('OK')\n"
env = {
**os.environ,
"AIRFLOW_HOME": str(tmp_path),
# User's scenario: sentry is on and before_send points at a module
# that is not importable at plugin-load time.
"AIRFLOW__SENTRY__SENTRY_ON": "True",
"AIRFLOW__SENTRY__BEFORE_SEND": "cosmos_2620_dags_folder_module.before_send",
}
result = subprocess.run(
[sys.executable, "-c", code],
capture_output=True,
text=True,
env=env,
)
assert result.returncode == 0 and "OK" in result.stdout, (
"Issue #2620 reproduced: importing cosmos.plugin.cluster_policy with "
"[sentry] before_send pointing at a not-yet-importable module "
"transitively loaded airflow.sentry and raised AirflowConfigException "
"at plugin-load time.\n"
f"returncode: {result.returncode}\n"
f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}"
)
Loading