diff --git a/src/aks-preview/azext_aks_preview/_openai_wrapper.py b/src/aks-preview/azext_aks_preview/_openai_wrapper.py index 38cbbc5ad14..3db965282b5 100644 --- a/src/aks-preview/azext_aks_preview/_openai_wrapper.py +++ b/src/aks-preview/azext_aks_preview/_openai_wrapper.py @@ -6,6 +6,7 @@ import os import sys import subprocess +import io import openai from colorama import Fore, Style @@ -18,8 +19,11 @@ if IS_MS_WINDOWS: SCRIPT_TYPE = "Windows PowerShell" else: + import pty SCRIPT_TYPE = "Bash Script" +AZ_ERROR_FORMATTER = "{} : {}" + AKS_EXPERT = f''' You are a microsoft Azure Kubernetes Service expert. @@ -82,8 +86,69 @@ def run_system_script(script_content: str): cmd = ["powershell", "-Command", script_content] else: cmd = ["bash", "-c", script_content] - result = subprocess.run(cmd, text=True) - return result.returncode + subprocess.run(cmd, text=True) + return "" + + +def detect_az_error(az_output): + # this is based on how azure cli is logging the error to output (message_details) + # https://github.com/Azure/azure-cli/blob/dev/src/azure-cli-core/azure/cli/core/aaz/_error_format.py#L78 + pattern = rb'Code:([^\n]+)\nMessage:([^\x1b]+)\x1b' + matches = re.findall(pattern, az_output) + if matches: + for match in matches: + try: + code = match[0].decode('utf-8').strip() + message = match[1].decode('utf-8').strip() + return AZ_ERROR_FORMATTER.format(code, message) + except Exception: + pass + return "" + + +def strip_terminal_escapes(byte_string): + # Regular expression pattern to match various terminal escape sequences + patterns = [ + b'\033\[[0-9;]*m', # Color/style changes + b'\033\[[0-9]+[ABCD]', # Cursor movements + b'\033\[[0-9;]+[Hf]', # Cursor positioning + b'\033\[2J', # Clear entire screen + b'\033\[K' # Clear from cursor to end of line + ] + + cleaned = byte_string + for pattern in patterns: + cleaned = re.sub(pattern, b'', cleaned) + + return cleaned.decode('utf-8', 'replace') + + +def spawn_and_run_system_script(script_content: str): + """ + Note: this doesn't work on Windows because there's no pty support + we use spawn to capture child output, + so user can still interact with yes/no answer from script + """ + if IS_MS_WINDOWS: + raise Exception('capture child output is not supported on Windows') + cmd = ["bash", "-c", script_content] + + buffer = io.BytesIO() + + def read_child_output(fd): + data = os.read(fd, 1024) + buffer.write(data) + return data + return_status = pty.spawn(cmd, read_child_output) + exit_code = os.waitstatus_to_exitcode(return_status) + script_output = buffer.getvalue() + # Enhancement: remove known repetitive text output + az_error = detect_az_error(script_output) + if az_error: + return az_error + if exit_code != 0: + return strip_terminal_escapes(script_output) + return "" def extract_backticks_commands(text): @@ -206,7 +271,9 @@ def prompt_user_to_run_script(scripts): if ord_0 <= ord_code < ord_0 + n_scripts: i = ord_code - ord_0 script = scripts[i] - return run_system_script(script) + if IS_MS_WINDOWS: + return run_system_script(script) + return spawn_and_run_system_script(script) USER_INPUT_PROMPT = "Prompt: " diff --git a/src/aks-preview/azext_aks_preview/custom.py b/src/aks-preview/azext_aks_preview/custom.py index caa042ef479..dcb0d584f1f 100644 --- a/src/aks-preview/azext_aks_preview/custom.py +++ b/src/aks-preview/azext_aks_preview/custom.py @@ -2552,7 +2552,10 @@ def start_chat(prompt=None): if user_input in ('p', 'P'): scripts, messages = prompt_chat_gpt(messages, params, insist=False, scripts=scripts) elif (user_input in ('r', 'R')) and len(scripts) > 0: - prompt_user_to_run_script(scripts) + error_output = prompt_user_to_run_script(scripts) + if error_output: + error_need_help = "I ran into the following error:\n" + error_output + scripts, messages = prompt_chat_gpt(messages, params, start_input=error_need_help, scripts=scripts) elif user_input in ('q', 'Q'): # Exiting the program... break diff --git a/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py b/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py index 33357887c86..5a31b3b030b 100644 --- a/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py +++ b/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py @@ -4,6 +4,7 @@ # -------------------------------------------------------------------------------------------- import os +import io import pty import subprocess import tempfile @@ -127,6 +128,25 @@ def assert_openai_interactive_shell_launched(self): if not pattern in output: raise CliTestError(f"Output from aks copilot did not contain '{pattern}'. Output:\n{output}") + def test_detect_az_error(self): + from azext_aks_preview._openai_wrapper import detect_az_error, AZ_ERROR_FORMATTER + code = "LocationNotAvailableForResourceGroup" + message = "The provided location 'useast' is not available." + # \x1b is the ASCII for ESCAPE in terminal + actual = detect_az_error("Code: {}\nMessage: {}\x1b".format(code, message).encode()) + expected = AZ_ERROR_FORMATTER.format(code, message) + self.assertEqual(actual, expected) + + actual = detect_az_error(b"Code:1. Creates a resource group in the") + self.assertEqual(actual, "") + + def test_strip_terminal_escapes(self): + from azext_aks_preview._openai_wrapper import strip_terminal_escapes + colored_string = b'''Code: LocationNotAvailableForResourceGroup +Message: The provided location 'useast' is not available for resource group. ''' + actual = strip_terminal_escapes(colored_string) + self.assertEqual(actual, '''Code: LocationNotAvailableForResourceGroup +Message: The provided location 'useast' is not available for resource group. ''') @AllowLargeResponse() @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='eastus')