From 7b03343c51060c730e24800f59c08b09bce98fe9 Mon Sep 17 00:00:00 2001 From: muhua Date: Fri, 14 Jan 2022 16:51:37 +0800 Subject: [PATCH 01/10] UnicodeDecodeError: 'utf-8' codec can't decode byte 0xXX in position X: invalid start byte File "/opt/work/python395/lib/python3.9/site-packages/airflow/hooks/subprocess.py", line 89, in run_command line = raw_line.decode(output_encoding).rstrip() # raw_line == b'\x00\x00\x00\x11\xa9\x01\n' UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 4: invalid start byte --- airflow/hooks/subprocess.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index c814b0528d25a..0e06f669ede80 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -88,7 +88,11 @@ def pre_exec(): raise RuntimeError("The subprocess should be created here and is None!") if self.sub_process.stdout is not None: for raw_line in iter(self.sub_process.stdout.readline, b''): - line = raw_line.decode(output_encoding).rstrip() + line = '' + if 'utf-8'==output_encoding: + line = ''.join([chr(_) for _ in raw_line]).rstrip() + else: + line = raw_line.decode(output_encoding).rstrip() self.log.info("%s", line) self.sub_process.wait() From 94f328202c66c599f50cb53862632c622852b5a8 Mon Sep 17 00:00:00 2001 From: muhua Date: Fri, 14 Jan 2022 17:01:34 +0800 Subject: [PATCH 02/10] Update subprocess.py --- airflow/hooks/subprocess.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index 0e06f669ede80..0c51d7f2fc3ca 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -88,11 +88,7 @@ def pre_exec(): raise RuntimeError("The subprocess should be created here and is None!") if self.sub_process.stdout is not None: for raw_line in iter(self.sub_process.stdout.readline, b''): - line = '' - if 'utf-8'==output_encoding: - line = ''.join([chr(_) for _ in raw_line]).rstrip() - else: - line = raw_line.decode(output_encoding).rstrip() + line = raw_line.decode(output_encoding, 'backslashreplace').rstrip() self.log.info("%s", line) self.sub_process.wait() From 1640079cc0f6df51e1a8ae72f78f7646144fe461 Mon Sep 17 00:00:00 2001 From: muhua Date: Sat, 15 Jan 2022 18:51:50 +0800 Subject: [PATCH 03/10] Update subprocess.py --- airflow/hooks/subprocess.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index 0c51d7f2fc3ca..081b1aa6c1f68 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -88,8 +88,11 @@ def pre_exec(): raise RuntimeError("The subprocess should be created here and is None!") if self.sub_process.stdout is not None: for raw_line in iter(self.sub_process.stdout.readline, b''): - line = raw_line.decode(output_encoding, 'backslashreplace').rstrip() - self.log.info("%s", line) + try: + line = raw_line.decode(output_encoding).rstrip() + self.log.info("%s", line) + except Exception as err: + print(err, output_encoding, raw_line) self.sub_process.wait() From 0ea9ea2062aac4cecb79adf46906bd9315b54ef1 Mon Sep 17 00:00:00 2001 From: muhua Date: Mon, 24 Jan 2022 10:16:41 +0800 Subject: [PATCH 04/10] Fix: Exception when parsing log #20966 --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 46e593a2c93df..29f094de2d1a9 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -221,7 +221,8 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) follow=follow, ) for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8')) + line = line.decode('utf-8', errors="backslashreplace") + timestamp, message = self.parse_log_line(line) self.log.info(message) except BaseHTTPError as e: self.log.warning( From 10558afd339d53238880a922a6cf4bc890247ea2 Mon Sep 17 00:00:00 2001 From: muhua Date: Mon, 24 Jan 2022 10:22:44 +0800 Subject: [PATCH 05/10] Fix: Exception when parsing log #20966 Another alternative is: try-catch it. e.g. ``` line = '' for raw_line in iter(self.sub_process.stdout.readline, b''): try: line = raw_line.decode(output_encoding).rstrip() except UnicodeDecodeError as err: print(err, output_encoding, raw_line) self.log.info("%s", line) ``` --- airflow/hooks/subprocess.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index 081b1aa6c1f68..fa8c706c6963d 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -88,11 +88,8 @@ def pre_exec(): raise RuntimeError("The subprocess should be created here and is None!") if self.sub_process.stdout is not None: for raw_line in iter(self.sub_process.stdout.readline, b''): - try: - line = raw_line.decode(output_encoding).rstrip() - self.log.info("%s", line) - except Exception as err: - print(err, output_encoding, raw_line) + line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip() + self.log.info("%s", line) self.sub_process.wait() From 0f5d03710883f6443c055503568ee2367526fdb9 Mon Sep 17 00:00:00 2001 From: muhua Date: Mon, 24 Jan 2022 17:02:41 +0800 Subject: [PATCH 06/10] Create test_subprocess.sh --- tests/hooks/test_subprocess.sh | 1 + 1 file changed, 1 insertion(+) create mode 100644 tests/hooks/test_subprocess.sh diff --git a/tests/hooks/test_subprocess.sh b/tests/hooks/test_subprocess.sh new file mode 100644 index 0000000000000..1d1265fc9649b --- /dev/null +++ b/tests/hooks/test_subprocess.sh @@ -0,0 +1 @@ +printf 'This will cause a coding error \xb1\xa6\x01\n' # v2.2.3: airflow/hooks/subprocess.py:88: UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 31: invalid start byte From c1da84c2ddcff70a0b49c2bee1ef9c972c750cc2 Mon Sep 17 00:00:00 2001 From: muhua Date: Mon, 24 Jan 2022 17:03:18 +0800 Subject: [PATCH 07/10] Update test_subprocess.py --- tests/hooks/test_subprocess.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py index 642b219c0197a..5b7fef0c9cf03 100644 --- a/tests/hooks/test_subprocess.py +++ b/tests/hooks/test_subprocess.py @@ -96,3 +96,9 @@ def test_should_exec_subprocess(self, mock_popen, mock_temporary_directory): stderr=STDOUT, stdout=PIPE, ) + + def test_task_decode(self): + hook = SubprocessHook() + command=['bash', '-c', "source "+__file__[:-2]+'sh'] + result = hook.run_command(command=command) + assert result.exit_code==0 From 8106f2d37448216132073825efef8b50d8a9dc6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Nov=C3=A1k?= Date: Wed, 27 Apr 2022 20:18:51 +0200 Subject: [PATCH 08/10] Added shell directive and license to test_subprocess.sh --- tests/hooks/test_subprocess.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/hooks/test_subprocess.sh b/tests/hooks/test_subprocess.sh index 1d1265fc9649b..dc5b249c436ec 100644 --- a/tests/hooks/test_subprocess.sh +++ b/tests/hooks/test_subprocess.sh @@ -1 +1,19 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + printf 'This will cause a coding error \xb1\xa6\x01\n' # v2.2.3: airflow/hooks/subprocess.py:88: UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 31: invalid start byte From 535d59afb07d5d075d94b053c7d2f05d859d753e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Nov=C3=A1k?= Date: Wed, 27 Apr 2022 21:45:38 +0200 Subject: [PATCH 09/10] Distinguish between raw and decoded lines as suggested by @uranusjr --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 29f094de2d1a9..153abee1dbf95 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -220,8 +220,8 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) ), follow=follow, ) - for line in logs: - line = line.decode('utf-8', errors="backslashreplace") + for raw_line in logs: + line = raw_line.decode('utf-8', errors="backslashreplace") timestamp, message = self.parse_log_line(line) self.log.info(message) except BaseHTTPError as e: From 252d25277cba7b13eb9138131f38976e66ae9265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Nov=C3=A1k?= Date: Mon, 9 May 2022 09:38:21 +0200 Subject: [PATCH 10/10] simplify test --- tests/hooks/test_subprocess.py | 4 ++-- tests/hooks/test_subprocess.sh | 19 ------------------- 2 files changed, 2 insertions(+), 21 deletions(-) delete mode 100644 tests/hooks/test_subprocess.sh diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py index 5b7fef0c9cf03..10d9b0097044d 100644 --- a/tests/hooks/test_subprocess.py +++ b/tests/hooks/test_subprocess.py @@ -99,6 +99,6 @@ def test_should_exec_subprocess(self, mock_popen, mock_temporary_directory): def test_task_decode(self): hook = SubprocessHook() - command=['bash', '-c', "source "+__file__[:-2]+'sh'] + command = ['bash', '-c', 'printf "This will cause a coding error \\xb1\\xa6\\x01\n"'] result = hook.run_command(command=command) - assert result.exit_code==0 + assert result.exit_code == 0 diff --git a/tests/hooks/test_subprocess.sh b/tests/hooks/test_subprocess.sh deleted file mode 100644 index dc5b249c436ec..0000000000000 --- a/tests/hooks/test_subprocess.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -printf 'This will cause a coding error \xb1\xa6\x01\n' # v2.2.3: airflow/hooks/subprocess.py:88: UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 31: invalid start byte