Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@
from airflow.kubernetes.kube_config import KubeConfig
from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models.taskinstance import TaskInstance
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.taskinstance import TaskInstanceKey

# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
Expand Down Expand Up @@ -226,8 +227,24 @@ def process_status(
self.log.error("Event: %s Failed", pod_name)
self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
elif status == "Succeeded":
# We get multiple events once the pod hits a terminal state, and we only want to
# send it along to the scheduler once.
# If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
# a deletion timestamp, we've already seen the initial Succeeded event and sent it
# along to the scheduler.
pod = event["object"]
if (
event["type"] == "DELETED"
or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
or pod.metadata.deletion_timestamp
):
self.log.info(
"Skipping event for Succeeded pod %s - event for this pod already sent to executor",
pod_name,
)
return
self.log.info("Event: %s Succeeded", pod_name)
self.watcher_queue.put((pod_name, namespace, State.SUCCESS, annotations, resource_version))
self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
elif status == "Running":
if event["type"] == "DELETED":
self.log.info("Event: Pod %s deleted before it could complete", pod_name)
Expand Down Expand Up @@ -725,7 +742,15 @@ def sync(self) -> None:
next_event = self.event_scheduler.run(blocking=False)
self.log.debug("Next timed event is in %f", next_event)

def _change_state(self, key: TaskInstanceKey, state: str | None, pod_name: str, namespace: str) -> None:
@provide_session
def _change_state(
self,
key: TaskInstanceKey,
state: str | None,
pod_name: str,
namespace: str,
session: Session = NEW_SESSION,
) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler

Expand All @@ -745,10 +770,12 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_name: str,
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
else:
# We get multiple events once the pod hits a terminal state, and we only want to
# do this once, so only do it when we remove the task from running
self.event_buffer[key] = state, None

# If we don't have a TI state, look it up from the db. event_buffer expects the TI state
if state is None:
state = session.query(TaskInstance.state).filter(TaskInstance.filter_for_tis([key])).scalar()

self.event_buffer[key] = state, None

@staticmethod
def _get_pod_namespace(ti: TaskInstance):
Expand Down
5 changes: 4 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,10 @@ def _process_executor_events(self, session: Session) -> int:
# but that is handled by the zombie detection.

ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
ti_requeued = ti.queued_by_job_id != self.job.id or self.job.executor.has_task(ti)
ti_requeued = (
ti.queued_by_job_id != self.job.id # Another scheduler has queued this task again
or self.job.executor.has_task(ti) # This scheduler has this task already
)

if ti_queued and not ti_requeued:
Stats.incr(
Expand Down
58 changes: 56 additions & 2 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.state import State, TaskInstanceState
from tests.test_utils.config import conf_vars

try:
from airflow.executors.kubernetes_executor import (
POD_EXECUTOR_DONE_KEY,
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
Expand All @@ -50,7 +52,6 @@
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore

Expand Down Expand Up @@ -576,6 +577,34 @@ def test_change_state_failed_no_deletion(
finally:
executor.end()

@pytest.mark.parametrize(
"ti_state", [TaskInstanceState.SUCCESS, TaskInstanceState.FAILED, TaskInstanceState.DEFERRED]
)
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
def test_change_state_none(
self,
mock_delete_pod,
mock_get_kube_client,
mock_kubernetes_job_watcher,
ti_state,
create_task_instance,
):
"""Ensure that when change_state gets state=None, it looks up the TI state from the db"""
executor = self.kubernetes_executor
executor.start()
try:
ti = create_task_instance(state=ti_state)
key = ti.key
executor.running = {key}
executor._change_state(key, None, "pod_name", "default")
assert executor.event_buffer[key][0] == ti_state
assert executor.running == set()
mock_delete_pod.assert_called_once_with(pod_name="pod_name", namespace="default")
finally:
executor.end()

@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
[
Expand Down Expand Up @@ -1156,6 +1185,7 @@ def setup_method(self):
annotations={"airflow-worker": "bar", **self.core_annotations},
namespace="airflow",
resource_version="456",
labels={},
),
status=k8s.V1PodStatus(phase="Pending"),
)
Expand Down Expand Up @@ -1207,7 +1237,31 @@ def test_process_status_succeeded(self):
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
self.assert_watcher_queue_called_once_with_state(State.SUCCESS)
# We don't know the TI state, so we send in None
self.assert_watcher_queue_called_once_with_state(None)

def test_process_status_succeeded_dedup_label(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.labels[POD_EXECUTOR_DONE_KEY] = "True"
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_dedup_timestamp(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.deletion_timestamp = datetime.utcnow()
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_type_delete(self):
self.pod.status.phase = "Succeeded"
self.events.append({"type": "DELETED", "object": self.pod})

self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
Expand Down