diff --git a/cosmos/constants.py b/cosmos/constants.py index 3b2c583637..4b8b7ca957 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -168,4 +168,4 @@ def _missing_value_(cls, value): # type: ignore TELEMETRY_VERSION = "v1" TELEMETRY_TIMEOUT = 1.0 -_AIRFLOW3_VERSION = Version("3.0.0a1") +_AIRFLOW3_MAJOR_VERSION = 3 diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index ea2c2838ac..21a1045a54 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -35,7 +35,7 @@ _get_latest_cached_package_lockfile, is_cache_package_lockfile_enabled, ) -from cosmos.constants import _AIRFLOW3_VERSION, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, InvocationMode +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, InvocationMode from cosmos.dataset import get_dataset_alias_name from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError @@ -481,7 +481,7 @@ def _handle_datasets(self, context: Context) -> None: outlets = self.get_datasets("outputs") self.log.info("Inlets: %s", inlets) self.log.info("Outlets: %s", outlets) - if AIRFLOW_VERSION < _AIRFLOW3_VERSION: + if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: self.register_dataset(inlets, outlets, context) def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None: @@ -492,7 +492,7 @@ def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None: cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None: - if AIRFLOW_VERSION < _AIRFLOW3_VERSION: + if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: self.store_freshness_json(tmp_project_dir, context) self.store_compiled_sql(tmp_project_dir, context) if self.should_upload_compiled_sql: @@ -559,7 +559,7 @@ def run_command( env=env, cwd=tmp_project_dir, ) - if is_openlineage_available and AIRFLOW_VERSION.major < _AIRFLOW3_VERSION.major: + if is_openlineage_available and AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: # Airflow 3 does not support associating 'openlineage_events_completes' with task_instance. The # support for this is expected to be worked upon while addressing issue: # https://github.com/astronomer/astronomer-cosmos/issues/1635 @@ -766,7 +766,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: if arg_key in base_operator_args: base_operator_kwargs[arg_key] = arg_value AbstractDbtLocalBase.__init__(self, **abstract_dbt_local_base_kwargs) - if AIRFLOW_VERSION < _AIRFLOW3_VERSION: + if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION: if ( kwargs.get("emit_datasets", True) and settings.enable_dataset_alias