Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from typing import Callable, Dict

import aenum
import airflow
from packaging.version import Version

AIRFLOW_VERSION = Version(airflow.__version__)

BIGQUERY_PROFILE_TYPE = "bigquery"
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DBT_PROJECT_FILENAME = "dbt_project.yml"
Expand Down
5 changes: 1 addition & 4 deletions cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List

import airflow
from packaging.version import Version

from cosmos.constants import AIRFLOW_VERSION
from cosmos.log import get_logger

logger = get_logger(__name__)


AIRFLOW_VERSION = Version(airflow.__version__)


@dataclass
class CosmosEntity:
"""
Expand Down
7 changes: 2 additions & 5 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
import hashlib
from typing import TYPE_CHECKING

from airflow import __version__ as airflow_version
from airflow.listeners import hookimpl

if TYPE_CHECKING:
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

from packaging import version

from cosmos import telemetry
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION
from cosmos.log import get_logger

AIRFLOW_VERSION_MAJOR = version.parse(airflow_version).major
AIRFLOW_VERSION_MAJOR = AIRFLOW_VERSION.major

logger = get_logger(__name__)

Expand Down
4 changes: 1 addition & 3 deletions cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence

import airflow

from cosmos.operators.base import _sanitize_xcom_key

try:
Expand All @@ -23,12 +21,12 @@

from cosmos import settings
from cosmos.config import ProfileConfig
from cosmos.constants import AIRFLOW_VERSION
from cosmos.dataset import get_dataset_alias_name
from cosmos.exceptions import CosmosValueError
from cosmos.operators.local import AbstractDbtLocalBase
from cosmos.settings import remote_target_path, remote_target_path_conn_id

AIRFLOW_VERSION = Version(airflow.__version__)
DEFAULT_PRODUCER_ASYNC_TASK_ID = "dbt_setup_async"


Expand Down
9 changes: 3 additions & 6 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
from urllib.parse import urlparse

import airflow
import jinja2
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from packaging.version import Version

if TYPE_CHECKING: # pragma: no cover
try:
from airflow.sdk.definitions.context import Context
except ImportError:
from airflow.utils.context import Context # type: ignore[attr-defined]

from airflow.version import version as airflow_version
from attrs import define
from packaging.version import Version

from cosmos import cache, settings

Expand All @@ -46,6 +44,7 @@
)
from cosmos.constants import (
_AIRFLOW3_MAJOR_VERSION,
AIRFLOW_VERSION,
DBT_DEPENDENCIES_FILE_NAMES,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
InvocationMode,
Expand Down Expand Up @@ -117,8 +116,6 @@
_sanitize_xcom_key,
)

AIRFLOW_VERSION = Version(airflow.__version__)

logger = get_logger(__name__)


Expand Down Expand Up @@ -323,7 +320,7 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu
if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Object Storage feature is unavailable in Airflow version {AIRFLOW_VERSION}. Please upgrade to "
"Airflow 2.8 or later."
)

Expand Down
10 changes: 3 additions & 7 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
from threading import Lock
from typing import TYPE_CHECKING, Any, Callable, List, Union

import airflow
from packaging.version import Version

from cosmos._triggers.watcher import WatcherTrigger, _parse_compressed_xcom

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -33,8 +30,10 @@
except ImportError: # pragma: no cover
from airflow.operators.empty import EmptyOperator # type: ignore[no-redef]

from packaging.version import Version

from cosmos.config import ProfileConfig
from cosmos.constants import PRODUCER_WATCHER_TASK_ID, InvocationMode
from cosmos.constants import AIRFLOW_VERSION, PRODUCER_WATCHER_TASK_ID, InvocationMode
from cosmos.operators.base import (
DbtRunMixin,
DbtSeedMixin,
Expand All @@ -46,9 +45,6 @@
DbtSourceLocalOperator,
)

AIRFLOW_VERSION = Version(airflow.__version__)


try:
from dbt_common.events.base_types import EventMsg
except ImportError: # pragma: no cover
Expand Down
1 change: 1 addition & 0 deletions cosmos/plugin/airflow3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from cosmos.plugin.snippets import IFRAME_SCRIPT

# Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1
# Note: We compute AIRFLOW_VERSION locally here (not from constants) so that tests can patch airflow.__version__ and reload this module
AIRFLOW_VERSION = Version(airflow.__version__)


Expand Down
4 changes: 1 addition & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import json
from unittest.mock import patch

import airflow
import pytest
from airflow.models.connection import Connection
from packaging.version import Version

AIRFLOW_VERSION = Version(airflow.__version__)

from cosmos.constants import AIRFLOW_VERSION

if AIRFLOW_VERSION >= Version("3.1"):
# Change introduced in Airflow 3.1.0
Expand Down
5 changes: 1 addition & 4 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
from subprocess import PIPE, Popen
from unittest.mock import MagicMock, patch

import airflow
import pytest
from airflow.models import Variable
from packaging.version import Version

from cosmos import settings
from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
_AIRFLOW3_MAJOR_VERSION,
AIRFLOW_VERSION,
DBT_LOG_FILENAME,
DBT_TARGET_DIR_NAME,
DbtResourceType,
Expand Down Expand Up @@ -51,8 +50,6 @@
SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt"
SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none"))

AIRFLOW_VERSION = Version(airflow.__version__)

if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION:
object_storage_path = "airflow.sdk.ObjectStoragePath"
else:
Expand Down
11 changes: 5 additions & 6 deletions tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@
from unittest.mock import patch

import pytest
from airflow import __version__ as airflow_version
from airflow.models import DAG, DagRun
from airflow.utils.state import State
from packaging import version
from packaging.version import Version

from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import AIRFLOW_VERSION
from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks
from cosmos.profiles import PostgresUserPasswordProfileMapping

DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt"
DBT_PROJECT_NAME = "jaffle_shop"

AIRFLOW_VERSION = version.parse(airflow_version)
AIRFLOW_VERSION_MAJOR = AIRFLOW_VERSION.major

profile_config = ProfileConfig(
Expand Down Expand Up @@ -83,7 +82,7 @@ def test_not_cosmos_dag():


def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun:
if AIRFLOW_VERSION < version.Version("3.0"):
if AIRFLOW_VERSION < Version("3.0"):
# Airflow 2 and 3.0
dag_run = dag.create_dagrun(
state=State.NONE,
Expand Down Expand Up @@ -133,7 +132,7 @@ def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun:


@pytest.mark.skipif(
AIRFLOW_VERSION >= version.Version("3.1.0"),
AIRFLOW_VERSION >= Version("3.1.0"),
reason="TODO: Fix create_dag_run to work with AF 3.1 and remove this skip.",
)
@pytest.mark.integration
Expand Down Expand Up @@ -161,7 +160,7 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog):


@pytest.mark.skipif(
AIRFLOW_VERSION >= version.Version("3.1.0"), reason="TODO: Fix create_dag_run to work with and remove this skip."
AIRFLOW_VERSION >= Version("3.1.0"), reason="TODO: Fix create_dag_run to work with and remove this skip."
)
@pytest.mark.integration
@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled")
Expand Down
4 changes: 1 addition & 3 deletions tests/operators/_asynchronous/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
from pathlib import Path
from unittest.mock import MagicMock, Mock, mock_open, patch

import airflow
import pytest
from packaging.version import Version

from cosmos.config import ProfileConfig
from cosmos.constants import AIRFLOW_VERSION
from cosmos.hooks.subprocess import FullOutputSubprocessResult
from cosmos.operators._asynchronous import SetupAsyncOperator, TeardownAsyncOperator
from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator, _create_async_operator_class
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
from cosmos.operators._asynchronous.databricks import DbtRunAirflowAsyncDatabricksOperator
from cosmos.operators.local import DbtRunLocalOperator

AIRFLOW_VERSION = Version(airflow.__version__)


@pytest.mark.parametrize(
"profile_type, dbt_class, expected_operator_class",
Expand Down
5 changes: 1 addition & 4 deletions tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,18 @@
from pathlib import Path
from unittest.mock import MagicMock, patch

import airflow
import pytest
from airflow.models import DAG
from airflow.models.connection import Connection
from packaging.version import Version

from cosmos.config import ProfileConfig
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, InvocationMode
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION, InvocationMode
from cosmos.exceptions import CosmosValueError
from cosmos.operators.virtualenv import DbtCloneVirtualenvOperator, DbtVirtualenvBaseOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping
from tests.utils import test_dag as run_test_dag

AIRFLOW_VERSION = Version(airflow.__version__)

DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop"

DAGS_FOLDER = Path(__file__).parent.parent.parent / "dev/dags/"
Expand Down
3 changes: 0 additions & 3 deletions tests/test_async_example_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@
from functools import lru_cache as cache


import airflow
import pytest
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
from packaging.version import Version

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
ALL_FILES_TO_IGNORE = [
f.name for f in EXAMPLE_DAGS_DIR.iterdir() if f.is_file() and f.suffix == ".py" and f.name != "simple_dag_async.py"
]

AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
AIRFLOW_VERSION = Version(airflow.__version__)


@provide_session
Expand Down
4 changes: 1 addition & 3 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,20 @@
from functools import lru_cache as cache


import airflow
import pytest
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS
from cosmos.constants import AIRFLOW_VERSION, PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
AIRFLOW_VERSION = Version(airflow.__version__)
KUBERNETES_DAGS = ["jaffle_shop_kubernetes"]

MIN_VER_DAG_FILE: dict[str, list[str]] = {
Expand Down
4 changes: 0 additions & 4 deletions tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import airflow
import pytest
from packaging.version import Version

import cosmos.log
from cosmos.log import CosmosRichLogger, get_logger
from cosmos.provider_info import get_provider_info

AIRFLOW_VERSION = Version(airflow.__version__)


def test_get_logger(monkeypatch):
monkeypatch.setattr(cosmos.log, "rich_logging", False)
Expand Down
3 changes: 1 addition & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Any

import sqlalchemy
from airflow import __version__ as airflow_version
from airflow.configuration import secrets_backend_list
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
Expand All @@ -22,7 +21,7 @@
from packaging.version import Version
from sqlalchemy.orm.session import Session

AIRFLOW_VERSION = version.parse(airflow_version)
from cosmos.constants import AIRFLOW_VERSION

log = logging.getLogger(__name__)

Expand Down