Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,11 @@ def test_run_operator_dataset_inlets_and_outlets(caplog):
reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards(caplog):
from airflow.models.dataset import DatasetAliasModel
def test_run_operator_dataset_inlets_and_outlets_airflow_210(caplog):
try:
from airflow.models.asset import AssetAliasModel
except ModuleNotFoundError:
from airflow.models.dataset import DatasetAliasModel as AssetAliasModel
Comment thread
tatiana marked this conversation as resolved.

with DAG("test_id_1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
Expand Down Expand Up @@ -504,8 +507,8 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards(caplog):
seed_operator >> run_operator >> test_operator

assert seed_operator.outlets == [] # because emit_datasets=False,
assert run_operator.outlets == [DatasetAliasModel(name="test_id_1__run")]
assert test_operator.outlets == [DatasetAliasModel(name="test_id_1__test")]
assert run_operator.outlets == [AssetAliasModel(name="test_id_1__run")]
assert test_operator.outlets == [AssetAliasModel(name="test_id_1__test")]


@patch("cosmos.settings.enable_dataset_alias", 0)
Expand Down Expand Up @@ -564,7 +567,7 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
assert run_operator.outlets == []


# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1705
# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1713
@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0")
@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4")
Expand All @@ -574,7 +577,10 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
@pytest.mark.integration
@patch("cosmos.settings.enable_dataset_alias", 0)
def test_run_operator_dataset_url_encoded_names(caplog):
from airflow.datasets import Dataset
try:
from airflow.sdk.definitions.asset import Dataset
except ImportError:
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
run_operator = DbtRunLocalOperator(
Expand Down