Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions airflow-core/src/airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import sys
import weakref
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence
from functools import cached_property, lru_cache
from functools import cache, cached_property, lru_cache
from inspect import signature
from textwrap import dedent
from typing import (
Expand Down Expand Up @@ -140,7 +140,6 @@
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import BaseEventTrigger

HAS_KUBERNETES: bool
try:
from kubernetes.client import models as k8s # noqa: TC004

Expand Down Expand Up @@ -3855,6 +3854,7 @@ class SerializedAssetWatcher(AssetWatcher):
trigger: dict


@cache
def _has_kubernetes(attempt_import: bool = False) -> bool:
"""
Check if kubernetes libraries are available.
Expand All @@ -3863,28 +3863,21 @@ def _has_kubernetes(attempt_import: bool = False) -> bool:
False, only check if already in sys.modules (avoids expensive import).
:return: True if kubernetes libraries are available, False otherwise.
"""
global HAS_KUBERNETES
if "HAS_KUBERNETES" in globals():
return HAS_KUBERNETES

# Check if kubernetes is already imported before triggering expensive import
if "kubernetes.client" not in sys.modules and not attempt_import:
HAS_KUBERNETES = False
return False

# Loading kube modules is expensive, so delay it until the last moment

try:
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator

globals()["k8s"] = k8s
globals()["PodGenerator"] = PodGenerator
HAS_KUBERNETES = True
return True
except ImportError:
HAS_KUBERNETES = False
return HAS_KUBERNETES
return False


AssetT = TypeVar("AssetT", bound=BaseAsset, covariant=True)
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/tests/unit/models/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
from unittest import mock

import pytest
from cryptography.fernet import Fernet

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models import Connection
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time.comms import ErrorResponse

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_connections

if TYPE_CHECKING:
Expand Down Expand Up @@ -196,6 +198,7 @@ def test_parse_from_uri(
),
],
)
@conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
def test_get_uri(self, connection, expected_uri):
assert connection.get_uri() == expected_uri

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ def test_has_kubernetes_no_import_when_not_needed(self):
pytest.skip("Kubernetes already imported, cannot test import avoidance")

# Call _has_kubernetes() - should check sys.modules and return False without importing
_has_kubernetes.cache_clear()
result = _has_kubernetes()

assert result is False
Expand All @@ -747,6 +748,7 @@ def test_has_kubernetes_uses_existing_import(self):
pytest.importorskip("kubernetes")

# Now k8s is imported, should return True
_has_kubernetes.cache_clear()
result = _has_kubernetes()

assert result is True