Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,9 @@ function check_airflow_python_client_installation() {
}

function start_api_server_with_examples(){
if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" ]]; then
# check if we should not start the api server with examples by checking if both
# START_API_SERVER_WITH_EXAMPLES is false AND the TEST_GROUP env var is not equal to "system"
if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" && ${TEST_GROUP:=""} != "system" ]]; then
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
Expand Down
15 changes: 10 additions & 5 deletions airflow-core/src/airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,23 @@ class ExecuteTask(BaseWorkload):

@classmethod
def make(
cls, ti: TIModel, dag_rel_path: Path | None = None, generator: JWTGenerator | None = None
cls,
ti: TIModel,
dag_rel_path: Path | None = None,
generator: JWTGenerator | None = None,
bundle_info: BundleInfo | None = None,
) -> ExecuteTask:
from pathlib import Path

from airflow.utils.helpers import log_filename_template_renderer

ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
ser_ti.parent_context_carrier = ti.dag_run.context_carrier
bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
version=ti.dag_run.bundle_version,
)
if not bundle_info:
bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
version=ti.dag_run.bundle_version,
)
fname = log_filename_template_renderer()(ti=ti)
token = ""

Expand Down
30 changes: 25 additions & 5 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from contextlib import ExitStack
from datetime import datetime, timedelta
from functools import cache
from pathlib import Path
from re import Pattern
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -76,6 +77,7 @@
UnknownExecutorException,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.executors.workloads import BundleInfo
from airflow.models.asset import (
AssetDagRunQueue,
AssetModel,
Expand Down Expand Up @@ -235,10 +237,10 @@ def get_asset_triggered_next_run_info(
}


def _triggerer_is_healthy():
def _triggerer_is_healthy(session: Session):
from airflow.jobs.triggerer_job_runner import TriggererJobRunner

job = TriggererJobRunner.most_recent_job()
job = TriggererJobRunner.most_recent_job(session=session)
return job and job.is_alive()


Expand Down Expand Up @@ -1715,7 +1717,7 @@ def add_logger_if_needed(ti: TaskInstance):
self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable)
time.sleep(1)

triggerer_running = _triggerer_is_healthy()
triggerer_running = _triggerer_is_healthy(session)
for ti in scheduled_tis:
ti.task = tasks[ti.task_id]

Expand All @@ -1728,8 +1730,26 @@ def add_logger_if_needed(ti: TaskInstance):
if use_executor:
if executor.has_task(ti):
continue
# Send the task to the executor
executor.queue_task_instance(ti, ignore_ti_state=True)
# TODO: Task-SDK: This check is transitionary. Remove once all executors are ported over.
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor

if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
workload = workloads.ExecuteTask.make(
ti,
dag_rel_path=Path(self.fileloc),
generator=executor.jwt_generator,
# For the system test/debug purpose, we use the default bundle which uses
# local file system. If it turns out to be a feature people want, we could
# plumb the Bundle to use as a parameter to dag.test
bundle_info=BundleInfo(name="dags-folder"),
)
executor.queue_workload(workload, session=session)
ti.state = TaskInstanceState.QUEUED
session.commit()
else:
# Send the task to the executor
executor.queue_task_instance(ti, ignore_ti_state=True)
else:
# Run the task locally
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ def system_tests(
collect_only=collect_only,
enable_coverage=enable_coverage,
forward_credentials=forward_credentials,
forward_ports=False,
forward_ports=True,
github_repository=github_repository,
integration=(),
keep_env_variables=keep_env_variables,
Expand Down
3 changes: 3 additions & 0 deletions dev/breeze/src/airflow_breeze/utils/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ def generate_args_for_pytest(
args.append(f"--ignore={group_folder}")
if test_group not in IGNORE_DB_INIT_FOR_TEST_GROUPS:
args.append("--with-db-init")
if test_group == GroupOfTests.SYSTEM:
# System tests will be inited when the api server is started
args.append("--without-db-init")
if test_group == GroupOfTests.PYTHON_API_CLIENT:
args.append("--ignore-glob=clients/python/tmp/*")
args.extend(get_suspended_provider_args())
Expand Down
10 changes: 8 additions & 2 deletions devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ def pytest_addoption(parser: pytest.Parser):
"--with-db-init",
action="store_true",
dest="db_init",
help="Forces database initialization before tests",
help="Forces database initialization before tests, if false it a DB reset still may occur.",
)
group.addoption(
"--without-db-init",
action="store_true",
dest="no_db_init",
help="Forces NO database initialization before tests, takes precedent over --with-db-init.",
)
group.addoption(
"--integration",
Expand Down Expand Up @@ -343,7 +349,7 @@ def initialize_airflow_tests(request):

# Initialize Airflow db if required
lock_file = os.path.join(airflow_home, ".airflow_db_initialised")
if not skip_db_tests:
if not skip_db_tests and not request.config.option.no_db_init:
if request.config.option.db_init:
from tests_common.test_utils.db import initial_db_init

Expand Down
4 changes: 3 additions & 1 deletion scripts/docker/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ function check_airflow_python_client_installation() {
}

function start_api_server_with_examples(){
if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" ]]; then
# check if we should not start the api server with examples by checking if both
# START_API_SERVER_WITH_EXAMPLES is false AND the TEST_GROUP env var is not equal to "system"
if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" && ${TEST_GROUP:=""} != "system" ]]; then
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
Expand Down