diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 778e3e3a..a2d4b398 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -12,6 +12,7 @@ from airflow.models.dagbag import DagBag from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState from packaging.version import Version from . import utils as test_utils @@ -19,7 +20,7 @@ EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" AIRFLOW_VERSION = Version(airflow.__version__) -IGNORED_DAG_FILES = ["example_callbacks.py"] +IGNORED_DAG_FILES = ["example_callbacks.py", "example_http_operator_task.py"] MIN_VER_DAG_FILE_VER: dict[str, list[str]] = { # TaskFlow examples unrelated to dynamic task mapping work in earlier versions @@ -32,15 +33,9 @@ ], "2.7": ["example_map_index_template.py"], "2.4": ["example_external_sensor_dag.py"], + "2.9": ["example_map_index_template.py"], } -# Add HTTP operator DAG to ignored files for providers-http versions without HttpOperator -try: - from airflow.providers.http.operators.http import HttpOperator - HTTP_OPERATOR_AVAILABLE = True -except ImportError: - HTTP_OPERATOR_AVAILABLE = False - @provide_session def get_session(session=None): @@ -88,18 +83,13 @@ def test_example_dag(session, dag_id: str): dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) - # Skip http_operator_example_dag in older Airflow versions without HttpOperator - if dag_id == "http_operator_example_dag" and not HTTP_OPERATOR_AVAILABLE: - pytest.skip(f"Skipping {dag_id} because HttpOperator is not available") - - # Skip http_operator_example_dag in older Airflow versions - # since it has compatibility issues with our connection handling - if dag_id == "http_operator_example_dag" and AIRFLOW_VERSION < Version("2.7.0"): - pytest.skip(f"Skipping {dag_id} on Airflow version {AIRFLOW_VERSION}") - # This feature is available since Airflow 2.5: # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 + dag_run = None if AIRFLOW_VERSION >= Version("2.5"): - dag.test() + dag_run = dag.test() else: - test_utils.run_dag(dag) + dag_run = test_utils.run_dag(dag) + + if dag_run is not None: + assert dag_run.state == DagRunState.SUCCESS diff --git a/tests/utils.py b/tests/utils.py index afe46c1d..d7a5b8ba 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -97,7 +97,7 @@ def test_dag( print("conn_file_path", conn_file_path) - return dr, session + return dr def add_logger_if_needed(dag: DAG, ti: TaskInstance):