Skip to content
Merged
2 changes: 1 addition & 1 deletion airflow/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def pre_exec():
raise RuntimeError("The subprocess should be created here and is None!")
if self.sub_process.stdout is not None:
for raw_line in iter(self.sub_process.stdout.readline, b''):
line = raw_line.decode(output_encoding).rstrip()
line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
self.log.info("%s", line)

self.sub_process.wait()
Expand Down
5 changes: 3 additions & 2 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8'))
for raw_line in logs:
line = raw_line.decode('utf-8', errors="backslashreplace")
timestamp, message = self.parse_log_line(line)
self.log.info(message)
except BaseHTTPError as e:
self.log.warning(
Expand Down
6 changes: 6 additions & 0 deletions tests/hooks/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,9 @@ def test_should_exec_subprocess(self, mock_popen, mock_temporary_directory):
stderr=STDOUT,
stdout=PIPE,
)

def test_task_decode(self):
hook = SubprocessHook()
command = ['bash', '-c', 'printf "This will cause a coding error \\xb1\\xa6\\x01\n"']
result = hook.run_command(command=command)
assert result.exit_code == 0