diff --git a/azext_iot/_params.py b/azext_iot/_params.py index 8d0520fb4..dac797ad9 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -96,6 +96,12 @@ def load_arguments(self, _): context.argument( "module_id", options_list=["--module-id", "-m"], help="Target Module." ) + context.argument( + "device_connection_string", + options_list=["--device-connection-string", "--dcs"], + help="Target device connection string." + "This bypasses the IoT Hub registry and generates the SAS token directly from the supplied symmetric key without further validation." + ) context.argument( "key_type", options_list=["--key-type", "--kt"], diff --git a/azext_iot/_validators.py b/azext_iot/_validators.py index 822726a3c..a6ef0dc62 100644 --- a/azext_iot/_validators.py +++ b/azext_iot/_validators.py @@ -17,12 +17,14 @@ def mode2_iot_login_handler(cmd, namespace): iot_cmd_type = None entity_value = None - if 'hub_name' in args: + if 'device_connection_string' in args: + entity_value = 'NA' + elif 'hub_name' in args: iot_cmd_type = 'IoT Hub' entity_value = args['hub_name'] elif 'dps_name' in args: iot_cmd_type = 'DPS' - entity_value = args['dps_name'] + entity_value = args['dps_name'] if not any([login_value, entity_value]): raise CLIError(error_no_hub_or_login_on_input(iot_cmd_type)) diff --git a/azext_iot/operations/hub.py b/azext_iot/operations/hub.py index 22ec18854..e163b6c06 100644 --- a/azext_iot/operations/hub.py +++ b/azext_iot/operations/hub.py @@ -1611,6 +1611,7 @@ def iot_get_sas_token( cmd, hub_name=None, device_id=None, + device_connection_string=None, policy_name="iothubowner", key_type="primary", duration=3600, @@ -1639,6 +1640,7 @@ def iot_get_sas_token( cmd, hub_name, device_id, + device_connection_string, module_id, policy_name, key_type, @@ -1653,6 +1655,7 @@ def _iot_build_sas_token( cmd, hub_name=None, device_id=None, + device_connection_string=None, module_id=None, policy_name="iothubowner", key_type="primary", @@ -1665,6 +1668,22 @@ def _iot_build_sas_token( parse_iot_device_module_connection_string, ) + uri = None + policy = None + key = None + + if device_connection_string: + try: + parsed_device_cs = parse_iot_device_connection_string(device_connection_string) + except ValueError as e: + logger.debug(e) + raise CLIError("This device does not support SAS auth.") + + uri = "{}/devices/{}".format(parsed_device_cs["HostName"], parsed_device_cs["DeviceId"]) + key = parsed_device_cs["SharedAccessKey"] + + return SasTokenAuthentication(uri, policy, key, duration) + discovery = IotHubDiscovery(cmd) target = discovery.get_target( hub_name=hub_name, @@ -1672,9 +1691,7 @@ def _iot_build_sas_token( policy_name=policy_name, login=login, ) - uri = None - policy = None - key = None + if device_id: logger.info( 'Obtaining device "%s" details from registry, using IoT Hub policy "%s"',