diff --git a/azext_iot/_help.py b/azext_iot/_help.py index 7c178e9a4..ce8b2ac4f 100644 --- a/azext_iot/_help.py +++ b/azext_iot/_help.py @@ -472,6 +472,18 @@ authentication.symmetricKey.secondaryKey="" """ +helps[ + "iot hub module-identity renew-key" +] = """ + type: command + short-summary: Renew target keys of an IoT Hub device module with sas authentication. + examples: + - name: Renew the primary key. + text: az iot hub module-identity renew-key -m {module_name} -d {device_id} -n {iothub_name} --kt primary + - name: Swap the primary and secondary keys. + text: az iot hub module-identity renew-key -m {module_name} -d {device_id} -n {iothub_name} --kt swap +""" + helps[ "iot hub module-identity delete" ] = """ diff --git a/azext_iot/_params.py b/azext_iot/_params.py index 908152723..70e4a485c 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -513,6 +513,14 @@ def load_arguments(self, _): help="To remove all children.", ) + with self.argument_context("iot hub module-identity renew-key") as context: + context.argument( + "renew_key_type", + options_list=["--key-type", "--kt"], + arg_type=get_enum_type(RenewKeyType), + help="Target key type to regenerate.", + ) + with self.argument_context("iot hub distributed-tracing update") as context: context.argument( "sampling_mode", diff --git a/azext_iot/commands.py b/azext_iot/commands.py index 13203e0ce..cd14f3cf5 100644 --- a/azext_iot/commands.py +++ b/azext_iot/commands.py @@ -74,6 +74,7 @@ def load_command_table(self, _): getter_name="iot_device_module_show", setter_name="iot_device_module_update", ) + cmd_group.command("renew-key", "iot_device_module_key_regenerate") with self.command_group( "iot hub module-identity connection-string", command_type=iothub_ops diff --git a/azext_iot/operations/hub.py b/azext_iot/operations/hub.py index 62efb6f63..f98af95e2 100644 --- a/azext_iot/operations/hub.py +++ b/azext_iot/operations/hub.py @@ -885,6 +885,64 @@ def _parse_auth(parameters): return auth, pk, sk +def iot_device_module_key_regenerate( + cmd, + hub_name, + device_id, + module_id, + renew_key_type, + resource_group_name=None, + login=None, + etag=None, + auth_type_dataplane=None, +): + discovery = IotHubDiscovery(cmd) + target = discovery.get_target( + hub_name=hub_name, + resource_group_name=resource_group_name, + login=login, + auth_type=auth_type_dataplane, + ) + resolver = SdkResolver(target=target) + service_sdk = resolver.get_sdk(SdkType.service_sdk) + try: + module = service_sdk.modules.get_identity( + id=device_id, mid=module_id, raw=True + ).response.json() + except CloudError as e: + raise CLIError(unpack_msrest_error(e)) + + if module["authentication"]["type"] != "sas": + raise CLIError("Module authentication should be of type sas") + + pk = module["authentication"]["symmetricKey"]["primaryKey"] + sk = module["authentication"]["symmetricKey"]["secondaryKey"] + + if renew_key_type == RenewKeyType.primary.value: + pk = generate_key() + if renew_key_type == RenewKeyType.secondary.value: + sk = generate_key() + if renew_key_type == RenewKeyType.swap.value: + temp = pk + pk = sk + sk = temp + + module["authentication"]["symmetricKey"]["primaryKey"] = pk + module["authentication"]["symmetricKey"]["secondaryKey"] = sk + + try: + headers = {} + headers["If-Match"] = '"{}"'.format(etag if etag else "*") + return service_sdk.modules.create_or_update_identity( + id=device_id, + mid=module_id, + module=module, + custom_headers=headers, + ) + except CloudError as e: + raise CLIError(unpack_msrest_error(e)) + + def iot_device_module_list( cmd, device_id, diff --git a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py index f9c7b1caf..cded3466b 100644 --- a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py +++ b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py @@ -201,6 +201,67 @@ def test_iothub_module_identity(self): expect_failure=True, ) + def test_iothub_module_renew_key(self): + device_count = 1 + device_ids = self.generate_device_names(device_count) + module_count = 2 + module_ids = self.generate_device_names(module_count) + + self.cmd( + f"iot hub device-identity create -d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG}" + ).get_output_in_json() + + symmetric_key_module = self.cmd( + f"iot hub module-identity create -m {module_ids[0]} -d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG}" + ).get_output_in_json() + + self.cmd( + f"iot hub module-identity create -m {module_ids[1]} -d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG} --am x509_ca" + ) + + for auth_phase in DATAPLANE_AUTH_TYPES: + renew_primary_key_module = self.cmd( + self.set_cmd_auth_type( + f"iot hub module-identity renew-key -m {module_ids[0]} " + f"-d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG} --kt primary", + auth_type=auth_phase, + ) + ).get_output_in_json() + assert ( + renew_primary_key_module["authentication"]["symmetricKey"]["primaryKey"] + != symmetric_key_module["authentication"]["symmetricKey"]["primaryKey"] + ) + assert ( + renew_primary_key_module["authentication"]["symmetricKey"][ + "secondaryKey" + ] + == symmetric_key_module["authentication"]["symmetricKey"]["secondaryKey"] + ) + + swap_keys_module = self.cmd( + self.set_cmd_auth_type( + f"iot hub module-identity renew-key -m {module_ids[0]} -d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG} --kt swap", + auth_type=auth_phase, + ) + ).get_output_in_json() + assert ( + renew_primary_key_module["authentication"]["symmetricKey"]["primaryKey"] + == swap_keys_module["authentication"]["symmetricKey"]["secondaryKey"] + ) + assert ( + renew_primary_key_module["authentication"]["symmetricKey"]["secondaryKey"] + == swap_keys_module["authentication"]["symmetricKey"]["primaryKey"] + ) + + self.cmd( + self.set_cmd_auth_type( + f"iot hub module-identity renew-key -m {module_ids[1]} " + f"-d {device_ids[0]} -n {LIVE_HUB} -g {LIVE_RG} --kt secondary", + auth_type=auth_phase, + ), + expect_failure=True, + ) + def test_iothub_module_connection_string_show(self): device_count = 1 device_ids = self.generate_device_names(device_count)