Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions src/aks-preview/azext_aks_preview/_openai_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import sys
import subprocess
import io

import openai
from colorama import Fore, Style
Expand All @@ -18,8 +19,11 @@
if IS_MS_WINDOWS:
SCRIPT_TYPE = "Windows PowerShell"
else:
import pty
SCRIPT_TYPE = "Bash Script"

AZ_ERROR_FORMATTER = "{} : {}"

AKS_EXPERT = f'''
You are a microsoft Azure Kubernetes Service expert.

Expand Down Expand Up @@ -82,8 +86,69 @@ def run_system_script(script_content: str):
cmd = ["powershell", "-Command", script_content]
else:
cmd = ["bash", "-c", script_content]
result = subprocess.run(cmd, text=True)
return result.returncode
subprocess.run(cmd, text=True)
return ""


def detect_az_error(az_output):
# this is based on how azure cli is logging the error to output (message_details)
# https://github.com/Azure/azure-cli/blob/dev/src/azure-cli-core/azure/cli/core/aaz/_error_format.py#L78
pattern = rb'Code:([^\n]+)\nMessage:([^\x1b]+)\x1b'
matches = re.findall(pattern, az_output)
if matches:
for match in matches:
try:
code = match[0].decode('utf-8').strip()
message = match[1].decode('utf-8').strip()
return AZ_ERROR_FORMATTER.format(code, message)
except Exception:
pass
return ""


def strip_terminal_escapes(byte_string):
# Regular expression pattern to match various terminal escape sequences
patterns = [
b'\033\[[0-9;]*m', # Color/style changes
b'\033\[[0-9]+[ABCD]', # Cursor movements
b'\033\[[0-9;]+[Hf]', # Cursor positioning
b'\033\[2J', # Clear entire screen
b'\033\[K' # Clear from cursor to end of line
]

cleaned = byte_string
for pattern in patterns:
cleaned = re.sub(pattern, b'', cleaned)

return cleaned.decode('utf-8', 'replace')


def spawn_and_run_system_script(script_content: str):
"""
Note: this doesn't work on Windows because there's no pty support
we use spawn to capture child output,
so user can still interact with yes/no answer from script
"""
if IS_MS_WINDOWS:
raise Exception('capture child output is not supported on Windows')
cmd = ["bash", "-c", script_content]

buffer = io.BytesIO()

def read_child_output(fd):
data = os.read(fd, 1024)
buffer.write(data)
return data
return_status = pty.spawn(cmd, read_child_output)
exit_code = os.waitstatus_to_exitcode(return_status)
script_output = buffer.getvalue()
# Enhancement: remove known repetitive text output
az_error = detect_az_error(script_output)
if az_error:
return az_error
if exit_code != 0:
return strip_terminal_escapes(script_output)
return ""


def extract_backticks_commands(text):
Expand Down Expand Up @@ -206,7 +271,9 @@ def prompt_user_to_run_script(scripts):
if ord_0 <= ord_code < ord_0 + n_scripts:
i = ord_code - ord_0
script = scripts[i]
return run_system_script(script)
if IS_MS_WINDOWS:
return run_system_script(script)
return spawn_and_run_system_script(script)


USER_INPUT_PROMPT = "Prompt: "
Expand Down
5 changes: 4 additions & 1 deletion src/aks-preview/azext_aks_preview/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2552,7 +2552,10 @@ def start_chat(prompt=None):
if user_input in ('p', 'P'):
scripts, messages = prompt_chat_gpt(messages, params, insist=False, scripts=scripts)
elif (user_input in ('r', 'R')) and len(scripts) > 0:
prompt_user_to_run_script(scripts)
error_output = prompt_user_to_run_script(scripts)
if error_output:
error_need_help = "I ran into the following error:\n" + error_output
scripts, messages = prompt_chat_gpt(messages, params, start_input=error_need_help, scripts=scripts)
elif user_input in ('q', 'Q'):
# Exiting the program...
break
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------

import os
import io
import pty
import subprocess
import tempfile
Expand Down Expand Up @@ -127,6 +128,25 @@ def assert_openai_interactive_shell_launched(self):
if not pattern in output:
raise CliTestError(f"Output from aks copilot did not contain '{pattern}'. Output:\n{output}")

def test_detect_az_error(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you put these test cases to some other file? The test cases in test_aks_commands.py are live test cases.

from azext_aks_preview._openai_wrapper import detect_az_error, AZ_ERROR_FORMATTER
code = "LocationNotAvailableForResourceGroup"
message = "The provided location 'useast' is not available."
# \x1b is the ASCII for ESCAPE in terminal
actual = detect_az_error("Code: {}\nMessage: {}\x1b".format(code, message).encode())
expected = AZ_ERROR_FORMATTER.format(code, message)
self.assertEqual(actual, expected)

actual = detect_az_error(b"Code:1. Creates a resource group in the")
self.assertEqual(actual, "")

def test_strip_terminal_escapes(self):
from azext_aks_preview._openai_wrapper import strip_terminal_escapes
colored_string = b'''Code: LocationNotAvailableForResourceGroup
Message: The provided location 'useast' is not available for resource group. '''
actual = strip_terminal_escapes(colored_string)
self.assertEqual(actual, '''Code: LocationNotAvailableForResourceGroup
Message: The provided location 'useast' is not available for resource group. ''')

@AllowLargeResponse()
@AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='eastus')
Expand Down