Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class AbstractOperator(LoggingMixin, DAGNode):

owner: str
task_id: str
is_mapped: ClassVar[bool]
Copy link
Member Author

@potiuk potiuk May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr -> maybe there is another way, but I believe the lack of "is_mapped" in AbstractOperator was a bit wrong (and it would prevent adding the proper typing in task_group_to_grid ad AbstractOperator missed is_mapped field).


HIDE_ATTRS_FROM_UI: ClassVar[FrozenSet[str]] = frozenset(
(
Expand Down
3 changes: 2 additions & 1 deletion airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

from airflow import models
from airflow.models import errors
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand Down Expand Up @@ -128,7 +129,7 @@ def get_mapped_summary(parent_instance, task_instances):
}


def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
def get_task_summary(dag_run: DagRun, task: AbstractOperator, session: Session) -> Optional[Dict[str, Any]]:
task_instance = (
session.query(TaskInstance)
.filter(
Expand Down
24 changes: 19 additions & 5 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import parse_qsl, unquote, urlencode, urlparse

import lazy_object_proxy
Expand Down Expand Up @@ -126,6 +126,7 @@
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.strings import to_boolean
from airflow.utils.task_group import TaskGroup
from airflow.utils.timezone import td_format, utcnow
from airflow.version import version
from airflow.www import auth, utils as wwwutils
Expand Down Expand Up @@ -250,15 +251,26 @@ def _safe_parse_datetime(v):
abort(400, f"Invalid datetime: {v!r}")


def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
def task_group_to_grid(
task_item_or_group: Union[AbstractOperator, TaskGroup],
dag: DAG,
dag_runs: Iterable[DagRun],
session: Session,
) -> Dict[str, Any]:
"""
Create a nested dict representation of this TaskGroup and its children used to construct
the Graph.
"""
if isinstance(task_item_or_group, AbstractOperator):
task_instances: List[Dict[str, Any]] = [
ts
for ts in filter(
None, (wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs)
)
]
return {
'id': task_item_or_group.task_id,
'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
'instances': task_instances,
'label': task_item_or_group.label,
'extra_links': task_item_or_group.extra_links,
'is_mapped': task_item_or_group.is_mapped,
Expand All @@ -267,9 +279,11 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
# Task Group
task_group = task_item_or_group

children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()]
children: List[Dict[str, Any]] = [
task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()
]

def get_summary(dag_run, children):
def get_summary(dag_run: DagRun, children: List[Dict[str, Any]]):
child_instances = [child['instances'] for child in children if 'instances' in child]
child_instances = [item for sublist in child_instances for item in sublist]

Expand Down