diff --git a/src/azure-cli-core/azure/cli/core/parser.py b/src/azure-cli-core/azure/cli/core/parser.py index 58c1d60d9e9..030e295103b 100644 --- a/src/azure-cli-core/azure/cli/core/parser.py +++ b/src/azure-cli-core/azure/cli/core/parser.py @@ -318,6 +318,27 @@ def _get_extension_command_tree(self): return None return EXT_CMD_TREE + def _get_all_extensions(self, cmd_chain, ext_set=None): + """Find all the extension names in cmd_chain (dict of extension command subtree). + An example of cmd_chain may look like (a command sub tree of the 'aks' command group): + { + "create": "aks-preview", + "update": "aks-preview", + "app": { + "up": "deploy-to-azure" + }, + "use-dev-spaces": "dev-spaces" + } + Then the resulting ext_set is {'aks-preview', 'deploy-to-azure', 'dev-spaces'} + """ + ext_set = set() if ext_set is None else ext_set + for key in cmd_chain: + if isinstance(cmd_chain[key], str): + ext_set.add(cmd_chain[key]) + else: + self._get_all_extensions(cmd_chain[key], ext_set) + return ext_set + def _search_in_extension_commands(self, command_str): """Search the command in an extension commands dict which mimics a prefix tree. If the value of the dict item is a string, then the key represents the end of a complete command @@ -346,7 +367,9 @@ def _search_in_extension_commands(self, command_str): cmd_chain = cmd_chain[part] except KeyError: return None - return None + # command_str is prefix of one or more complete commands. + all_exts = self._get_all_extensions(cmd_chain) + return list(all_exts) if all_exts else None def _get_extension_use_dynamic_install_config(self): cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None) @@ -365,7 +388,7 @@ def _get_extension_run_after_dynamic_install_config(self): default_value) if cli_ctx else default_value return run_after_extension_installed - def _check_value(self, action, value): # pylint: disable=too-many-statements, too-many-locals + def _check_value(self, action, value): # pylint: disable=too-many-statements, too-many-locals, too-many-branches # Override to customize the error message when a argument is not among the available choices # converted value must be one of the choices (if specified) if action.choices is not None and value not in action.choices: # pylint: disable=too-many-nested-blocks @@ -377,26 +400,41 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t command_name_inferred = self.prog error_msg = None if not self.command_source: - candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7) - if candidates: - # use the most likely candidate to replace the misspelled command - args = self.prog.split() + self._raw_arguments - args_inferred = [item if item != value else candidates[0] for item in args] - command_name_inferred = ' '.join(args_inferred).split('-')[0] - + candidates = [] + args = self.prog.split() + self._raw_arguments use_dynamic_install = self._get_extension_use_dynamic_install_config() - if use_dynamic_install != 'no' and not candidates: + if use_dynamic_install != 'no': # Check if the command is from an extension from azure.cli.core.util import roughly_parse_command - cmd_list = self.prog.split() + self._raw_arguments - command_str = roughly_parse_command(cmd_list[1:]) + command_str = roughly_parse_command(args[1:]) ext_name = self._search_in_extension_commands(command_str) + # The input command matches the prefix of one or more extension commands + if isinstance(ext_name, list): + if len(ext_name) > 1: + from knack.prompting import prompt_choice_list, NoTTYException + try: + prompt_msg = "The command requires the latest version of one of the following " \ + "extensions. You need to pick one to install:" + choice_idx = prompt_choice_list(prompt_msg, ext_name) + ext_name = ext_name[choice_idx] + use_dynamic_install = 'yes_without_prompt' + except NoTTYException: + error_msg = "{}{}\nUnable to prompt for selection as no tty available. Please " \ + "update or install the extension with 'az extension add --upgrade -n " \ + "'.".format(prompt_msg, ext_name) + logger.error(error_msg) + telemetry.set_user_fault(error_msg) + self.exit(2) + else: + ext_name = ext_name[0] + if ext_name: caused_by_extension_not_installed = True telemetry.set_command_details(command_str, - parameters=AzCliCommandInvoker._extract_parameter_names(cmd_list), # pylint: disable=protected-access + parameters=AzCliCommandInvoker._extract_parameter_names(args), # pylint: disable=protected-access extension_name=ext_name) run_after_extension_installed = self._get_extension_run_after_dynamic_install_config() + prompt_info = "" if use_dynamic_install == 'yes_without_prompt': logger.warning('The command requires the extension %s. ' 'It will be installed first.', ext_name) @@ -413,11 +451,12 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t try: go_on = prompt_y_n(prompt_msg, default='y') if go_on: + prompt_info = " with prompt" logger.warning(NO_PROMPT_CONFIG_MSG) except NoTTYException: - logger.warning("The command requires the extension %s.\n " - "Unable to prompt for extension install confirmation as no tty " - "available. %s", ext_name, NO_PROMPT_CONFIG_MSG) + error_msg = "The command requires the extension {}. " \ + "Unable to prompt for extension install confirmation as no tty " \ + "available. {}".format(ext_name, NO_PROMPT_CONFIG_MSG) go_on = False if go_on: from azure.cli.core.extension.operations import add_extension @@ -425,24 +464,32 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t if run_after_extension_installed: import subprocess import platform - exit_code = subprocess.call(cmd_list, shell=platform.system() == 'Windows') - error_msg = ("Extension {} dynamically installed and commands will be " - "rerun automatically.").format(ext_name) + exit_code = subprocess.call(args, shell=platform.system() == 'Windows') + error_msg = ("Extension {} dynamically installed{} and commands will be " + "rerun automatically.").format(ext_name, prompt_info) telemetry.set_user_fault(error_msg) self.exit(exit_code) else: with CommandLoggerContext(logger): - error_msg = 'Extension {} installed. Please rerun your command.'.format(ext_name) + error_msg = 'Extension {} installed{}. Please rerun your command.' \ + .format(ext_name, prompt_info) logger.error(error_msg) telemetry.set_user_fault(error_msg) self.exit(2) else: error_msg = "The command requires the latest version of extension {ext_name}. " \ - "To install, run 'az extension add --upgrade -n {ext_name}'.".format(ext_name=ext_name) + "To install, run 'az extension add --upgrade -n {ext_name}'.".format( + ext_name=ext_name) if not error_msg else error_msg if not error_msg: # parser has no `command_source`, value is part of command itself error_msg = "'{value}' is misspelled or not recognized by the system.".format(value=value) az_error = CommandNotFoundError(error_msg) + if not caused_by_extension_not_installed: + candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7) + if candidates: + # use the most likely candidate to replace the misspelled command + args_inferred = [item if item != value else candidates[0] for item in args] + command_name_inferred = ' '.join(args_inferred).split('-')[0] else: # `command_source` indicates command values have been parsed, value is an argument @@ -457,22 +504,22 @@ def _check_value(self, action, value): # pylint: disable=too-many-statements, t az_error.set_recommendation("Did you mean '{}' ?".format(candidates[0])) # recommend a command for user - recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx) - recommender.set_help_examples(self.get_examples(command_name_inferred)) - recommended_command = recommender.recommend_a_command() - if recommended_command: - az_error.set_recommendation("Try this: '{}'".format(recommended_command)) - - # remind user to check extensions if we can not find a command to recommend - if isinstance(az_error, CommandNotFoundError) \ - and not az_error.recommendations and self.prog == 'az' \ - and use_dynamic_install == 'no': - az_error.set_recommendation(EXTENSION_REFERENCE) - - az_error.set_recommendation(OVERVIEW_REFERENCE.format(command=self.prog)) - if not caused_by_extension_not_installed: - az_error.print_error() - az_error.send_telemetry() + recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx) + recommender.set_help_examples(self.get_examples(command_name_inferred)) + recommended_command = recommender.recommend_a_command() + if recommended_command: + az_error.set_recommendation("Try this: '{}'".format(recommended_command)) + + # remind user to check extensions if we can not find a command to recommend + if isinstance(az_error, CommandNotFoundError) \ + and not az_error.recommendations and self.prog == 'az' \ + and use_dynamic_install == 'no': + az_error.set_recommendation(EXTENSION_REFERENCE) + + az_error.set_recommendation(OVERVIEW_REFERENCE.format(command=self.prog)) + + az_error.print_error() + az_error.send_telemetry() self.exit(2)