From ccdbd5412d2c3674483ccb3b1cc9dad67b59210d Mon Sep 17 00:00:00 2001 From: schattian Date: Tue, 10 May 2022 15:01:20 +0200 Subject: [PATCH 1/5] Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) --- .../cncf/kubernetes/utils/pod_manager.py | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 993ba12e313fe..fd8fb2299752e 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. """Launches PODs""" +import asyncio import json import math import time @@ -193,6 +194,35 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt ) return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) + def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: + for line in logs: + timestamp, message = self.parse_log_line(line.decode('utf-8')) + self.log.info(message) + return timestamp + + def consume_container_logs_stream(self, pod: V1Pod, container_name: str, stream: Iterable[bytes]) -> Optional[DateTime]: + async def consume_log_stream() -> Optional[DateTime]: + return self.log_iterable(stream) + + async def async_await_container_completion() -> None: + self.await_container_completion(pod=pod, container_name=container_name) + + await_container_completion = asyncio.create_task(async_await_container_completion()) + log_stream = asyncio.create_task(consume_log_stream()) + asyncio.run(asyncio.wait({log_stream, await_container_completion}, return_when=asyncio.FIRST_COMPLETED)) + + if log_stream.done(): + return log_stream.result() + + log_stream.cancel() + self.log.warning( + "Pod %s log read was interrupted at some point caused by log rotation " + "see https://github.com/kubernetes/kubernetes/issues/59902 ", + "and https://github.com/apache/airflow/issues/23497 for reference.", + pod.metadata.name, + container_name, + ) + def fetch_container_logs( self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None ) -> PodLoggingStatus: @@ -220,9 +250,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) ), follow=follow, ) - for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8')) - self.log.info(message) + if follow: + timestamp = self.consume_container_logs_stream(pod, container_name, logs) + else: + timestamp = self.log_iterable(logs) except BaseHTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " @@ -255,7 +286,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: - while not self.container_is_running(pod=pod, container_name=container_name): + while self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) def await_pod_completion(self, pod: V1Pod) -> V1Pod: From 362c1505ad3fb1624cb787751130cb57d48189e9 Mon Sep 17 00:00:00 2001 From: schattian Date: Tue, 10 May 2022 13:45:05 -0700 Subject: [PATCH 2/5] Add event loop if not running; make linter happy --- .../cncf/kubernetes/utils/pod_manager.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index fd8fb2299752e..dfe419e549fd4 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -200,16 +200,20 @@ def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: self.log.info(message) return timestamp - def consume_container_logs_stream(self, pod: V1Pod, container_name: str, stream: Iterable[bytes]) -> Optional[DateTime]: + def consume_container_logs_stream( + self, pod: V1Pod, container_name: str, stream: Iterable[bytes] + ) -> Optional[DateTime]: async def consume_log_stream() -> Optional[DateTime]: return self.log_iterable(stream) async def async_await_container_completion() -> None: - self.await_container_completion(pod=pod, container_name=container_name) + return self.await_container_completion(pod=pod, container_name=container_name) - await_container_completion = asyncio.create_task(async_await_container_completion()) - log_stream = asyncio.create_task(consume_log_stream()) - asyncio.run(asyncio.wait({log_stream, await_container_completion}, return_when=asyncio.FIRST_COMPLETED)) + loop = asyncio.get_event_loop() + await_container_completion = loop.create_task(async_await_container_completion()) + log_stream = loop.create_task(consume_log_stream()) + tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream} + loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) if log_stream.done(): return log_stream.result() @@ -222,6 +226,7 @@ async def async_await_container_completion() -> None: pod.metadata.name, container_name, ) + return None def fetch_container_logs( self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None From 109f87b2f4a6b96d12e8b4ffa41d1807a89ea051 Mon Sep 17 00:00:00 2001 From: schattian Date: Tue, 10 May 2022 14:16:47 -0700 Subject: [PATCH 3/5] Fix merge conflicts --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 62631f420a8c8..bedcfc99f9f96 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -196,7 +196,7 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8'), errors="backslashreplace") + timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace")) self.log.info(message) return timestamp From 5605a5a60819bc3a195e56c2a0ddad29f26da553 Mon Sep 17 00:00:00 2001 From: schattian Date: Tue, 10 May 2022 15:00:42 -0700 Subject: [PATCH 4/5] Fix crash when container logs are empty --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index bedcfc99f9f96..a39dc270c8f21 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -195,6 +195,7 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: + timestamp = None for line in logs: timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace")) self.log.info(message) From 86171ff43f8977021708cfa241da64f96c4c15e2 Mon Sep 17 00:00:00 2001 From: schattian Date: Wed, 11 May 2022 06:59:14 -0700 Subject: [PATCH 5/5] Add test for kube api hanging logs stream --- .../cncf/kubernetes/utils/pod_manager.py | 26 ++++++++--------- .../cncf/kubernetes/utils/test_pod_manager.py | 28 +++++++++++++++++-- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index a39dc270c8f21..0a10d02d0dc43 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -16,6 +16,7 @@ # under the License. """Launches PODs""" import asyncio +import concurrent import json import math import time @@ -204,29 +205,28 @@ def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: def consume_container_logs_stream( self, pod: V1Pod, container_name: str, stream: Iterable[bytes] ) -> Optional[DateTime]: - async def consume_log_stream() -> Optional[DateTime]: - return self.log_iterable(stream) - async def async_await_container_completion() -> None: - return self.await_container_completion(pod=pod, container_name=container_name) + await asyncio.sleep(1) + while self.container_is_running(pod=pod, container_name=container_name): + await asyncio.sleep(1) loop = asyncio.get_event_loop() await_container_completion = loop.create_task(async_await_container_completion()) - log_stream = loop.create_task(consume_log_stream()) + log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream)) tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream} loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) - if log_stream.done(): return log_stream.result() log_stream.cancel() - self.log.warning( - "Pod %s log read was interrupted at some point caused by log rotation " - "see https://github.com/kubernetes/kubernetes/issues/59902 ", - "and https://github.com/apache/airflow/issues/23497 for reference.", - pod.metadata.name, - container_name, - ) + try: + loop.run_until_complete(log_stream) + except concurrent.futures.CancelledError: + self.log.warning( + "Container %s log read was interrupted at some point caused by log rotation " + "see https://github.com/apache/airflow/issues/23497 for reference.", + container_name, + ) return None def fetch_container_logs( diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 8070c3c3532b5..7d105a1ac3a69 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. import logging +import time +from typing import Generator from unittest import mock from unittest.mock import MagicMock @@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now): args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0] assert kwargs['since_seconds'] == 5 - @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)]) + @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)]) @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') def test_fetch_container_running_follow( self, container_running_mock, follow, is_running_calls, exp_running @@ -322,13 +324,35 @@ def test_fetch_container_running_follow( When called with follow=False, should return immediately even though still running. """ mock_pod = MagicMock() - container_running_mock.side_effect = [True, True, False] # only will be called once + container_running_mock.side_effect = [True, False, False, False] # called once when follow=False self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi'] ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) assert len(container_running_mock.call_args_list) == is_running_calls assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC')) assert ret.running is exp_running + @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)]) + @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') + def test_fetch_container_running_follow_when_kube_api_hangs( + self, container_running_mock, follow, is_running_calls, exp_running + ): + """ + When called with follow, should keep looping even after disconnections, if pod still running. + """ + mock_pod = MagicMock() + container_running_mock.side_effect = [True, False, False] + + def stream_logs() -> Generator: + while True: + time.sleep(1) # this is intentional: urllib3.response.stream() is not async + yield b'2021-01-01 hi' + + self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs() + ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) + assert len(container_running_mock.call_args_list) == is_running_calls + assert ret.running is exp_running + assert ret.last_log_time is None + def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects