Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,23 @@ class DuplicateTaskIdFound(AirflowException):
"""Raise when a Task with duplicate task_id is defined in the same DAG."""


class TaskAlreadyInTaskGroup(AirflowException):
"""Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""

def __init__(self, task_id: str, existing_group_id: Optional[str], new_group_id: str) -> None:
super().__init__(task_id, new_group_id)
self.task_id = task_id
self.existing_group_id = existing_group_id
self.new_group_id = new_group_id

def __str__(self) -> str:
if self.existing_group_id is None:
existing_group = "the DAG's root group"
else:
existing_group = f"group {self.existing_group_id!r}"
return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"


class SerializationError(AirflowException):
"""A problem occurred when trying to serialize a DAG."""

Expand Down
18 changes: 16 additions & 2 deletions airflow/utils/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import weakref
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Sequence, Set, Tuple, Union

from airflow.exceptions import AirflowDagCycleException, AirflowException, DuplicateTaskIdFound
from airflow.exceptions import (
AirflowDagCycleException,
AirflowException,
DuplicateTaskIdFound,
TaskAlreadyInTaskGroup,
)
from airflow.models.taskmixin import DAGNode, DependencyMixin
from airflow.serialization.enums import DagAttributeTypes
from airflow.utils.helpers import validate_group_key
Expand Down Expand Up @@ -186,7 +191,16 @@ def __iter__(self):
yield child

def add(self, task: DAGNode) -> None:
"""Add a task to this TaskGroup."""
"""Add a task to this TaskGroup.

:meta private:
"""
from airflow.models.abstractoperator import AbstractOperator

existing_tg = task.task_group
if isinstance(task, AbstractOperator) and existing_tg is not None and existing_tg != self:
raise TaskAlreadyInTaskGroup(task.node_id, existing_tg.node_id, self.node_id)

# Set the TG first, as setting it might change the return value of node_id!
task.task_group = weakref.proxy(self)
key = task.node_id
Expand Down
22 changes: 22 additions & 0 deletions tests/utils/test_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pytest

from airflow.decorators import dag, task_group as task_group_decorator
from airflow.exceptions import TaskAlreadyInTaskGroup
from airflow.models import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -1201,3 +1202,24 @@ def nested_topo(group):
],
task6,
]


def test_add_to_sub_group():
with DAG("test_dag", start_date=pendulum.parse("20200101")):
tg = TaskGroup("section")
task = EmptyOperator(task_id="task")
with pytest.raises(TaskAlreadyInTaskGroup) as ctx:
tg.add(task)

assert str(ctx.value) == "cannot add 'task' to 'section' (already in the DAG's root group)"


def test_add_to_another_group():
with DAG("test_dag", start_date=pendulum.parse("20200101")):
tg = TaskGroup("section_1")
with TaskGroup("section_2"):
task = EmptyOperator(task_id="task")
with pytest.raises(TaskAlreadyInTaskGroup) as ctx:
tg.add(task)

assert str(ctx.value) == "cannot add 'section_2.task' to 'section_1' (already in group 'section_2')"