Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main,optimise-airflow-3-1-IT]
# Also run on pull requests originating from forks. Although this is insecure by default, we need it to run
# integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually
# approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes.
Expand Down
2 changes: 1 addition & 1 deletion scripts/test/integration-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then
pip freeze | grep -i pydantic
fi

pip install -U openlineage-airflow apache-airflow==$AIRFLOW_VERSION
pip install -U openlineage-airflow apache-airflow-providers-google apache-airflow==$AIRFLOW_VERSION

if [ "$AIRFLOW_VERSION" = "3.1.0" ] ; then
# This error was happening only in Airflow 3.1:
Expand Down
2 changes: 2 additions & 0 deletions scripts/test/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ pytest -vv \
--ignore=tests/test_async_example_dag.py \
--ignore=tests/test_example_k8s_dags.py \
--ignore=tests/operators/test_watcher_kubernetes_integration.py \
--ignore=tests/test_example_dags.py \
--ignore=tests/test_example_dags_no_connections.py \
-k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)'
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from datetime import datetime
from unittest.mock import patch

import pytest
Expand All @@ -15,6 +16,13 @@
base_operator_get_connection_path = "airflow.hooks.base.BaseHook.get_connection"


def pytest_runtest_logreport(report):
    """Print a wall-clock stamp alongside each test's outcome.

    Only the report for the ``call`` phase is printed; reports for any other
    phase (setup/teardown) are ignored. The stamp has millisecond resolution
    (``HH:MM:SS.mmm``) so gaps between consecutive tests are easy to spot.
    """
    # Guard clause: anything that is not the actual test call is not logged.
    if report.when != "call":
        return

    # isoformat(timespec="milliseconds") truncates microseconds to three digits.
    stamp = datetime.now().time().isoformat(timespec="milliseconds")
    print(f" [{stamp}] {report.nodeid} {report.outcome.upper()}")


@pytest.fixture()
def mock_bigquery_conn(): # type: ignore
"""
Expand Down
4 changes: 3 additions & 1 deletion tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun:
session.commit()

# This creates both DagModel and DagVersion records
dagbag = DagBag(include_examples=False)
# NOTE: dag_folder=None is required to create an empty DagBag instead of
# loading all DAGs from the default DAGS_FOLDER (which would be slow)
dagbag = DagBag(dag_folder=None, include_examples=False)
dagbag.bag_dag(dag)
sync_bag_to_db(dagbag, bundle_name="test_bundle_listener", bundle_version="1")

Expand Down
42 changes: 40 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import sys
import time
import warnings
from datetime import datetime
from typing import Any
Expand Down Expand Up @@ -34,7 +35,18 @@ def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bo
"""Check if a DAG was successful, if that Airflow version allows it."""
if dag_run is not None:
if expect_success:
return dag_run.state == DagRunState.SUCCESS
if dag_run.state != DagRunState.SUCCESS:
return False
# Verify every task instance ended in an accepted state (success, skipped, or upstream_failed).
for ti in dag_run.get_task_instances():
if ti.state not in ("success", "skipped", "upstream_failed"):
log.error(
"Task %s is in unexpected state '%s' (expected success, skipped, or upstream_failed)",
ti.task_id,
ti.state,
)
return False
return True
else:
return dag_run.state == DagRunState.FAILED
return True
Expand All @@ -49,22 +61,48 @@ def new_test_dag(dag: DAG) -> DagRun:
from airflow.models.dagbundle import DagBundleModel
from airflow.utils.session import create_session

t0 = time.perf_counter()

# Create DagBundle if it doesn't exist (required for DagModel foreign key)
# This mimics what get_bagged_dag does via manager.sync_bundles_to_db()
with create_session() as session:
dag_bundle = DagBundleModel(name="test_bundle")
session.merge(dag_bundle)
session.commit()

t1 = time.perf_counter()
print(f"[TIMING] DagBundle creation: {t1 - t0:.3f}s")

# This creates both DagModel and DagVersion records
dagbag = DagBag(include_examples=False)
# NOTE: dag_folder=None is required to create an empty DagBag instead of
# loading all DAGs from the default DAGS_FOLDER (which would be slow)
dagbag = DagBag(dag_folder=None, include_examples=False)
dagbag.bag_dag(dag)

t2 = time.perf_counter()
print(f"[TIMING] DagBag creation + bag_dag: {t2 - t1:.3f}s")

sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1")

t3 = time.perf_counter()
print(f"[TIMING] sync_bag_to_db: {t3 - t2:.3f}s")

dr = dag.test(logical_date=timezone.utcnow())

t4 = time.perf_counter()
print(f"[TIMING] dag.test() execution: {t4 - t3:.3f}s")
print(f"[TIMING] Total new_test_dag for {dag.dag_id}: {t4 - t0:.3f}s")

elif AIRFLOW_VERSION >= version.Version("3.0"):
t0 = time.perf_counter()
dr = dag.test(logical_date=timezone.utcnow())
t1 = time.perf_counter()
print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s")
else:
t0 = time.perf_counter()
dr = dag.test()
t1 = time.perf_counter()
print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s")
return dr


Expand Down
Loading