Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
from packaging.version import Version

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
AIRFLOW_VERSION = Version(airflow.__version__)
IGNORED_DAG_FILES = ["example_callbacks.py"]
IGNORED_DAG_FILES = ["example_callbacks.py", "example_http_operator_task.py"]

MIN_VER_DAG_FILE_VER: dict[str, list[str]] = {
# TaskFlow examples unrelated to dynamic task mapping work in earlier versions
Expand All @@ -32,15 +33,9 @@
],
"2.7": ["example_map_index_template.py"],
"2.4": ["example_external_sensor_dag.py"],
"2.9": ["example_map_index_template.py"],
}

# Add HTTP operator DAG to ignored files for providers-http versions without HttpOperator
try:
from airflow.providers.http.operators.http import HttpOperator
HTTP_OPERATOR_AVAILABLE = True
except ImportError:
HTTP_OPERATOR_AVAILABLE = False


@provide_session
def get_session(session=None):
Expand Down Expand Up @@ -88,18 +83,13 @@ def test_example_dag(session, dag_id: str):
dag_bag = get_dag_bag()
dag = dag_bag.get_dag(dag_id)

# Skip http_operator_example_dag in older Airflow versions without HttpOperator
if dag_id == "http_operator_example_dag" and not HTTP_OPERATOR_AVAILABLE:
pytest.skip(f"Skipping {dag_id} because HttpOperator is not available")

# Skip http_operator_example_dag in older Airflow versions
# since it has compatibility issues with our connection handling
if dag_id == "http_operator_example_dag" and AIRFLOW_VERSION < Version("2.7.0"):
pytest.skip(f"Skipping {dag_id} on Airflow version {AIRFLOW_VERSION}")

# This feature is available since Airflow 2.5:
# https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02
dag_run = None
if AIRFLOW_VERSION >= Version("2.5"):
dag.test()
dag_run = dag.test()
else:
test_utils.run_dag(dag)
dag_run = test_utils.run_dag(dag)

if dag_run is not None:
assert dag_run.state == DagRunState.SUCCESS
2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_dag(

print("conn_file_path", conn_file_path)

return dr, session
return dr


def add_logger_if_needed(dag: DAG, ti: TaskInstance):
Expand Down