Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:

for bundle in all_bundles:
if bundle.name in bundles_to_search:
dagbag = DagBag(bundle.path, bundle_path=bundle.path)
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
dagbag.collect_dags()
dags_list.extend(list(dagbag.dags.values()))
dagbag_import_errors += len(dagbag.import_errors)
Expand Down Expand Up @@ -472,7 +472,7 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:

for bundle in all_bundles:
if bundle.name in bundles_to_search:
dagbag = DagBag(bundle.path, bundle_path=bundle.path)
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
for filename, errors in dagbag.import_errors.items():
data.append({"bundle_name": bundle.name, "filepath": filename, "error": errors})
else:
Expand Down Expand Up @@ -524,7 +524,7 @@ def dag_report(args) -> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
dagbag = DagBag(bundle.path, include_examples=False)
dagbag = DagBag(bundle.path, bundle_name=bundle.name, include_examples=False)
all_dagbag_stats.extend(dagbag.dagbag_stats)

AirflowConsole().print_as(
Expand Down Expand Up @@ -688,5 +688,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False)
dag_bag = DagBag(
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name, include_examples=False
)
sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)
42 changes: 30 additions & 12 deletions airflow-core/src/airflow/dag_processing/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,25 @@ def handle_timeout(signum, frame):
signal.setitimer(signal.ITIMER_REAL, 0)


def _validate_executor_fields(dag: DAG) -> None:
def _executor_exists(executor_name: str, team_name: str | None) -> bool:
"""Check if executor exists, with global fallback for teams."""
try:
# First pass check for team-specific executor or a global executor (i.e. team_name=None)
ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=team_name)
return True
except UnknownExecutorException:
if team_name:
# If we had a team_name but didn't find an executor, check if there is a global executor that
# satisfies the request.
try:
ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=None)
return True
except UnknownExecutorException:
pass
return False


def _validate_executor_fields(dag: DAG, bundle_name: str | None = None) -> None:
"""Validate that executors specified in tasks are available and owned by the same team as the dag bundle."""
import logging

Expand All @@ -144,32 +162,30 @@ def _validate_executor_fields(dag: DAG) -> None:
# Check if multi team is available by reading the multi_team configuration (which is boolean)
if conf.getboolean("core", "multi_team"):
# Get team name from bundle configuration if available
if hasattr(dag, "bundle_name") and dag.bundle_name:
if bundle_name:
from airflow.dag_processing.bundles.manager import DagBundlesManager

bundle_manager = DagBundlesManager()
bundle_config = bundle_manager._bundle_config[dag.bundle_name]
bundle_config = bundle_manager._bundle_config[bundle_name]

dag_team_name = bundle_config.team_name
if dag_team_name:
log.debug(
"Found team '%s' for DAG '%s' via bundle '%s'", dag_team_name, dag.dag_id, dag.bundle_name
"Found team '%s' for DAG '%s' via bundle '%s'", dag_team_name, dag.dag_id, bundle_name
)

for task in dag.tasks:
if not task.executor:
continue
try:
# Validate that the executor exists and is available for the DAG's team
ExecutorLoader.lookup_executor_name_by_str(task.executor, team_name=dag_team_name)
except UnknownExecutorException:

if not _executor_exists(task.executor, dag_team_name):
if dag_team_name:
raise UnknownExecutorException(
f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available "
f"for team '{dag_team_name}' (the team associated with DAG '{dag.dag_id}'). "
f"Make sure '{task.executor}' is configured for team '{dag_team_name}' in your "
f"for team '{dag_team_name}' (the team associated with DAG '{dag.dag_id}') or as a global executor. "
f"Make sure '{task.executor}' is configured for team '{dag_team_name}' or globally in your "
"[core] executors configuration, or update the task's executor to use one of the "
f"configured executors for team '{dag_team_name}'."
f"configured executors for team '{dag_team_name}' or available global executors."
)
raise UnknownExecutorException(
f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available. "
Expand Down Expand Up @@ -210,9 +226,11 @@ def __init__(
collect_dags: bool = True,
known_pools: set[str] | None = None,
bundle_path: Path | None = None,
bundle_name: str | None = None,
):
super().__init__()
self.bundle_path = bundle_path
self.bundle_name = bundle_name
include_examples = (
include_examples
if isinstance(include_examples, bool)
Expand Down Expand Up @@ -528,7 +546,7 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
dag.relative_fileloc = relative_fileloc
try:
dag.validate()
_validate_executor_fields(dag)
_validate_executor_fields(dag, self.bundle_name)
self.bag_dag(dag=dag)
except AirflowClusterPolicySkipDag:
pass
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
id=id,
path=dag_file.absolute_path,
bundle_path=cast("Path", dag_file.bundle_path),
bundle_name=dag_file.bundle_name,
callbacks=callback_to_execute_for_file,
selector=self.selector,
logger=logger,
Expand Down
9 changes: 8 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ class DagFileParseRequest(BaseModel):
bundle_path: Path
"""Passing bundle path around lets us figure out relative file path."""

bundle_name: str
"""Bundle name for team-specific executor validation."""

callback_requests: list[CallbackRequest] = Field(default_factory=list)
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"

Expand Down Expand Up @@ -203,6 +206,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP
bag = DagBag(
dag_folder=msg.file,
bundle_path=msg.bundle_path,
bundle_name=msg.bundle_name,
include_examples=False,
load_op_links=False,
)
Expand Down Expand Up @@ -493,6 +497,7 @@ def start( # type: ignore[override]
*,
path: str | os.PathLike[str],
bundle_path: Path,
bundle_name: str,
callbacks: list[CallbackRequest],
target: Callable[[], None] = _parse_file_entrypoint,
client: Client,
Expand All @@ -504,18 +509,20 @@ def start( # type: ignore[override]

proc: Self = super().start(target=target, client=client, **kwargs)
proc.had_callbacks = bool(callbacks) # Track if this process had callbacks
proc._on_child_started(callbacks, path, bundle_path)
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc

def _on_child_started(
self,
callbacks: list[CallbackRequest],
path: str | os.PathLike[str],
bundle_path: Path,
bundle_name: str,
) -> None:
msg = DagFileParseRequest(
file=os.fspath(path),
bundle_path=bundle_path,
bundle_name=bundle_name,
callback_requests=callbacks,
)
self.send_msg(msg, request_id=0)
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ class LocalExecutor(BaseExecutor):
workers: dict[int, multiprocessing.Process]
_unread_messages: multiprocessing.sharedctypes.Synchronized[int]

def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
def __init__(self, parallelism: int = PARALLELISM, **kwargs) -> None:
super().__init__(parallelism=parallelism, **kwargs)
if self.parallelism < 0:
raise ValueError("parallelism must be greater than or equal to 0")

Expand Down
12 changes: 9 additions & 3 deletions airflow-core/src/airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
bundle = manager.get_bundle(bundle_name)
with _airflow_parsing_context_manager(dag_id=dag_id):
dagbag = DagBag(
dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
dag_folder=dagfile_path or bundle.path,
bundle_path=bundle.path,
bundle_name=bundle.name,
include_examples=False,
)
if dag := dagbag.dags.get(dag_id):
return dag
Expand All @@ -290,7 +293,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
bundle.initialize()
with _airflow_parsing_context_manager(dag_id=dag_id):
dagbag = DagBag(
dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
dag_folder=dagfile_path or bundle.path,
bundle_path=bundle.path,
bundle_name=bundle.name,
include_examples=False,
)
sync_bag_to_db(dagbag, bundle.name, bundle.version)
if dag := dagbag.dags.get(dag_id):
Expand Down Expand Up @@ -327,7 +333,7 @@ def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, fr
return [get_bagged_dag(bundle_names=bundle_names, dag_id=dag_id)]

def _find_dag(bundle):
dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path)
dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
return matched_dags

Expand Down
3 changes: 3 additions & 0 deletions airflow-core/tests/unit/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=TEST_DAGS_FOLDER,
bundle_name="testing",
include_examples=False,
)

Expand All @@ -805,6 +806,7 @@ def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
bundle_name="testing",
include_examples=False,
)

Expand Down Expand Up @@ -836,6 +838,7 @@ def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
bundle_name="testing",
include_examples=False,
)

Expand Down
Loading