From 7c85920a8244987856ad236200a7368d39e7a298 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 10 May 2024 12:40:16 -0700 Subject: [PATCH 1/4] Add timeout when watching pod events in k8s executor If we don't set a timeout, it may hang indefinitely if there's a network issue. --- .../executors/kubernetes_executor_utils.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index d26df876eff54..06764b3da930c 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -98,9 +98,7 @@ def run(self) -> None: kube_client, self.resource_version, self.scheduler_job_id, self.kube_config ) except ReadTimeoutError: - self.log.warning( - "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True - ) + self.log.info("Kubernetes watch timed out waiting for events. Restarting watch.") time.sleep(1) except Exception: self.log.exception("Unknown error in KubernetesJobWatcher. Failing") @@ -150,6 +148,14 @@ def _run( last_resource_version: str | None = None + # For info about k8s timeout settings see + # https://github.com/kubernetes-client/python/blob/94e42113a1fe5c580917decacdde879eab7406b3/examples/watch/timeout-settings.md + # and https://github.com/kubernetes-client/python/blob/b47caad922709350f477210317ac7f9574a72a97/kubernetes/client/api_client.py#L336-L339 + request_timeout = 30 + server_conn_timeout = 3600 + kwargs["_request_timeout"] = request_timeout + kwargs["timeout_seconds"] = server_conn_timeout + for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs): task = event["object"] self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"]) From 5c5044cb3d5d6e317a952c96ee583416b39005f0 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 10 May 2024 13:32:21 -0700 Subject: [PATCH 2/4] Update airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py Co-authored-by: Ryan Hatter <25823361+RNHTTR@users.noreply.github.com> --- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 06764b3da930c..03fe17155c1bb 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -149,8 +149,8 @@ def _run( last_resource_version: str | None = None # For info about k8s timeout settings see - # https://github.com/kubernetes-client/python/blob/94e42113a1fe5c580917decacdde879eab7406b3/examples/watch/timeout-settings.md - # and https://github.com/kubernetes-client/python/blob/b47caad922709350f477210317ac7f9574a72a97/kubernetes/client/api_client.py#L336-L339 + # https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md + # and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339 request_timeout = 30 server_conn_timeout = 3600 kwargs["_request_timeout"] = request_timeout From 54a3fe051b14ff9404afae4e8cb8fbeea47211b8 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 13 May 2024 23:12:57 -0700 Subject: [PATCH 3/4] fix name of timeout --- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 03fe17155c1bb..8447c8dad7dbc 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -139,7 +139,7 @@ def _run( ) -> str | None: self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version) - kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"} + kwargs: dict[str:Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"} if resource_version: kwargs["resource_version"] = resource_version if kube_config.kube_client_request_args: @@ -151,9 +151,9 @@ def _run( # For info about k8s timeout settings see # https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md # and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339 - request_timeout = 30 + client_timeout = 30 server_conn_timeout = 3600 - kwargs["_request_timeout"] = request_timeout + kwargs["_request_timeout"] = client_timeout kwargs["timeout_seconds"] = server_conn_timeout for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs): From 025ca92da1ce66df0f5e2e7aafea134aeb526f39 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 14 May 2024 10:52:41 -0700 Subject: [PATCH 4/4] fix typing --- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 8447c8dad7dbc..b8235bb5ac49b 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -139,7 +139,7 @@ def _run( ) -> str | None: self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version) - kwargs: dict[str:Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"} + kwargs: dict[str, Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"} if resource_version: kwargs["resource_version"] = resource_version if kube_config.kube_client_request_args: