Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 84 additions & 37 deletions src/azure-cli-core/azure/cli/core/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ def _get_extension_command_tree(self):
return None
return EXT_CMD_TREE

def _get_all_extensions(self, cmd_chain, ext_set=None):
"""Find all the extension names in cmd_chain (dict of extension command subtree).
An example of cmd_chain may look like (a command sub tree of the 'aks' command group):
{
"create": "aks-preview",
"update": "aks-preview",
"app": {
"up": "deploy-to-azure"
},
"use-dev-spaces": "dev-spaces"
}
Then the resulting ext_set is {'aks-preview', 'deploy-to-azure', 'dev-spaces'}
"""
ext_set = set() if ext_set is None else ext_set
for key in cmd_chain:
if isinstance(cmd_chain[key], str):
ext_set.add(cmd_chain[key])
else:
self._get_all_extensions(cmd_chain[key], ext_set)
return ext_set

def _search_in_extension_commands(self, command_str):
"""Search the command in an extension commands dict which mimics a prefix tree.
If the value of the dict item is a string, then the key represents the end of a complete command
Expand Down Expand Up @@ -346,7 +367,9 @@ def _search_in_extension_commands(self, command_str):
cmd_chain = cmd_chain[part]
except KeyError:
return None
return None
# command_str is prefix of one or more complete commands.
all_exts = self._get_all_extensions(cmd_chain)
return list(all_exts) if all_exts else None

def _get_extension_use_dynamic_install_config(self):
cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None)
Expand All @@ -365,7 +388,7 @@ def _get_extension_run_after_dynamic_install_config(self):
default_value) if cli_ctx else default_value
return run_after_extension_installed

def _check_value(self, action, value): # pylint: disable=too-many-statements, too-many-locals
def _check_value(self, action, value): # pylint: disable=too-many-statements, too-many-locals, too-many-branches
# Override to customize the error message when a argument is not among the available choices
# converted value must be one of the choices (if specified)
if action.choices is not None and value not in action.choices: # pylint: disable=too-many-nested-blocks
Expand All @@ -377,26 +400,41 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t
command_name_inferred = self.prog
error_msg = None
if not self.command_source:
candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7)
if candidates:
# use the most likely candidate to replace the misspelled command
args = self.prog.split() + self._raw_arguments
args_inferred = [item if item != value else candidates[0] for item in args]
command_name_inferred = ' '.join(args_inferred).split('-')[0]

candidates = []
args = self.prog.split() + self._raw_arguments
use_dynamic_install = self._get_extension_use_dynamic_install_config()
if use_dynamic_install != 'no' and not candidates:
if use_dynamic_install != 'no':
# Check if the command is from an extension
from azure.cli.core.util import roughly_parse_command
cmd_list = self.prog.split() + self._raw_arguments
command_str = roughly_parse_command(cmd_list[1:])
command_str = roughly_parse_command(args[1:])
ext_name = self._search_in_extension_commands(command_str)
# The input command matches the prefix of one or more extension commands
if isinstance(ext_name, list):
if len(ext_name) > 1:
from knack.prompting import prompt_choice_list, NoTTYException
try:
prompt_msg = "The command requires the latest version of one of the following " \
"extensions. You need to pick one to install:"
choice_idx = prompt_choice_list(prompt_msg, ext_name)
ext_name = ext_name[choice_idx]
use_dynamic_install = 'yes_without_prompt'
except NoTTYException:
error_msg = "{}{}\nUnable to prompt for selection as no tty available. Please " \
"update or install the extension with 'az extension add --upgrade -n " \
"<extension-name>'.".format(prompt_msg, ext_name)
logger.error(error_msg)
telemetry.set_user_fault(error_msg)
self.exit(2)
else:
ext_name = ext_name[0]

if ext_name:
caused_by_extension_not_installed = True
telemetry.set_command_details(command_str,
parameters=AzCliCommandInvoker._extract_parameter_names(cmd_list), # pylint: disable=protected-access
parameters=AzCliCommandInvoker._extract_parameter_names(args), # pylint: disable=protected-access
extension_name=ext_name)
run_after_extension_installed = self._get_extension_run_after_dynamic_install_config()
prompt_info = ""
if use_dynamic_install == 'yes_without_prompt':
logger.warning('The command requires the extension %s. '
'It will be installed first.', ext_name)
Expand All @@ -413,36 +451,45 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t
try:
go_on = prompt_y_n(prompt_msg, default='y')
if go_on:
prompt_info = " with prompt"
logger.warning(NO_PROMPT_CONFIG_MSG)
except NoTTYException:
logger.warning("The command requires the extension %s.\n "
"Unable to prompt for extension install confirmation as no tty "
"available. %s", ext_name, NO_PROMPT_CONFIG_MSG)
error_msg = "The command requires the extension {}. " \
"Unable to prompt for extension install confirmation as no tty " \
"available. {}".format(ext_name, NO_PROMPT_CONFIG_MSG)
go_on = False
if go_on:
from azure.cli.core.extension.operations import add_extension
add_extension(cli_ctx=cli_ctx, extension_name=ext_name, upgrade=True)
if run_after_extension_installed:
import subprocess
import platform
exit_code = subprocess.call(cmd_list, shell=platform.system() == 'Windows')
error_msg = ("Extension {} dynamically installed and commands will be "
"rerun automatically.").format(ext_name)
exit_code = subprocess.call(args, shell=platform.system() == 'Windows')
error_msg = ("Extension {} dynamically installed{} and commands will be "
"rerun automatically.").format(ext_name, prompt_info)
telemetry.set_user_fault(error_msg)
self.exit(exit_code)
else:
with CommandLoggerContext(logger):
error_msg = 'Extension {} installed. Please rerun your command.'.format(ext_name)
error_msg = 'Extension {} installed{}. Please rerun your command.' \
.format(ext_name, prompt_info)
logger.error(error_msg)
telemetry.set_user_fault(error_msg)
self.exit(2)
else:
error_msg = "The command requires the latest version of extension {ext_name}. " \
"To install, run 'az extension add --upgrade -n {ext_name}'.".format(ext_name=ext_name)
"To install, run 'az extension add --upgrade -n {ext_name}'.".format(
ext_name=ext_name) if not error_msg else error_msg
if not error_msg:
# parser has no `command_source`, value is part of command itself
error_msg = "'{value}' is misspelled or not recognized by the system.".format(value=value)
az_error = CommandNotFoundError(error_msg)
if not caused_by_extension_not_installed:
candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7)
if candidates:
# use the most likely candidate to replace the misspelled command
args_inferred = [item if item != value else candidates[0] for item in args]
command_name_inferred = ' '.join(args_inferred).split('-')[0]

else:
# `command_source` indicates command values have been parsed, value is an argument
Expand All @@ -457,22 +504,22 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t
az_error.set_recommendation("Did you mean '{}' ?".format(candidates[0]))

# recommend a command for user
recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx)
recommender.set_help_examples(self.get_examples(command_name_inferred))
recommended_command = recommender.recommend_a_command()
if recommended_command:
az_error.set_recommendation("Try this: '{}'".format(recommended_command))

# remind user to check extensions if we can not find a command to recommend
if isinstance(az_error, CommandNotFoundError) \
and not az_error.recommendations and self.prog == 'az' \
and use_dynamic_install == 'no':
az_error.set_recommendation(EXTENSION_REFERENCE)

az_error.set_recommendation(OVERVIEW_REFERENCE.format(command=self.prog))

if not caused_by_extension_not_installed:
az_error.print_error()
az_error.send_telemetry()
recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx)
recommender.set_help_examples(self.get_examples(command_name_inferred))
recommended_command = recommender.recommend_a_command()
if recommended_command:
az_error.set_recommendation("Try this: '{}'".format(recommended_command))

# remind user to check extensions if we can not find a command to recommend
if isinstance(az_error, CommandNotFoundError) \
and not az_error.recommendations and self.prog == 'az' \
and use_dynamic_install == 'no':
az_error.set_recommendation(EXTENSION_REFERENCE)

az_error.set_recommendation(OVERVIEW_REFERENCE.format(command=self.prog))

az_error.print_error()
az_error.send_telemetry()

self.exit(2)