Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,4 @@ def _missing_value_(cls, value): # type: ignore
TELEMETRY_VERSION = "v1"
TELEMETRY_TIMEOUT = 1.0

_AIRFLOW3_VERSION = Version("3.0.0a1")
_AIRFLOW3_MAJOR_VERSION = 3
10 changes: 5 additions & 5 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
_get_latest_cached_package_lockfile,
is_cache_package_lockfile_enabled,
)
from cosmos.constants import _AIRFLOW3_VERSION, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, InvocationMode
from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, InvocationMode
from cosmos.dataset import get_dataset_alias_name
from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError
Expand Down Expand Up @@ -481,7 +481,7 @@ def _handle_datasets(self, context: Context) -> None:
outlets = self.get_datasets("outputs")
self.log.info("Inlets: %s", inlets)
self.log.info("Outlets: %s", outlets)
if AIRFLOW_VERSION < _AIRFLOW3_VERSION:
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
self.register_dataset(inlets, outlets, context)

def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None:
Expand All @@ -492,7 +492,7 @@ def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None:
cache._update_partial_parse_cache(partial_parse_file, self.cache_dir)

def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None:
if AIRFLOW_VERSION < _AIRFLOW3_VERSION:
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
self.store_freshness_json(tmp_project_dir, context)
self.store_compiled_sql(tmp_project_dir, context)
if self.should_upload_compiled_sql:
Expand Down Expand Up @@ -559,7 +559,7 @@ def run_command(
env=env,
cwd=tmp_project_dir,
)
if is_openlineage_available and AIRFLOW_VERSION.major < _AIRFLOW3_VERSION.major:
if is_openlineage_available and AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
# Airflow 3 does not support associating 'openlineage_events_completes' with task_instance. The
# support for this is expected to be worked upon while addressing issue:
# https://github.com/astronomer/astronomer-cosmos/issues/1635
Expand Down Expand Up @@ -766,7 +766,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
if arg_key in base_operator_args:
base_operator_kwargs[arg_key] = arg_value
AbstractDbtLocalBase.__init__(self, **abstract_dbt_local_base_kwargs)
if AIRFLOW_VERSION < _AIRFLOW3_VERSION:
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
if (
kwargs.get("emit_datasets", True)
and settings.enable_dataset_alias
Expand Down