Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a9ec74b
Updated to support multihub in iot central
Jackbk Apr 14, 2020
22dc92f
Merge branch 'dev' into multihub-3
Jackbk Apr 14, 2020
aeb11e4
Fixed iot central unit tests
Jackbk Apr 14, 2020
553dc45
Merge branch 'multihub-3' of https://github.com/Jackbk/azure-iot-cli-…
Jackbk Apr 14, 2020
75aaf95
Enable iot central test
Jackbk Apr 14, 2020
1949a9c
Added newline to end of _events
Jackbk Apr 14, 2020
200317b
Merge branch 'dev' into multihub-3
Jackbk Apr 14, 2020
7b5e0c9
Merge branch 'dev' into multihub-3
Jackbk Apr 20, 2020
c7a79fc
Merge branch 'multihub-3' of https://github.com/Jackbk/azure-iot-cli-…
Jackbk Apr 20, 2020
5356b22
Updated re PR comments
Jackbk Apr 21, 2020
2b9c04b
Updated re pr comments
Jackbk Apr 21, 2020
7920db4
Updated help and error information for iot central token
Jackbk Apr 21, 2020
da16e6a
Fixed grammar
Jackbk Apr 21, 2020
4cf0562
Merge branch 'dev' into multihub-3
Jackbk Apr 24, 2020
d0b15b6
Updated re PR comments. Moved device-twin to app device-twin
Jackbk Apr 28, 2020
336bdfc
Added central test for app device-twin
Jackbk Apr 28, 2020
dc2a8f7
no message
Jackbk Apr 28, 2020
63c0d7c
Formatted document
Jackbk Apr 28, 2020
dc7421c
Added additional help to errors
Jackbk Apr 28, 2020
d71cc59
Fixed lint issue
Jackbk Apr 28, 2020
953294b
Fixed linting
Jackbk Apr 28, 2020
0ec4f52
Updated test fixture
Jackbk Apr 28, 2020
8afe9f6
Added fixture
Jackbk Apr 28, 2020
12bd1ff
Added test test_get_aad_token
Jackbk Apr 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions azext_iot/common/_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,12 @@ def _find_iot_dps_from_list(all_dps, dps_name):


def get_iot_central_tokens(cmd, app_id, central_api_uri):
def get_event_hub_token(app_id, iotcAccessToken):
import requests
url = "https://{}/v1-beta/applications/{}/diagnostics/sasTokens".format(central_api_uri, app_id)
response = requests.post(url, headers={'Authorization': 'Bearer {}'.format(iotcAccessToken)})
return response.json()

import requests
Comment thread
Jackbk marked this conversation as resolved.
aad_token = _get_aad_token(cmd, resource="https://apps.azureiotcentral.com")['accessToken']

tokens = get_event_hub_token(app_id, aad_token)
url = "https://{}.{}/system/iothubs/generateSasTokens".format(app_id, central_api_uri)
response = requests.post(url, headers={'Authorization': 'Bearer {}'.format(aad_token)})
tokens = response.json()

if tokens.get('error'):
raise CLIError(
Expand All @@ -259,10 +256,6 @@ def get_event_hub_token(app_id, iotcAccessToken):
return tokens


def get_iot_hub_token_from_central_app_id(cmd, app_id, central_api_uri):
return get_iot_central_tokens(cmd, app_id, central_api_uri)['iothubTenantSasToken']['sasToken']


def get_iot_pnp_connection_string(
cmd,
endpoint,
Expand Down
47 changes: 30 additions & 17 deletions azext_iot/operations/central.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from knack.util import CLIError
from azext_iot._factory import _bind_sdk
from azext_iot.common._azure import get_iot_hub_token_from_central_app_id
from azext_iot.common.shared import SdkType
from azext_iot.common.utility import unpack_msrest_error, init_monitoring
from azext_iot.common.sas_token_auth import BasicSasTokenAuthentication
Expand All @@ -18,17 +17,28 @@ def find_between(s, start, end):


def iot_central_device_show(
cmd, device_id, app_id, central_api_uri="api.azureiotcentral.com"
cmd, device_id, app_id, central_api_uri="azureiotcentral.com"
):
sasToken = get_iot_hub_token_from_central_app_id(cmd, app_id, central_api_uri)
endpoint = find_between(sasToken, "SharedAccessSignature sr=", "&sig=")
target = {"entity": endpoint}
auth = BasicSasTokenAuthentication(sas_token=sasToken)
service_sdk, errors = _bind_sdk(target, SdkType.service_sdk, auth=auth)
try:
return service_sdk.get_twin(device_id)
except errors.CloudError as e:
raise CLIError(unpack_msrest_error(e))
from azext_iot.common._azure import get_iot_central_tokens

tokens = get_iot_central_tokens(cmd, app_id, central_api_uri)
exception = None

# The device could be in any hub associated with the given app.
# We must search through each IoT Hub until device is found.
Comment thread
Jackbk marked this conversation as resolved.
for token_group in tokens.values():
Comment thread
Jackbk marked this conversation as resolved.
sas_token = token_group["iothubTenantSasToken"]["sasToken"]
endpoint = find_between(sas_token, "SharedAccessSignature sr=", "&sig=")
target = {"entity": endpoint}
auth = BasicSasTokenAuthentication(sas_token=sas_token)
service_sdk, errors = _bind_sdk(target, SdkType.service_sdk, auth=auth)
try:
return service_sdk.get_twin(device_id)
except errors.CloudError as e:
if exception is None:
exception = CLIError(unpack_msrest_error(e))

raise exception


def iot_central_device_capability_model_show(
Expand All @@ -49,7 +59,7 @@ def iot_central_validate_messages(
repair=False,
properties=None,
yes=False,
central_api_uri="api.azureiotcentral.com",
central_api_uri="azureiotcentral.com",
):
provider = CentralDeviceProvider(cmd, app_id)
_events3_runner(
Expand Down Expand Up @@ -79,7 +89,7 @@ def iot_central_monitor_events(
repair=False,
properties=None,
yes=False,
central_api_uri="api.azureiotcentral.com",
central_api_uri="azureiotcentral.com",
):
_events3_runner(
cmd=cmd,
Expand Down Expand Up @@ -119,13 +129,16 @@ def _events3_runner(

from azext_iot.operations.events3 import _builders, _events

eventHubTarget = _builders.EventTargetBuilder().build_central_event_hub_target(
eventHubTargets = _builders.EventTargetBuilder().build_central_event_hub_target(
Comment thread
Jackbk marked this conversation as resolved.
Outdated
cmd, app_id, central_api_uri
)
executorTargets = []

_events.executor(
eventHubTarget,
consumer_group=consumer_group,
for target in eventHubTargets:
executorTargets.append(_events.executorData(target, consumer_group))

_events.nExecutor(
executorTargets,
enqueued_time=enqueued_time,
properties=properties,
timeout=timeout,
Expand Down
160 changes: 98 additions & 62 deletions azext_iot/operations/events3/_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,133 +2,169 @@
import uamqp

from azext_iot.common.sas_token_auth import SasTokenAuthentication
from azext_iot.common.utility import (parse_entity, unicode_binary_map, url_encode_str)
from azext_iot.common.utility import parse_entity, unicode_binary_map, url_encode_str

# To provide amqp frame trace
DEBUG = False


class AmqpBuilder():
class AmqpBuilder:
@classmethod
def build_iothub_amqp_endpoint_from_target(cls, target, duration=360):
hub_name = target['entity'].split('.')[0]
user = "{}@sas.root.{}".format(target['policy'], hub_name)
sas_token = SasTokenAuthentication(target['entity'], target['policy'],
target['primarykey'], duration).generate_sas_token()
return url_encode_str(user) + ":{}@{}".format(url_encode_str(sas_token), target['entity'])
hub_name = target["entity"].split(".")[0]
user = "{}@sas.root.{}".format(target["policy"], hub_name)
sas_token = SasTokenAuthentication(
target["entity"], target["policy"], target["primarykey"], duration
).generate_sas_token()
return url_encode_str(user) + ":{}@{}".format(
url_encode_str(sas_token), target["entity"]
)


class EventTargetBuilder():

class EventTargetBuilder:
def __init__(self):
self.eventLoop = asyncio.new_event_loop()
asyncio.set_event_loop(self.eventLoop)

def build_iot_hub_target(self, target):
return self.eventLoop.run_until_complete(self._build_iot_hub_target_async(target))
return self.eventLoop.run_until_complete(
self._build_iot_hub_target_async(target)
)

def build_central_event_hub_target(self, cmd, app_id, central_api_uri):
return self.eventLoop.run_until_complete(self._build_central_event_hub_target_async(cmd, app_id, central_api_uri))
return self.eventLoop.run_until_complete(
self._build_central_event_hub_target_async(cmd, app_id, central_api_uri)
)

def _build_auth_container(self, target):
sas_uri = 'sb://{}/{}'.format(target['events']['endpoint'], target['events']['path'])
return uamqp.authentication.SASTokenAsync.from_shared_access_key(sas_uri, target['policy'], target['primarykey'])
sas_uri = "sb://{}/{}".format(
target["events"]["endpoint"], target["events"]["path"]
)
return uamqp.authentication.SASTokenAsync.from_shared_access_key(
sas_uri, target["policy"], target["primarykey"]
)

def _build_auth_container_from_token(self, endpoint, path, token, tokenExpiry):
sas_uri = 'sb://{}/{}'.format(endpoint, path)
return uamqp.authentication.SASTokenAsync(audience=sas_uri, uri=sas_uri, expires_at=tokenExpiry, token=token)
sas_uri = "sb://{}/{}".format(endpoint, path)
return uamqp.authentication.SASTokenAsync(
audience=sas_uri, uri=sas_uri, expires_at=tokenExpiry, token=token
)

async def _query_meta_data(self, endpoint, path, auth):
source = uamqp.address.Source(endpoint)
receive_client = uamqp.ReceiveClientAsync(source, auth=auth, timeout=30000, debug=DEBUG)
receive_client = uamqp.ReceiveClientAsync(
source, auth=auth, timeout=30000, debug=DEBUG
)
try:
await receive_client.open_async()
message = uamqp.Message(application_properties={'name': path})
message = uamqp.Message(application_properties={"name": path})

response = await receive_client.mgmt_request_async(
message,
b'READ',
op_type=b'com.microsoft:eventhub',
status_code_field=b'status-code',
description_fields=b'status-description',
timeout=30000
b"READ",
op_type=b"com.microsoft:eventhub",
status_code_field=b"status-code",
description_fields=b"status-description",
timeout=30000,
)
test = response.get_data()
return test
finally:
await receive_client.close_async()

async def _evaluate_redirect(self, endpoint):
source = uamqp.address.Source('amqps://{}/messages/events/$management'.format(endpoint))
receive_client = uamqp.ReceiveClientAsync(source, timeout=30000, prefetch=1, debug=DEBUG)
source = uamqp.address.Source(
"amqps://{}/messages/events/$management".format(endpoint)
)
receive_client = uamqp.ReceiveClientAsync(
source, timeout=30000, prefetch=1, debug=DEBUG
)

try:
await receive_client.open_async()
await receive_client.receive_message_batch_async(max_batch_size=1)
except uamqp.errors.LinkRedirect as redirect:
redirect = unicode_binary_map(parse_entity(redirect))
result = {}
result['events'] = {}
result['events']['endpoint'] = redirect['hostname']
result['events']['path'] = redirect['address'].replace('amqps://', '').split('/')[1]
result['events']['address'] = redirect['address']
result["events"] = {}
result["events"]["endpoint"] = redirect["hostname"]
result["events"]["path"] = (
redirect["address"].replace("amqps://", "").split("/")[1]
)
result["events"]["address"] = redirect["address"]
return redirect, result
finally:
await receive_client.close_async()

async def _build_central_event_hub_target_async(self, cmd, app_id, central_api_uri):
from azext_iot.common._azure import get_iot_central_tokens

tokens = get_iot_central_tokens(cmd, app_id, central_api_uri)
eventHubToken = tokens['eventhubSasToken']
hostnameWithoutPrefix = eventHubToken['hostname'].split("/")[2]
endpoint = hostnameWithoutPrefix
path = eventHubToken["entityPath"]
tokenExpiry = tokens['expiry']
auth = self._build_auth_container_from_token(endpoint, path, eventHubToken['sasToken'], tokenExpiry)
address = "amqps://{}/{}/$management".format(hostnameWithoutPrefix, eventHubToken["entityPath"])
async def create_single_iotc_eventhub_target_async(self, tokens):
event_hub_token = tokens["eventhubSasToken"]
hostname_without_prefix = event_hub_token["hostname"].split("/")[2]
endpoint = hostname_without_prefix
path = event_hub_token["entityPath"]
token_expiry = tokens["expiry"]
auth = self._build_auth_container_from_token(
endpoint, path, event_hub_token["sasToken"], token_expiry
)
address = "amqps://{}/{}/$management".format(
hostname_without_prefix, event_hub_token["entityPath"]
)
meta_data = await self._query_meta_data(address, path, auth)
partition_count = meta_data[b'partition_count']
partition_count = meta_data[b"partition_count"]
partition_ids = []
for i in range(int(partition_count)):
partition_ids.append(str(i))
partitions = partition_ids
auth = self._build_auth_container_from_token(endpoint, path, eventHubToken['sasToken'], tokenExpiry)

eventHubTarget = {
'endpoint': endpoint,
'path': path,
'auth': auth,
'partitions': partitions
auth = self._build_auth_container_from_token(
endpoint, path, event_hub_token["sasToken"], token_expiry
)

event_hub_target = {
"endpoint": endpoint,
"path": path,
"auth": auth,
"partitions": partitions,
}

return eventHubTarget
return event_hub_target

async def _build_central_event_hub_target_async(self, cmd, app_id, central_api_uri):
from azext_iot.common._azure import get_iot_central_tokens

all_tokens = get_iot_central_tokens(cmd, app_id, central_api_uri)
targets = [
await self.create_single_iotc_eventhub_target_async(tokens)
Comment thread
Jackbk marked this conversation as resolved.
for tokens in all_tokens.values()
]

return targets

async def _build_iot_hub_target_async(self, target):
if 'events' not in target:
if "events" not in target:
endpoint = AmqpBuilder.build_iothub_amqp_endpoint_from_target(target)
_, update = await self._evaluate_redirect(endpoint)
target['events'] = update['events']
endpoint = target['events']['endpoint']
path = target['events']['path']
target["events"] = update["events"]
endpoint = target["events"]["endpoint"]
path = target["events"]["path"]
auth = self._build_auth_container(target)
meta_data = await self._query_meta_data(target['events']['address'], target['events']['path'], auth)
partition_count = meta_data[b'partition_count']
meta_data = await self._query_meta_data(
target["events"]["address"], target["events"]["path"], auth
)
partition_count = meta_data[b"partition_count"]
partition_ids = []
for i in range(int(partition_count)):
partition_ids.append(str(i))
target['events']['partition_ids'] = partition_ids
target["events"]["partition_ids"] = partition_ids
else:
endpoint = target['events']['endpoint']
path = target['events']['path']
partitions = target['events']['partition_ids']
endpoint = target["events"]["endpoint"]
path = target["events"]["path"]
partitions = target["events"]["partition_ids"]
auth = self._build_auth_container(target)

eventHubTarget = {
'endpoint': endpoint,
'path': path,
'auth': auth,
'partitions': partitions
"endpoint": endpoint,
"path": path,
"auth": auth,
"partitions": partitions,
}

return eventHubTarget
Loading