diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index c814b0528d25a..fa8c706c6963d 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -88,7 +88,7 @@ def pre_exec():
                 raise RuntimeError("The subprocess should be created here and is None!")
             if self.sub_process.stdout is not None:
                 for raw_line in iter(self.sub_process.stdout.readline, b''):
-                    line = raw_line.decode(output_encoding).rstrip()
+                    line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
                     self.log.info("%s", line)
 
             self.sub_process.wait()
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 46e593a2c93df..153abee1dbf95 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -220,8 +220,9 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
                     ),
                     follow=follow,
                 )
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                for raw_line in logs:
+                    line = raw_line.decode('utf-8', errors="backslashreplace")
+                    timestamp, message = self.parse_log_line(line)
                     self.log.info(message)
             except BaseHTTPError as e:
                 self.log.warning(
diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py
index 642b219c0197a..10d9b0097044d 100644
--- a/tests/hooks/test_subprocess.py
+++ b/tests/hooks/test_subprocess.py
@@ -96,3 +96,9 @@ def test_should_exec_subprocess(self, mock_popen, mock_temporary_directory):
             stderr=STDOUT,
             stdout=PIPE,
         )
+
+    def test_task_decode(self):
+        hook = SubprocessHook()
+        command = ['bash', '-c', 'printf "This will cause a coding error \\xb1\\xa6\\x01\n"']
+        result = hook.run_command(command=command)
+        assert result.exit_code == 0