diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md
index b40cd0557126..4e731b864491 100644
--- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md
+++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md
@@ -1,4 +1,8 @@
 # Release History
+
+## 1.0.0b4 (2019-10-09)
+This release contains only trivial internal changes. No feature changes.
+
 ## 1.0.0b1 (2019-09-10)
 
 **New features**
diff --git a/sdk/eventhub/azure-eventhubs/HISTORY.md b/sdk/eventhub/azure-eventhubs/HISTORY.md
index f0577a7640d9..85fc63509e55 100644
--- a/sdk/eventhub/azure-eventhubs/HISTORY.md
+++ b/sdk/eventhub/azure-eventhubs/HISTORY.md
@@ -1,10 +1,22 @@
 # Release History
 
-## 5.0.0b4 (2019-XX-XX)
+## 5.0.0b4 (2019-10-08)
 
 **New features**
 
-- Support for tracing #7153
+- Added support for tracing (issue #7153).
+- Added the capability of tracking the partition's last enqueued event properties to `EventHubConsumer`.
+  - Added new boolean parameter `track_last_enqueued_event_properties` to method `EventHubClient.create_consumer()`.
+  - Added new property `last_enqueued_event_properties` on `EventHubConsumer`, which contains `sequence_number`, `offset`, `enqueued_time` and `retrieval_time` information.
+  - The capability is disabled by default, as enabling it costs extra bandwidth for transferring more information.
+
+**Breaking changes**
+
+- Removed support for IoT Hub direct connection.
+  - The [Event Hubs-compatible connection string](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin) of an IoT Hub can be used to create an `EventHubClient` and read properties or events from the IoT Hub.
+- Removed support for sending EventData to IoT Hub.
+- Removed parameter `exception` from method `close()` of `EventHubConsumer` and `EventHubProducer`.
+- Updated uAMQP dependency to 1.2.3.
## 5.0.0b3 (2019-09-10) diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index f456159d938f..e4fc68fd38e0 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -13,7 +13,7 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure - Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely coupled systems to interact without the need to bind them together. - Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish the transformed events to a new stream for consumers to observe. -[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/5.0.0b3) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) +[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/5.0.0b4) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) ## Getting started @@ -119,6 +119,7 @@ The following sections provide several code snippets covering some of the most c - [Async publish events to an Event Hub](#async-publish-events-to-an-event-hub) - [Async consume events from an Event Hub](#async-consume-events-from-an-event-hub) - [Consume events using an Event Processor](#consume-events-using-an-event-processor) +- [Use EventHubClient to work with IoT Hub](#use-eventhubclient-to-work-with-iot-hub) ### Inspect an Event Hub @@ -359,6 +360,24 @@ if __name__ == '__main__': loop.run_until_complete(main()) ``` +### Use EventHubClient to 
work with IoT Hub
+
+You can also use `EventHubClient` to work with IoT Hub. This is useful for receiving telemetry data from the
+Event Hub linked to an IoT Hub. The associated connection string will not have send claims, so sending events is not possible.
+
+- Note that the connection string needs to be for an
+  [Event Hub-compatible endpoint](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin),
+  e.g. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
+
+```python
+from azure.eventhub import EventHubClient
+
+connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
+client = EventHubClient.from_connection_string(connection_str)
+
+partition_ids = client.get_partition_ids()
+```
+
 ## Troubleshooting
 
 ### General
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
index a14da749ee78..44ea7d09b493 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
@@ -18,46 +18,34 @@ def __init__(self):
         self._client = None
         self._handler = None
         self._name = None
+        self._running = False
+        self._closed = False
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close(exc_val)
+        self.close()
 
     def _check_closed(self):
-        if self._error:
+        if self._closed:
            raise EventHubError("{} has been closed. 
Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - def _redirect(self, redirect): - self._redirected = redirect - self._running = False - self._close_connection() - def _open(self): - """ - Open the EventHubConsumer/EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. + """Open the EventHubConsumer/EventHubProducer using the supplied connection. """ # pylint: disable=protected-access if not self._running: if self._handler: self._handler.close() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not self._handler.client_ready(): time.sleep(0.05) @@ -66,7 +54,8 @@ def _open(self): self._running = True def _close_handler(self): - self._handler.close() # close the link (sharing connection) or connection (not sharing) + if self._handler: + self._handler.close() # close the link (sharing connection) or connection (not sharing) self._running = False def _close_connection(self): @@ -76,8 +65,6 @@ def _close_connection(self): def _handle_exception(self, exception): if not self._running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") - return _handle_exception(exception, self) - return _handle_exception(exception, self) def _do_retryable_operation(self, operation, timeout=100000, **kwargs): @@ -102,16 +89,11 @@ def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. 
Last exception: %r.", self._name, last_exception) raise last_exception - def close(self, exception=None): - # type:(Exception) -> None + def close(self): + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -122,16 +104,7 @@ def close(self, exception=None): :caption: Close down the handler. """ - self._running = False - if self._error: # type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("{} handler is closed.".format(self._name)) if self._handler: self._handler.close() # this will close link if sharing connection. 
Otherwise close connection + self._running = False + self._closed = True diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 444edd15a8a1..939c78a1a458 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -7,7 +7,7 @@ import time from uamqp import errors, constants, compat # type: ignore -from azure.eventhub.error import EventHubError, ConnectError +from azure.eventhub.error import EventHubError from ..aio.error_async import _handle_exception log = logging.getLogger(__name__) @@ -19,46 +19,35 @@ def __init__(self): self._client = None self._handler = None self._name = None + self._running = False + self._closed = False async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close(exc_val) + await self.close() def _check_closed(self): - if self._error: + if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - async def _redirect(self, redirect): - self._redirected = redirect - self._running = False - await self._close_connection() - async def _open(self): """ Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
""" # pylint: disable=protected-access if not self._running: if self._handler: await self._handler.close_async() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() await self._handler.open_async(connection=await self._client._conn_manager.get_connection( self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) @@ -67,7 +56,8 @@ async def _open(self): self._running = True async def _close_handler(self): - await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) + if self._handler: + await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) self._running = False async def _close_connection(self): @@ -103,16 +93,11 @@ async def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -123,18 +108,7 @@ async def close(self, exception=None): :caption: Close down the handler. 
""" - self._running = False - if self._error: #type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") if self._handler: await self._handler.close_async() + self._running = False + self._closed = True diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 88b693d157ec..e679afaa1cf1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -11,10 +11,7 @@ from typing import Any, List, Dict, Union, TYPE_CHECKING from uamqp import authentication, constants # type: ignore -from uamqp import ( - Message, - AMQPClientAsync, -) # type: ignore +from uamqp import Message, AMQPClientAsync # type: ignore from azure.eventhub.common import parse_sas_token, EventPosition, \ EventHubSharedKeyCredential, EventHubSASTokenCredential @@ -58,23 +55,19 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() - def _create_auth(self, username=None, password=None): + def _create_auth(self): """ Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate the session. - :param username: The name of the shared access policy. - :type username: str - :param password: The shared access key. 
- :type password: str """ http_proxy = self._config.http_proxy transport_type = self._config.transport_type auth_timeout = self._config.auth_timeout if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = username or self._auth_config['username'] - password = password or self._auth_config['password'] + username = self._credential.policy + password = self._credential.key if "@sas.root" in username: return authentication.SASLPlain( self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) @@ -117,14 +110,10 @@ async def _try_delay(self, retried_times, last_exception, timeout_time=None, ent raise last_exception async def _management_request(self, mgmt_msg, op_type): - alt_creds = { - "username": self._auth_config.get("iot_username"), - "password": self._auth_config.get("iot_password") - } - retried_times = 0 + last_exception = None while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth(**alt_creds) + mgmt_auth = self._create_auth() mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) try: conn = await self._conn_manager.get_connection(self._host, mgmt_auth) @@ -142,18 +131,8 @@ async def _management_request(self, mgmt_msg, op_type): retried_times += 1 finally: await mgmt_client.close_async() - - async def _iothub_redirect(self): - async with self._lock: - if self._is_iothub and not self._iothub_redirect_info: - if not self._redirect_consumer: - self._redirect_consumer = self.create_consumer(consumer_group='$default', - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - async with self._redirect_consumer: - await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access - self._redirect_consumer = None + log.info("%r returns an exception %r", self._container_id, last_exception) # pylint:disable=specify-parameter-names-in-call + raise last_exception async def 
get_properties(self): # type:() -> Dict[str, Any] @@ -168,8 +147,6 @@ async def get_properties(self): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub') output = {} @@ -209,8 +186,6 @@ async def get_partition_properties(self, partition): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name, 'partition': partition}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition') @@ -246,11 +221,16 @@ def create_consumer( :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. :type owner_level: int - :param operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. + :type track_last_enqueued_event_properties: bool :param loop: An event loop. 
If not specified the default event loop will be used. :rtype: ~azure.eventhub.aio.consumer_async.EventHubConsumer @@ -264,22 +244,21 @@ def create_consumer( """ owner_level = kwargs.get("owner_level") - operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) loop = kwargs.get("loop") - path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self._address.hostname, path, consumer_group, partition_id) + self._address.hostname, self._address.path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, - prefetch=prefetch, loop=loop) + prefetch=prefetch, + track_last_enqueued_event_properties=track_last_enqueued_event_properties, loop=loop) return handler def create_producer( self, *, partition_id: str = None, - operation: str = None, send_timeout: float = None, loop: asyncio.AbstractEventLoop = None ) -> EventHubProducer: @@ -290,9 +269,6 @@ def create_producer( If omitted, the events will be distributed to available partitions via round-robin. :type partition_id: str - :param operation: An optional operation to be appended to the hostname in the target URL. - The value must start with `/` character. - :type operation: str :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. 
:type send_timeout: float @@ -310,8 +286,6 @@ def create_producer( """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) - if operation: - target = target + operation send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 015b11190212..7aff6980b9e0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -5,14 +5,16 @@ import asyncio import uuid import logging -from typing import List +from typing import List, Any import time +from distutils.version import StrictVersion -from uamqp import errors, types # type: ignore +import uamqp # type: ignore +from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore from azure.eventhub import EventData, EventPosition -from azure.eventhub.error import EventHubError, ConnectError, _error_handler +from azure.eventhub.error import _error_handler from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -36,6 +38,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan _timeout = 0 _epoch_symbol = b'com.microsoft:epoch' _timeout_symbol = b'com.microsoft:timeout' + _receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric' def __init__( # pylint: disable=super-init-not-called self, client, source, **kwargs): @@ -55,6 +58,14 @@ def __init__( # pylint: disable=super-init-not-called :param owner_level: The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set. 
:type owner_level: int + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. + :type track_last_enqueued_event_properties: bool :param loop: An event loop. """ event_position = kwargs.get("event_position", None) @@ -62,11 +73,11 @@ def __init__( # pylint: disable=super-init-not-called owner_level = kwargs.get("owner_level", None) keep_alive = kwargs.get("keep_alive", None) auto_reconnect = kwargs.get("auto_reconnect", True) + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) loop = kwargs.get("loop", None) super(EventHubConsumer, self).__init__() self._loop = loop or asyncio.get_event_loop() - self._running = False self._client = client self._source = source self._offset = event_position @@ -77,8 +88,6 @@ def __init__( # pylint: disable=super-init-not-called self._auto_reconnect = auto_reconnect self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 - self._redirected = None - self._error = None self._link_properties = {} partition = self._source.split('/')[-1] self._partition = partition @@ -88,6 +97,8 @@ def __init__( # pylint: disable=super-init-not-called link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access 
self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None + self._track_last_enqueued_event_properties = track_last_enqueued_event_properties + self._last_enqueued_event_properties = {} def __aiter__(self): return self @@ -105,6 +116,8 @@ async def __anext__(self): event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 + if self._track_last_enqueued_event_properties: + self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) @@ -115,17 +128,21 @@ async def __anext__(self): raise last_exception def _create_handler(self): - alt_creds = { - "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access - "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access - } - source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access + + if StrictVersion(uamqp.__version__) < StrictVersion("1.2.3"): # backward compatible until uamqp 1.2.3 released + desired_capabilities = {} + elif self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))} + else: + desired_capabilities = {"desired_capabilities": None} + self._handler = ReceiveClientAsync( source, - auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # 
pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, @@ -133,30 +150,14 @@ def _create_handler(self): error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, + receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete, + auto_complete=False, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access + **desired_capabilities, # pylint:disable=protected-access loop=self._loop) self._messages_iter = None - async def _redirect(self, redirect): - self._messages_iter = None - await super(EventHubConsumer, self)._redirect(redirect) - - async def _open(self): - """ - Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. - - """ - # pylint: disable=protected-access - self._redirected = self._redirected or self._client._iothub_redirect_info - - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) - self._source = self._redirected.address - await super(EventHubConsumer, self)._open() - async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) @@ -178,16 +179,37 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self._offset = EventPosition(event_data.offset) data_batch.append(event_data) event_data._trace_link_message() # pylint:disable=protected-access + if data_batch: + self._offset = EventPosition(data_batch[-1].offset) + + if self._track_last_enqueued_event_properties and data_batch: + self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # 
pylint:disable=protected-access
+
+        return data_batch
 
     async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
         return await self._do_retryable_operation(self._receive, timeout=timeout,
                                                   max_batch_size=max_batch_size, **kwargs)
 
+    @property
+    def last_enqueued_event_properties(self):
+        """
+        The latest enqueued event information. This property is updated each time an event is received when
+        the receiver is created with `track_last_enqueued_event_properties` set to `True`.
+        The dict includes the following information about the partition:
+
+        - `sequence_number`
+        - `offset`
+        - `enqueued_time`
+        - `retrieval_time`
+
+        :rtype: dict or None
+        """
+        return self._last_enqueued_event_properties if self._track_last_enqueued_event_properties else None
+
     @property
     def queue_size(self):
         # type: () -> int
@@ -202,7 +224,7 @@ def queue_size(self):
         return 0
 
     async def receive(self, *, max_batch_size=None, timeout=None):
-        # type: (int, float) -> List[EventData]
+        # type: (Any, int, float) -> List[EventData]
         """
         Receive events asynchronously from the EventHub.
 
@@ -235,16 +257,11 @@
         return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size)
 
-    async def close(self, exception=None):
-        # type: (Exception) -> None
+    async def close(self):
+        # type: () -> None
         """
         Close down the handler. If the handler has already closed,
-        this will be a no op. An optional exception can be passed in to
-        indicate that the handler was shutdown due to error.
-
-        :param exception: An optional exception if the handler is closing
-        due to an error.
-        :type exception: Exception
+        this will be a no op.
 
         Example:
             .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
@@ -255,18 +272,4 @@
             :caption: Close down the handler.
""" - self._running = False - if self._error: - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") - if self._handler: - await self._handler.close_async() + await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index ae1cd8084f3d..7bbc3b6153c1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -68,11 +68,6 @@ async def _handle_exception(exception, closable): # pylint:disable=too-many-bra if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): await closable._close_connection() # pylint:disable=protected-access - elif isinstance(exception, errors.LinkRedirect): - log.info("%r link redirect received. 
Redirecting...", name)
-        redirect = exception
-        if hasattr(closable, "_redirect"):
-            await closable._redirect(redirect)  # pylint:disable=protected-access
     elif isinstance(exception, errors.LinkDetach):
         if hasattr(closable, "_close_handler"):
             await closable._close_handler()  # pylint:disable=protected-access
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py
index 3183dc051ac8..14af12444c0b 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py
@@ -9,8 +9,8 @@
 import asyncio
 import logging
 
-from azure.core.tracing import SpanKind
-from azure.core.settings import settings
+from azure.core.tracing import SpanKind  # type: ignore
+from azure.core.settings import settings  # type: ignore
 
 from azure.eventhub import EventPosition, EventHubError
 from azure.eventhub.aio import EventHubClient
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
index 8ef299b0e6da..75b39217ddb4 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
@@ -5,14 +5,14 @@
 import uuid
 import asyncio
 import logging
-from typing import Iterable, Union
+from typing import Iterable, Union, Type
 import time
 
 from uamqp import types, constants, errors  # type: ignore
 from uamqp import SendClientAsync  # type: ignore
 
-from azure.core.tracing import SpanKind
-from azure.core.settings import settings
+from azure.core.tracing import SpanKind, AbstractSpan  # type: ignore
+from azure.core.settings import settings  # type: ignore
 
 from azure.eventhub.common import EventData, EventDataBatch
 from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
@@ -66,7 +66,6 @@ def __init__(  # pylint: disable=super-init-not-called
         super(EventHubProducer, self).__init__()
         self._loop = loop or asyncio.get_event_loop()
         self._max_message_size_on_link = None
-        self._running = False
         self._client = client
         self._target = target
         self._partition = partition
@@ -77,7 +76,6 @@ def __init__(  # pylint: disable=super-init-not-called
         self._reconnect_backoff = 1
         self._name = "EHProducer-{}".format(uuid.uuid4())
         self._unsent_events = None
-        self._redirected = None
         self._error = None
         if partition:
             self._target += "/Partitions/" + partition
@@ -90,7 +88,7 @@ def __init__(  # pylint: disable=super-init-not-called
     def _create_handler(self):
         self._handler = SendClientAsync(
             self._target,
-            auth=self._client._get_auth(),  # pylint:disable=protected-access
+            auth=self._client._create_auth(),  # pylint:disable=protected-access
             debug=self._client._config.network_tracing,  # pylint:disable=protected-access
             msg_timeout=self._timeout,
             error_policy=self._retry_policy,
@@ -101,18 +99,6 @@ def _create_handler(self):
                 self._client._config.user_agent),  # pylint:disable=protected-access
             loop=self._loop)
 
-    async def _open(self):
-        """
-        Open the EventHubProducer using the supplied connection.
-        If the handler has previously been redirected, the redirect
-        context will be used to create a new handler before opening it.
-
-        """
-        if not self._running and self._redirected:
-            self._client._process_redirect_uri(self._redirected)  # pylint: disable=protected-access
-            self._target = self._redirected.address
-        await super(EventHubProducer, self)._open()
-
     async def _open_with_retry(self):
         return await self._do_retryable_operation(self._open, operation_need_param=False)
@@ -127,7 +113,7 @@ async def _send_event_data(self, timeout_time=None, last_exception=None):
                     error = OperationTimeoutError("send operation timed out")
                     log.info("%r send operation timed out. (%r)", self._name, error)
                     raise error
-                self._handler._msg_timeout = remaining_time  # pylint: disable=protected-access
+                self._handler._msg_timeout = remaining_time * 1000  # pylint: disable=protected-access
                 self._handler.queue_message(*self._unsent_events)
                 await self._handler.wait_async()
                 self._unsent_events = self._handler.pending_messages
@@ -250,16 +236,11 @@ async def send(
         else:
             await self._send_event_data_with_retry(timeout=timeout)  # pylint:disable=unexpected-keyword-arg # TODO: to refactor
 
-    async def close(self, exception=None):
-        # type: (Exception) -> None
+    async def close(self):
+        # type: () -> None
         """
         Close down the handler. If the handler has already closed,
-        this will be a no op. An optional exception can be passed in to
-        indicate that the handler was shutdown due to error.
-
-        :param exception: An optional exception if the handler is closing
-            due to an error.
-        :type exception: Exception
+        this will be a no op.
 
         Example:
             .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
@@ -270,4 +251,4 @@ async def close(self, exception=None):
                 :caption: Close down the handler.
 
         """
-        await super(EventHubProducer, self).close(exception)
+        await super(EventHubProducer, self).close()
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
index 06d264b5b9ac..9a7ca85e3330 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
@@ -58,7 +58,7 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
-    def _create_auth(self, username=None, password=None):
+    def _create_auth(self):
         """
         Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
         the session.
@@ -74,8 +74,8 @@ def _create_auth(self, username=None, password=None):
         # TODO: the following code can be refactored to create auth from classes directly instead of using if-else
         if isinstance(self._credential, EventHubSharedKeyCredential):  # pylint:disable=no-else-return
-            username = username or self._auth_config['username']
-            password = password or self._auth_config['password']
+            username = self._credential.policy
+            password = self._credential.key
             if "@sas.root" in username:
                 return authentication.SASLPlain(
                     self._host, username, password, http_proxy=http_proxy, transport_type=transport_type)
@@ -119,14 +119,10 @@ def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_na
             raise last_exception
 
     def _management_request(self, mgmt_msg, op_type):
-        alt_creds = {
-            "username": self._auth_config.get("iot_username"),
-            "password": self._auth_config.get("iot_password")
-        }
-
         retried_times = 0
+        last_exception = None
         while retried_times <= self._config.max_retries:
-            mgmt_auth = self._create_auth(**alt_creds)
+            mgmt_auth = self._create_auth()
             mgmt_client = uamqp.AMQPClient(self._mgmt_target)
             try:
                 conn = self._conn_manager.get_connection(self._host, mgmt_auth)  #pylint:disable=assignment-from-none
@@ -144,18 +140,9 @@ def _management_request(self, mgmt_msg, op_type):
                 retried_times += 1
             finally:
                 mgmt_client.close()
+        log.info("%r returns an exception %r", self._container_id, last_exception)  # pylint:disable=specify-parameter-names-in-call
+        raise last_exception
 
-    def _iothub_redirect(self):
-        with self._lock:
-            if self._is_iothub and not self._iothub_redirect_info:
-                if not self._redirect_consumer:
-                    self._redirect_consumer = self.create_consumer(consumer_group='$default',
-                                                                   partition_id='0',
-                                                                   event_position=EventPosition('-1'),
-                                                                   operation='/messages/events')
-                with self._redirect_consumer:
-                    self._redirect_consumer._open_with_retry()  # pylint: disable=protected-access
-                self._redirect_consumer = None
 
     def get_properties(self):
         # type:() -> Dict[str, Any]
@@ -170,8 +157,6 @@ def get_properties(self):
         :rtype: dict
         :raises: ~azure.eventhub.EventHubError
         """
-        if self._is_iothub and not self._iothub_redirect_info:
-            self._iothub_redirect()
         mgmt_msg = Message(application_properties={'name': self.eh_name})
         response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
         output = {}
@@ -211,8 +196,6 @@ def get_partition_properties(self, partition):
         :rtype: dict
         :raises: ~azure.eventhub.ConnectError
         """
-        if self._is_iothub and not self._iothub_redirect_info:
-            self._iothub_redirect()
         mgmt_msg = Message(application_properties={'name': self.eh_name,
                                                    'partition': partition})
         response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
@@ -244,11 +227,16 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
         :param owner_level: The priority of the exclusive consumer. The client will create an exclusive
          consumer if owner_level is set.
         :type owner_level: int
-        :param operation: An optional operation to be appended to the hostname in the source URL.
-         The value must start with `/` character.
-        :type operation: str
         :param prefetch: The message prefetch count of the consumer. Default is 300.
         :type prefetch: int
+        :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
+         on the last enqueued event on its associated partition, and track that information as events are received.
+         When information about the partition's last enqueued event is being tracked, each event received from the
+         Event Hubs service will carry metadata about the partition. This results in a small amount of additional
+         network bandwidth consumption that is generally a favorable trade-off when considered against periodically
+         making requests for partition properties using the Event Hub client.
+         It is set to `False` by default.
+        :type track_last_enqueued_event_properties: bool
         :rtype: ~azure.eventhub.consumer.EventHubConsumer
 
         Example:
             .. literalinclude:: ../examples/test_examples_eventhub.py
@@ -261,19 +249,19 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
 
         """
         owner_level = kwargs.get("owner_level")
-        operation = kwargs.get("operation")
         prefetch = kwargs.get("prefetch") or self._config.prefetch
+        track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)
 
-        path = self._address.path + operation if operation else self._address.path
         source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
-            self._address.hostname, path, consumer_group, partition_id)
+            self._address.hostname, self._address.path, consumer_group, partition_id)
         handler = EventHubConsumer(
             self, source_url, event_position=event_position, owner_level=owner_level,
-            prefetch=prefetch)
+            prefetch=prefetch,
+            track_last_enqueued_event_properties=track_last_enqueued_event_properties)
         return handler
 
-    def create_producer(self, partition_id=None, operation=None, send_timeout=None):
-        # type: (str, str, float) -> EventHubProducer
+    def create_producer(self, partition_id=None, send_timeout=None):
+        # type: (str, float) -> EventHubProducer
         """
         Create an producer to send EventData object to an EventHub.
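A note on the `create_consumer` hunk above: with the `operation` parameter gone, the consumer's AMQP source URL is built from the client address alone. A minimal sketch of that construction, mirroring the new `source_url` format call (the namespace and hub names below are hypothetical):

```python
def build_consumer_source_url(hostname, path, consumer_group, partition_id):
    # Mirrors the new create_consumer() logic: no optional `operation`
    # segment is appended to the address path any more.
    return "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname, path, consumer_group, partition_id)

url = build_consumer_source_url(
    "myns.servicebus.windows.net", "/myhub", "$default", "0")
# "amqps://myns.servicebus.windows.net/myhub/ConsumerGroups/$default/Partitions/0"
```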
@@ -300,8 +288,6 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None):
         """
         target = "amqps://{}{}".format(self._address.hostname, self._address.path)
-        if operation:
-            target = target + operation
         send_timeout = self._config.send_timeout if send_timeout is None else send_timeout
 
         handler = EventHubProducer(
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
index 62ea791a5894..00f3e867b598 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
@@ -6,13 +6,14 @@
 
 import logging
 import sys
+import platform
 import uuid
 import time
-import functools
 from abc import abstractmethod
-from typing import Dict, Union, Any, TYPE_CHECKING
+from typing import Union, Any, TYPE_CHECKING
 
-from azure.eventhub import __version__, EventPosition
+from uamqp import types  # type: ignore
+from azure.eventhub import __version__
 from azure.eventhub.configuration import _Configuration
 from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address
@@ -139,53 +140,18 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
         self._address = _Address()
         self._address.hostname = host
         self._address.path = "/" + event_hub_path if event_hub_path else ""
-        self._auth_config = {}  # type:Dict[str,str]
         self._credential = credential
-        if isinstance(credential, EventHubSharedKeyCredential):
-            self._username = credential.policy
-            self._password = credential.key
-            self._auth_config['username'] = self._username
-            self._auth_config['password'] = self._password
-
         self._keep_alive = kwargs.get("keep_alive", 30)
         self._auto_reconnect = kwargs.get("auto_reconnect", True)
         self._mgmt_target = "amqps://{}/{}".format(self._host, self.eh_name)
         self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path)
-        self._get_auth = functools.partial(self._create_auth)
         self._config = _Configuration(**kwargs)
         self._debug = self._config.network_tracing
-        self._is_iothub = False
-        self._iothub_redirect_info = None
-        self._redirect_consumer = None
         log.info("%r: Created the Event Hub client", self._container_id)
 
-    @classmethod
-    def _from_iothub_connection_string(cls, conn_str, **kwargs):
-        address, policy, key, _ = _parse_conn_str(conn_str)
-        hub_name = address.split('.')[0]
-        username = "{}@sas.root.{}".format(policy, hub_name)
-        password = _generate_sas_token(address, policy, key)
-        left_slash_pos = address.find("//")
-        if left_slash_pos != -1:
-            host = address[left_slash_pos + 2:]
-        else:
-            host = address
-        client = cls(host, "", EventHubSharedKeyCredential(username, password), **kwargs)
-        client._auth_config = {  # pylint: disable=protected-access
-            'iot_username': policy,
-            'iot_password': key,
-            'username': username,
-            'password': password}
-        client._is_iothub = True  # pylint: disable=protected-access
-        client._redirect_consumer = client.create_consumer(consumer_group='$default',  # pylint: disable=protected-access, no-member
-                                                           partition_id='0',
-                                                           event_position=EventPosition('-1'),
-                                                           operation='/messages/events')
-        return client
-
     @abstractmethod
-    def _create_auth(self, username=None, password=None):
+    def _create_auth(self):
         pass
 
     def _create_properties(self, user_agent=None):  # pylint: disable=no-self-use
@@ -196,13 +162,17 @@ def _create_properties(self, user_agent=None):  # pylint: disable=no-self-use
 
         :rtype: dict
         """
         properties = {}
-        properties["product"] = "eventhub.python"
-        properties["version"] = __version__
-        properties["framework"] = "Python {}.{}.{}".format(*sys.version_info[0:3])
-        properties["platform"] = sys.platform
-
-        final_user_agent = 'azsdk-python-eventhub/{} ({}; {})'.format(
-            __version__, properties["framework"], sys.platform)
+        product = "azure-eventhub"
+        properties[types.AMQPSymbol("product")] = product
+        properties[types.AMQPSymbol("version")] = __version__
+        framework = "Python {}.{}.{}, {}".format(
+            sys.version_info[0], sys.version_info[1], sys.version_info[2], platform.python_implementation()
+        )
+        properties[types.AMQPSymbol("framework")] = framework
+        platform_str = platform.platform()
+        properties[types.AMQPSymbol("platform")] = platform_str
+
+        final_user_agent = '{}/{} ({}, {})'.format(product, __version__, framework, platform_str)
         if user_agent:
             final_user_agent = '{}, {}'.format(final_user_agent, user_agent)
@@ -210,8 +180,7 @@ def _create_properties(self, user_agent=None):  # pylint: disable=no-self-use
             raise ValueError("The user-agent string cannot be more than {} in length."
                              "Current user_agent string is: {} with length: {}".format(
                                  MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)))
-
-        properties["user-agent"] = final_user_agent
+        properties[types.AMQPSymbol("user-agent")] = final_user_agent
         return properties
 
     def _add_span_request_attributes(self, span):
@@ -219,22 +188,11 @@ def _add_span_request_attributes(self, span):
         span.add_attribute("message_bus.destination", self._address.path)
         span.add_attribute("peer.address", self._address.hostname)
 
-    def _process_redirect_uri(self, redirect):
-        redirect_uri = redirect.address.decode('utf-8')
-        auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
-        self._address = urlparse(auth_uri)
-        self._host = self._address.hostname
-        self.eh_name = self._address.path.lstrip('/')
-        self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path)
-        self._mgmt_target = redirect_uri
-        if self._is_iothub:
-            self._iothub_redirect_info = redirect
-
     @classmethod
     def from_connection_string(cls, conn_str, **kwargs):
-        """Create an EventHubClient from an EventHub/IotHub connection string.
+        """Create an EventHubClient from an EventHub connection string.
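The `_create_properties` hunk above replaces the old `azsdk-python-eventhub/...` user agent with a `product/version (framework, platform)` string keyed by AMQP symbols. A sketch of just the string assembly, with a hypothetical version and custom user-agent suffix (the AMQP symbol wrapping and length check are omitted):

```python
import platform
import sys

def build_user_agent(product, version, user_agent=None):
    # Mirrors the new _create_properties() formatting:
    # "<product>/<version> (Python x.y.z, <impl>, <platform>)"
    framework = "Python {}.{}.{}, {}".format(
        sys.version_info[0], sys.version_info[1], sys.version_info[2],
        platform.python_implementation())
    platform_str = platform.platform()
    final_user_agent = '{}/{} ({}, {})'.format(product, version, framework, platform_str)
    if user_agent:
        # An application-supplied string is appended, comma-separated.
        final_user_agent = '{}, {}'.format(final_user_agent, user_agent)
    return final_user_agent

ua = build_user_agent("azure-eventhub", "5.0.0b4", user_agent="myapp/1.0")
```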
-        :param conn_str: The connection string of an eventhub or IoT hub
+        :param conn_str: The connection string of an eventhub
         :type conn_str: str
         :param event_hub_path: The path of the specific Event Hub to connect the client to, if the EntityName is
          not included in the connection string.
@@ -279,15 +237,11 @@ def from_connection_string(cls, conn_str, **kwargs):
 
         """
         event_hub_path = kwargs.pop("event_hub_path", None)
-        is_iot_conn_str = conn_str.lstrip().lower().startswith("hostname")
-        if not is_iot_conn_str:  # pylint:disable=no-else-return
-            address, policy, key, entity = _parse_conn_str(conn_str)
-            entity = event_hub_path or entity
-            left_slash_pos = address.find("//")
-            if left_slash_pos != -1:
-                host = address[left_slash_pos + 2:]
-            else:
-                host = address
-            return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)
+        address, policy, key, entity = _parse_conn_str(conn_str)
+        entity = event_hub_path or entity
+        left_slash_pos = address.find("//")
+        if left_slash_pos != -1:
+            host = address[left_slash_pos + 2:]
         else:
-            return cls._from_iothub_connection_string(conn_str, **kwargs)
+            host = address
+        return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
index 3f2545829748..6e655ee7a8ad 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
@@ -11,9 +11,9 @@
 import six
 
 from uamqp import BatchMessage, Message, types, constants  # type: ignore
-from uamqp.message import MessageHeader, MessageProperties  # type: ignore
+from uamqp.message import MessageHeader  # type: ignore
 
-from azure.core.settings import settings
+from azure.core.settings import settings  # type: ignore
 
 from azure.eventhub.error import EventDataError
@@ -58,31 +58,29 @@ class EventData(object):
     PROP_PARTITION_KEY = b"x-opt-partition-key"
     PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY)
     PROP_TIMESTAMP = b"x-opt-enqueued-time"
-    PROP_DEVICE_ID = b"iothub-connection-device-id"
+    PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
+    PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
+    PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
+    PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"
 
-    def __init__(self, body=None, to_device=None):
+    def __init__(self, body=None):
         """
         Initialize EventData.
 
         :param body: The data to send in a single message.
         :type body: str, bytes or list
-        :param to_device: An IoT device to route to.
-        :type to_device: str
         """
-        self._annotations = {}
-        self._app_properties = {}
-        self._msg_properties = MessageProperties()
-        if to_device:
-            self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
+        self._last_enqueued_event_properties = {}
         if body and isinstance(body, list):
-            self.message = Message(body[0], properties=self._msg_properties)
+            self.message = Message(body[0])
             for more in body[1:]:
                 self.message._body.append(more)  # pylint: disable=protected-access
         elif body is None:
             raise ValueError("EventData cannot be None.")
         else:
-            self.message = Message(body, properties=self._msg_properties)
+            self.message = Message(body)
+        self.message.annotations = {}
 
     def __str__(self):
         dic = {
@@ -96,8 +94,6 @@ def __str__(self):
             dic['offset'] = str(self.offset)
         if self.enqueued_time:
             dic['enqueued_time'] = str(self.enqueued_time)
-        if self.device_id:
-            dic['device_id'] = str(self.device_id)
         if self.partition_key:
             dic['partition_key'] = str(self.partition_key)
         return str(dic)
@@ -109,13 +105,12 @@ def _set_partition_key(self, value):
 
         :param value: The partition key to set.
         :type value: str or bytes
         """
-        annotations = dict(self._annotations)
+        annotations = dict(self.message.annotations)
         annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] = value
         header = MessageHeader()
         header.durable = True
         self.message.annotations = annotations
         self.message.header = header
-        self._annotations = annotations
 
     def _trace_message(self, parent_span=None):
         """Add tracing information to this message.
@@ -146,13 +141,30 @@ def _trace_link_message(self, parent_span=None):
             if traceparent:
                 current_span.link(traceparent)
 
-    @staticmethod
-    def _from_message(message):
-        event_data = EventData(body='')
+    def _get_last_enqueued_event_properties(self):
+        if self._last_enqueued_event_properties:
+            return self._last_enqueued_event_properties
+
+        if self.message.delivery_annotations:
+            self._last_enqueued_event_properties = {
+                "sequence_number":
+                    self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
+                "offset":
+                    self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
+                "enqueued_time":
+                    self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
+                "retrieval_time":
+                    self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
+            }
+            return self._last_enqueued_event_properties
+
+        return None
+
+    @classmethod
+    def _from_message(cls, message):
+        # pylint:disable=protected-access
+        event_data = cls(body='')
         event_data.message = message
-        event_data._msg_properties = message.properties  # pylint:disable=protected-access
-        event_data._annotations = message.annotations  # pylint:disable=protected-access
-        event_data._app_properties = message.application_properties  # pylint:disable=protected-access
         return event_data
 
     @property
@@ -162,7 +174,7 @@ def sequence_number(self):
 
         :rtype: int or long
         """
-        return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)
+        return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None)
 
     @property
     def offset(self):
@@ -172,7 +184,7 @@ def offset(self):
 
         :rtype: str
         """
         try:
-            return self._annotations[EventData.PROP_OFFSET].decode('UTF-8')
+            return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8')
         except (KeyError, AttributeError):
             return None
@@ -183,21 +195,11 @@ def enqueued_time(self):
 
         :rtype: datetime.datetime
         """
-        timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None)
+        timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None)
         if timestamp:
             return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
         return None
 
-    @property
-    def device_id(self):
-        """
-        The device ID of the event data object. This is only used for
-        IoT Hub implementations.
-
-        :rtype: bytes
-        """
-        return self._annotations.get(EventData.PROP_DEVICE_ID, None)
-
     @property
     def partition_key(self):
         """
@@ -206,9 +208,9 @@ def partition_key(self):
 
         :rtype: bytes
         """
         try:
-            return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
+            return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
         except KeyError:
-            return self._annotations.get(EventData.PROP_PARTITION_KEY, None)
+            return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None)
 
     @property
     def application_properties(self):
@@ -217,7 +219,7 @@ def application_properties(self):
 
         :rtype: dict
         """
-        return self._app_properties
+        return self.message.application_properties
 
     @application_properties.setter
     def application_properties(self, value):
@@ -227,8 +229,7 @@ def application_properties(self, value):
 
         :param value: The application properties for the EventData.
         :type value: dict
         """
-        self._app_properties = value
-        properties = None if value is None else dict(self._app_properties)
+        properties = None if value is None else dict(value)
         self.message.application_properties = properties
 
     @property
@@ -238,7 +239,7 @@ def system_properties(self):
 
         :rtype: dict
         """
-        return self._annotations
+        return self.message.annotations
 
     @property
     def body(self):
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
index ff996a57747a..87869aaa3a7b 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
@@ -8,8 +8,10 @@
 import logging
 import time
 from typing import List
+from distutils.version import StrictVersion
 
-from uamqp import types, errors  # type: ignore
+import uamqp  # type: ignore
+from uamqp import types, errors, utils  # type: ignore
 from uamqp import ReceiveClient, Source  # type: ignore
 
 from azure.eventhub.common import EventData, EventPosition
@@ -38,6 +40,7 @@ class EventHubConsumer(ConsumerProducerMixin):  # pylint:disable=too-many-instan
     _timeout = 0
     _epoch_symbol = b'com.microsoft:epoch'
     _timeout_symbol = b'com.microsoft:timeout'
+    _receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric'
 
     def __init__(self, client, source, **kwargs):
         """
@@ -54,15 +57,23 @@ def __init__(self, client, source, **kwargs):
         :param owner_level: The priority of the exclusive consumer. An exclusive
          consumer will be created if owner_level is set.
         :type owner_level: int
+        :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
+         on the last enqueued event on its associated partition, and track that information as events are received.
+         When information about the partition's last enqueued event is being tracked, each event received from the
+         Event Hubs service will carry metadata about the partition. This results in a small amount of additional
+         network bandwidth consumption that is generally a favorable trade-off when considered against periodically
+         making requests for partition properties using the Event Hub client.
+         It is set to `False` by default.
+        :type track_last_enqueued_event_properties: bool
         """
         event_position = kwargs.get("event_position", None)
         prefetch = kwargs.get("prefetch", 300)
         owner_level = kwargs.get("owner_level", None)
         keep_alive = kwargs.get("keep_alive", None)
         auto_reconnect = kwargs.get("auto_reconnect", True)
+        track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)
 
         super(EventHubConsumer, self).__init__()
-        self._running = False
         self._client = client
         self._source = source
         self._offset = event_position
@@ -71,10 +82,9 @@ def __init__(self, client, source, **kwargs):
         self._owner_level = owner_level
         self._keep_alive = keep_alive
         self._auto_reconnect = auto_reconnect
-        self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access
+        self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler)  # pylint:disable=protected-access
         self._reconnect_backoff = 1
         self._link_properties = {}
-        self._redirected = None
         self._error = None
         partition = self._source.split('/')[-1]
         self._partition = partition
@@ -84,6 +94,8 @@ def __init__(self, client, source, **kwargs):
         link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000  # pylint:disable=protected-access
         self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms))
         self._handler = None
+        self._track_last_enqueued_event_properties = track_last_enqueued_event_properties
+        self._last_enqueued_event_properties = {}
 
     def __iter__(self):
         return self
@@ -101,6 +113,8 @@ def __next__(self):
                 event_data._trace_link_message()  # pylint:disable=protected-access
                 self._offset = EventPosition(event_data.offset, inclusive=False)
                 retried_times = 0
+                if self._track_last_enqueued_event_properties:
+                    self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties()  # pylint:disable=protected-access
                 return event_data
             except Exception as exception:  # pylint:disable=broad-except
                 last_exception = self._handle_exception(exception)
@@ -111,17 +125,21 @@ def __next__(self):
                     raise last_exception
 
     def _create_handler(self):
-        alt_creds = {
-            "username": self._client._auth_config.get("iot_username") if self._redirected else None,  # pylint:disable=protected-access
-            "password": self._client._auth_config.get("iot_password") if self._redirected else None  # pylint:disable=protected-access
-        }
-
         source = Source(self._source)
         if self._offset is not None:
             source.set_filter(self._offset._selector())  # pylint:disable=protected-access
+
+        if StrictVersion(uamqp.__version__) < StrictVersion("1.2.3"):  # backward compatible until uamqp 1.2.3 released
+            desired_capabilities = {}
+        elif self._track_last_enqueued_event_properties:
+            symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)]
+            desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))}
+        else:
+            desired_capabilities = {"desired_capabilities": None}
+
         self._handler = ReceiveClient(
             source,
-            auth=self._client._get_auth(**alt_creds),  # pylint:disable=protected-access
+            auth=self._client._create_auth(),  # pylint:disable=protected-access
             debug=self._client._config.network_tracing,  # pylint:disable=protected-access
             prefetch=self._prefetch,
             link_properties=self._link_properties,
@@ -129,28 +147,12 @@ def _create_handler(self):
             error_policy=self._retry_policy,
             keep_alive_interval=self._keep_alive,
             client_name=self._name,
+            receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
+            auto_complete=False,
             properties=self._client._create_properties(  # pylint:disable=protected-access
-                self._client._config.user_agent))  # pylint:disable=protected-access
-        self._messages_iter = None
-
-    def _redirect(self, redirect):
+                self._client._config.user_agent),  # pylint:disable=protected-access
+            **desired_capabilities)  # pylint:disable=protected-access
         self._messages_iter = None
-        super(EventHubConsumer, self)._redirect(redirect)
-
-    def _open(self):
-        """
-        Open the EventHubConsumer using the supplied connection.
-        If the handler has previously been redirected, the redirect
-        context will be used to create a new handler before opening it.
-
-        """
-        # pylint: disable=protected-access
-        self._redirected = self._redirected or self._client._iothub_redirect_info
-
-        if not self._running and self._redirected:
-            self._client._process_redirect_uri(self._redirected)
-            self._source = self._redirected.address
-        super(EventHubConsumer, self)._open()
 
     def _open_with_retry(self):
         return self._do_retryable_operation(self._open, operation_need_param=False)
@@ -172,16 +174,37 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
             timeout=remaining_time_ms)
         for message in message_batch:
             event_data = EventData._from_message(message)  # pylint:disable=protected-access
-            self._offset = EventPosition(event_data.offset)
             data_batch.append(event_data)
             event_data._trace_link_message()  # pylint:disable=protected-access
+
+        if data_batch:
+            self._offset = EventPosition(data_batch[-1].offset)
+
+        if self._track_last_enqueued_event_properties and data_batch:
+            self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties()  # pylint:disable=protected-access
+
         return data_batch
 
     def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
         return self._do_retryable_operation(self._receive, timeout=timeout,
                                             max_batch_size=max_batch_size,
                                             **kwargs)
 
+    @property
+    def last_enqueued_event_properties(self):
+        """
+        The latest enqueued event information. This property will be updated each time an event is received when
+        the receiver is created with `track_last_enqueued_event_properties` being `True`.
+        The dict includes following information of the partition:
+
+            - `sequence_number`
+            - `offset`
+            - `enqueued_time`
+            - `retrieval_time`
+
+        :rtype: dict or None
+        """
+        return self._last_enqueued_event_properties if self._track_last_enqueued_event_properties else None
+
     @property
     def queue_size(self):
         # type:() -> int
@@ -229,16 +252,11 @@ def receive(self, max_batch_size=None, timeout=None):
 
         return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size)
 
-    def close(self, exception=None):
-        # type:(Exception) -> None
+    def close(self):  # pylint:disable=useless-super-delegation
+        # type:() -> None
         """
         Close down the handler. If the handler has already closed,
-        this will be a no op. An optional exception can be passed in to
-        indicate that the handler was shutdown due to error.
-
-        :param exception: An optional exception if the handler is closing
-            due to an error.
-        :type exception: Exception
+        this will be a no op.
 
         Example:
             .. literalinclude:: ../examples/test_examples_eventhub.py
@@ -249,9 +267,6 @@ def close(self, exception=None):
                 :caption: Close down the handler.
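The consumer changes above surface the tracked partition metadata through `last_enqueued_event_properties`, which `EventData._get_last_enqueued_event_properties` populates from the message's delivery annotations. A standalone sketch of that mapping, using the annotation keys from the diff and fabricated annotation values for illustration:

```python
# Delivery-annotation keys, as introduced in the common.py hunk above.
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"

def last_enqueued_event_properties(delivery_annotations):
    # Mirrors EventData._get_last_enqueued_event_properties(): returns None
    # when the message carries no delivery annotations, otherwise a dict
    # with the four tracked fields (missing annotations default to None).
    if not delivery_annotations:
        return None
    return {
        "sequence_number": delivery_annotations.get(PROP_LAST_ENQUEUED_SEQUENCE_NUMBER),
        "offset": delivery_annotations.get(PROP_LAST_ENQUEUED_OFFSET),
        "enqueued_time": delivery_annotations.get(PROP_LAST_ENQUEUED_TIME_UTC),
        "retrieval_time": delivery_annotations.get(PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC),
    }

props = last_enqueued_event_properties({
    PROP_LAST_ENQUEUED_SEQUENCE_NUMBER: 42,
    PROP_LAST_ENQUEUED_OFFSET: b"12345",
})
# props["sequence_number"] == 42; untracked fields come back as None
```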
         """
-        if self._messages_iter:
-            self._messages_iter.close()
-            self._messages_iter = None
-        super(EventHubConsumer, self).close(exception)
+        super(EventHubConsumer, self).close()
 
     next = __next__  # for python2.7
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
index 129cf14a3842..755925bca743 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
@@ -187,11 +187,6 @@ def _handle_exception(exception, closable):  # pylint:disable=too-many-branches,
     if isinstance(exception, errors.AuthenticationException):
         if hasattr(closable, "_close_connection"):
             closable._close_connection()  # pylint:disable=protected-access
-    elif isinstance(exception, errors.LinkRedirect):
-        log.info("%r link redirect received. Redirecting...", name)
-        redirect = exception
-        if hasattr(closable, "_redirect"):
-            closable._redirect(redirect)  # pylint:disable=protected-access
     elif isinstance(exception, errors.LinkDetach):
         if hasattr(closable, "_close_handler"):
             closable._close_handler()  # pylint:disable=protected-access
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
index 6e562bbdf051..a7e37980dc18 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
@@ -7,13 +7,13 @@
 import uuid
 import logging
 import time
-from typing import Iterable, Union
+from typing import Iterable, Union, Type
 
 from uamqp import types, constants, errors  # type: ignore
 from uamqp import SendClient  # type: ignore
 
-from azure.core.tracing import SpanKind
-from azure.core.settings import settings
+from azure.core.tracing import SpanKind, AbstractSpan  # type: ignore
+from azure.core.settings import settings  # type: ignore
 
 from azure.eventhub.common import EventData, EventDataBatch
 from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
@@ -82,12 +82,10 @@ def __init__(self, client, target, **kwargs):
 
         super(EventHubProducer, self).__init__()
         self._max_message_size_on_link = None
-        self._running = False
         self._client = client
         self._target = target
         self._partition = partition
         self._timeout = send_timeout
-        self._redirected = None
         self._error = None
         self._keep_alive = keep_alive
         self._auto_reconnect = auto_reconnect
@@ -106,7 +104,7 @@ def __init__(self, client, target, **kwargs):
     def _create_handler(self):
         self._handler = SendClient(
             self._target,
-            auth=self._client._get_auth(),  # pylint:disable=protected-access
+            auth=self._client._create_auth(),  # pylint:disable=protected-access
             debug=self._client._config.network_tracing,  # pylint:disable=protected-access
             msg_timeout=self._timeout,
             error_policy=self._retry_policy,
@@ -115,18 +113,6 @@ def _create_handler(self):
             link_properties=self._link_properties,
             properties=self._client._create_properties(self._client._config.user_agent))  # pylint: disable=protected-access
 
-    def _open(self):
-        """
-        Open the EventHubProducer using the supplied connection.
-        If the handler has previously been redirected, the redirect
-        context will be used to create a new handler before opening it.
-
-        """
-        if not self._running and self._redirected:
-            self._client._process_redirect_uri(self._redirected)  # pylint: disable=protected-access
-            self._target = self._redirected.address
-        super(EventHubProducer, self)._open()
-
     def _open_with_retry(self):
         return self._do_retryable_operation(self._open, operation_need_param=False)
@@ -141,7 +127,7 @@ def _send_event_data(self, timeout_time=None, last_exception=None):
                     error = OperationTimeoutError("send operation timed out")
                     log.info("%r send operation timed out. (%r)", self._name, error)
                     raise error
-                self._handler._msg_timeout = remaining_time  # pylint: disable=protected-access
+                self._handler._msg_timeout = remaining_time * 1000  # pylint: disable=protected-access
                 self._handler.queue_message(*self._unsent_events)
                 self._handler.wait()
                 self._unsent_events = self._handler.pending_messages
@@ -254,23 +240,18 @@ def send(self, event_data, partition_key=None, timeout=None):
             wrapper_event_data.message.on_send_complete = self._on_outcome
             self._unsent_events = [wrapper_event_data.message]
 
-        if span_impl_type is not None:
+        if span_impl_type is not None and child is not None:
             with child:
                 self._client._add_span_request_attributes(child)  # pylint: disable=protected-access
                 self._send_event_data_with_retry(timeout=timeout)
         else:
             self._send_event_data_with_retry(timeout=timeout)
 
-    def close(self, exception=None):  # pylint:disable=useless-super-delegation
-        # type:(Exception) -> None
+    def close(self):  # pylint:disable=useless-super-delegation
+        # type:() -> None
         """
         Close down the handler. If the handler has already closed,
-        this will be a no op. An optional exception can be passed in to
-        indicate that the handler was shutdown due to error.
-
-        :param exception: An optional exception if the handler is closing
-            due to an error.
-        :type exception: Exception
+        this will be a no op.
 
         Example:
             .. literalinclude:: ../examples/test_examples_eventhub.py
@@ -281,4 +262,4 @@ def close(self, exception=None):  # pylint:disable=useless-super-delegation
                 :caption: Close down the handler.
""" - super(EventHubProducer, self).close(exception) + super(EventHubProducer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index ed0212e85562..923ed270827b 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -15,8 +15,8 @@ # Ignore async tests for Python < 3.5 collect_ignore = [] if sys.version_info < (3, 5): - collect_ignore.append("tests/asynctests") - collect_ignore.append("tests/eventprocessor_tests") + collect_ignore.append("tests/livetest/asynctests") + collect_ignore.append("tests/eventprocessor") collect_ignore.append("features") collect_ignore.append("examples/async_examples") @@ -82,7 +82,7 @@ def cleanup_eventhub(eventhub_config, hub_name, client=None): client.delete_event_hub(hub_name) -@pytest.fixture() +@pytest.fixture(scope="session") def live_eventhub_config(): try: config = {} @@ -152,22 +152,6 @@ def invalid_policy(live_eventhub_config): live_eventhub_config['event_hub']) -@pytest.fixture() -def iot_connection_str(): - try: - return os.environ['IOTHUB_CONNECTION_STR'] - except KeyError: - pytest.skip("No IotHub connection string found.") - - -@pytest.fixture() -def device_id(): - try: - return os.environ['IOTHUB_DEVICE'] - except KeyError: - pytest.skip("No Iothub device ID found.") - - @pytest.fixture() def aad_credential(): try: diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py new file mode 100644 index 000000000000..53d2e626a7f5 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show consuming events from an Event Hub partition while tracking the last enqueued event properties. +""" + +import os +import time +import asyncio + +from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventPosition, EventHubSharedKeyCredential + +HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net +EVENT_HUB = os.environ['EVENT_HUB_NAME'] +USER = os.environ['EVENT_HUB_SAS_POLICY'] +KEY = os.environ['EVENT_HUB_SAS_KEY'] + +EVENT_POSITION = EventPosition("-1") + + +async def pump(client, partition): + consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, + prefetch=5, track_last_enqueued_event_properties=True) + async with consumer: + total = 0 + start_time = time.time() + for event_data in await consumer.receive(timeout=10): + last_offset = event_data.offset + last_sn = event_data.sequence_number + print("Received: {}, {}".format(last_offset, last_sn)) + total += 1 + end_time = time.time() + run_time = end_time - start_time + print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) + print("Received {} messages in {} seconds".format(total, run_time)) + + +loop = asyncio.get_event_loop() +client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) +tasks = [asyncio.ensure_future(pump(client, "0"))] +loop.run_until_complete(asyncio.wait(tasks)) diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py b/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py deleted file mode 100644 index ecc935669d13..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python - -# 
-------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. -""" -import os - -from azure.eventhub import EventHubClient, EventPosition - -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), - operation='/messages/events') - -with consumer: - received = consumer.receive(timeout=5) - print(received) diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py b/sdk/eventhub/azure-eventhubs/examples/iothub_send.py deleted file mode 100644 index c2f8f3379259..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. 
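With the IoT Hub examples above deleted, the changelog's guidance is to use an IoT Hub's Event Hubs-compatible connection string instead. Such a string is the usual semicolon-delimited `key=value` form with an `EntityPath`; a small parser sketch (the endpoint and key values below are placeholders, not real credentials):

```python
def parse_conn_str(conn_str):
    """Split an Event Hubs-compatible connection string into its parts.
    Values may themselves contain '=', so split each pair only once."""
    return dict(kv.split("=", 1) for kv in conn_str.strip(";").split(";"))

# Placeholder values in the shape shown on the IoT Hub "Built-in endpoints" blade.
conn_str = (
    "Endpoint=sb://example-endpoint.servicebus.windows.net/;"
    "SharedAccessKeyName=iothubowner;"
    "SharedAccessKey=<key>;"
    "EntityPath=my-iot-hub"
)
```

A string of this shape is what `EventHubClient.from_connection_string` consumes to read events from the IoT Hub's built-in endpoint after this change.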
-""" -import os -from azure.eventhub import EventData, EventHubClient - -iot_device_id = os.environ['IOTHUB_DEVICE'] -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -producer = client.create_producer(operation='/messages/devicebound') -with producer: - producer.send(EventData(b"A single event", to_device=iot_device_id)) diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py b/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py new file mode 100644 index 000000000000..576ef19089e6 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition. 
+""" +import os +import time +from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential + +HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +USER = os.environ['EVENT_HUB_SAS_POLICY'] +KEY = os.environ['EVENT_HUB_SAS_KEY'] + +EVENT_POSITION = EventPosition("-1") +PARTITION = "0" + + +total = 0 +last_sn = -1 +last_offset = "-1" +client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) + +consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, + event_position=EVENT_POSITION, prefetch=5000, + track_last_enqueued_event_properties=True) +with consumer: + start_time = time.time() + batch = consumer.receive(timeout=5) + for event_data in batch: + last_offset = event_data.offset + last_sn = event_data.sequence_number + print("Received: {}, {}".format(last_offset, last_sn)) + print(event_data.body_as_str()) + total += 1 + batch = consumer.receive(timeout=5) + print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) + print("Received {} messages in {} seconds".format(total, time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index 52145f9222f0..8ff334ef971f 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -27,16 +27,6 @@ def create_eventhub_client(live_eventhub_config): return client -def create_eventhub_client_from_iothub_connection_string(live_eventhub_config): - # [START create_eventhub_client_iot_connstr] - import os - from azure.eventhub import EventHubClient - - iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - client = EventHubClient.from_connection_string(iot_connection_str) - # [END 
create_eventhub_client_iot_connstr] - - def test_example_eventhub_sync_send_and_receive(live_eventhub_config): # [START create_eventhub_client_connstr] import os diff --git a/sdk/eventhub/azure-eventhubs/setup.py b/sdk/eventhub/azure-eventhubs/setup.py index aae5cc60b638..7652ff91bdc7 100644 --- a/sdk/eventhub/azure-eventhubs/setup.py +++ b/sdk/eventhub/azure-eventhubs/setup.py @@ -37,7 +37,7 @@ exclude_packages = [ 'tests', - "tests.asynctests", + 'stress', 'examples', # Exclude packages that will be covered by PEP420 or nspkg 'azure', diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py new file mode 100644 index 000000000000..59b8fe76fec8 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -0,0 +1,266 @@ +import sys +import os +import logging +import threading +import time +import asyncio +from logging.handlers import RotatingFileHandler +from argparse import ArgumentParser +from azure.eventhub import EventHubClient, EventPosition, EventData, \ + EventHubConsumer, EventHubProducer, EventHubSharedKeyCredential, EventDataBatch +from azure.eventhub.aio import EventHubClient as EventHubClientAsync +from azure.identity import ClientSecretCredential + + +def stress_receive_sync(receiver, args, logger): + batch = receiver.receive(timeout=5) + return len(batch) + + +async def stress_receive_async(receiver, args, logger): + batch = await receiver.receive(timeout=5) + return len(batch) + + +def stress_receive_iterator_sync(receiver, args, logger): + duration = args.duration + deadline = time.time() + duration + total_count = 0 + logging_count = 0 + try: + for _ in receiver: + total_count += 1 + logging_count += 1 + if logging_count >= args.output_interval: + logger.info("Partition:%r, received:%r", receiver._partition, total_count) + logging_count -= args.output_interval + if time.time() > deadline: + break + finally: + return total_count + + +async def 
stress_receive_iterator_async(receiver, args, logger): + duration = args.duration + deadline = time.time() + duration + total_count = 0 + logging_count = 0 + try: + async for _ in receiver: + total_count += 1 + logging_count += 1 + if logging_count >= args.output_interval: + logger.info("Partition:%r, received:%r", receiver._partition, total_count) + logging_count -= args.output_interval + if time.time() > deadline: + break + finally: + return total_count + + +def stress_send_sync(producer: EventHubProducer, args, logger): + batch = producer.create_batch() + try: + while True: + event_data = EventData(body=b"D" * args.payload) + batch.try_add(event_data) + except ValueError: + producer.send(batch) + return len(batch) + + +async def stress_send_async(producer, args, logger): + batch = await producer.create_batch() + try: + while True: + event_data = EventData(body=b"D" * args.payload) + batch.try_add(event_data) + except ValueError: + await producer.send(batch) + return len(batch) + + +def get_logger(filename, method_name, level=logging.INFO, print_console=False): + stress_logger = logging.getLogger(method_name) + stress_logger.setLevel(level) + azure_logger = logging.getLogger("azure.eventhub") + azure_logger.setLevel(level) + uamqp_logger = logging.getLogger("uamqp") + uamqp_logger.setLevel(level) + + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + if print_console: + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not azure_logger.handlers: + azure_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + if not stress_logger.handlers: + stress_logger.addHandler(console_handler) + + if filename: + file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) + file_handler.setFormatter(formatter) + azure_logger.addHandler(file_handler) + uamqp_logger.addHandler(file_handler) + 
stress_logger.addHandler(file_handler) + + return stress_logger + + +class StressTestRunner(object): + def __init__(self, argument_parser): + self.argument_parser = argument_parser + self.argument_parser.add_argument("-m", "--method", required=True) + self.argument_parser.add_argument("--output_interval", type=float, default=1000) + self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + self.argument_parser.add_argument("--consumer", help="Consumer group name", default="$default") + self.argument_parser.add_argument("--offset", help="Starting offset", default="-1") + self.argument_parser.add_argument("-p", "--partitions", help="Comma separated partition IDs", default="0") + self.argument_parser.add_argument("--conn-str", help="EventHub connection string", + default=os.environ.get('EVENT_HUB_PERF_CONN_STR')) + self.argument_parser.add_argument("--eventhub", help="Name of EventHub") + self.argument_parser.add_argument("--address", help="Address URI to the EventHub entity") + self.argument_parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + self.argument_parser.add_argument("--sas-key", help="Shared access key") + self.argument_parser.add_argument("--aad_client_id", help="AAD client id") + self.argument_parser.add_argument("--aad_secret", help="AAD secret") + self.argument_parser.add_argument("--aad_tenant_id", help="AAD tenant id") + self.argument_parser.add_argument("--payload", help="payload size", type=int, default=1024) + self.argument_parser.add_argument("--print_console", help="print to console", type=bool, default=False) + self.args, _ = self.argument_parser.parse_known_args() + + self.running = False + + def create_client(self, client_class): + if self.args.conn_str: + client = client_class.from_connection_string( + self.args.conn_str, + event_hub_path=self.args.eventhub, network_tracing=False) + elif self.args.address: + client = client_class(host=self.args.address, + 
event_hub_path=self.args.eventhub, + credential=EventHubSharedKeyCredential(self.args.sas_policy, self.args.sas_key), + auth_timeout=240, + network_tracing=False) + elif self.args.aad_client_id: + client = client_class(host=self.args.address, + event_hub_path=self.args.eventhub, + credential=ClientSecretCredential( + self.args.aad_client_id, self.args.aad_secret, self.args.aad_tenant_id + ), + network_tracing=False) + else: + raise ValueError("Argument error. Must provide one of: connection string, SAS credential, or AAD credential") + + return client + + def run(self): + method_name = self.args.method + if "async" in method_name: + loop = asyncio.get_event_loop() + loop.run_until_complete(self.run_async()) + else: + self.run_sync() + + def run_sync(self): + method_name = self.args.method + logger = get_logger("{}.log".format(method_name), method_name, + level=logging.INFO, print_console=self.args.print_console) + test_method = globals()[method_name] + client = self.create_client(EventHubClient) + self.running = True + if self.args.partitions.lower() != "all": + partitions = self.args.partitions.split(",") + else: + partitions = client.get_partition_ids() + threads = [] + for pid in partitions: + if "receive" in method_name: + worker = client.create_consumer(consumer_group=self.args.consumer, + partition_id=pid, + event_position=EventPosition(self.args.offset), + prefetch=300) + else: # "send" in method_name + worker = client.create_producer(partition_id=pid) + thread = threading.Thread(target=self.run_test_method, args=(test_method, worker, logger)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + + def stop(self): + self.running = False + + def run_test_method(self, test_method, worker, logger): + deadline = time.time() + self.args.duration + with worker: + total_processed = 0 + iter_processed = 0 + while self.running and time.time() < deadline: + try: + processed = test_method(worker, self.args, logger) + total_processed += processed + 
iter_processed += processed + if iter_processed >= self.args.output_interval: + logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed) + iter_processed -= self.args.output_interval + except KeyboardInterrupt: + logger.info("Partition:%r, keyboard interrupted", worker._partition) + self.stop() + except Exception as e: + logger.exception("Partition:%r, %r failed:", worker._partition, type(worker)) + self.stop() + logger.info("Partition:%r, %r has finished testing", worker._partition, test_method) + + async def run_async(self): + method_name = self.args.method + logger = get_logger("{}.log".format(method_name), method_name, + level=logging.INFO, print_console=self.args.print_console) + test_method = globals()[method_name] + client = self.create_client(EventHubClientAsync) + self.running = True + if self.args.partitions.lower() != "all": + partitions = self.args.partitions.split(",") + else: + partitions = await client.get_partition_ids() + tasks = [] + for pid in partitions: + if "receive" in method_name: + worker = client.create_consumer(consumer_group=self.args.consumer, + partition_id=pid, + event_position=EventPosition(self.args.offset), + prefetch=300) + else: # "send" in method_name + worker = client.create_producer(partition_id=pid) + task = self.run_test_method_async(test_method, worker, logger) + tasks.append(task) + await asyncio.gather(*tasks) + + async def run_test_method_async(self, test_method, worker, logger): + deadline = time.time() + self.args.duration + async with worker: + total_processed = 0 + iter_processed = 0 + while self.running and time.time() < deadline: + try: + processed = await test_method(worker, self.args, logger) + total_processed += processed + iter_processed += processed + if iter_processed >= self.args.output_interval: + logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed) + iter_processed -= self.args.output_interval + except KeyboardInterrupt: + 
logger.info("Partition:%r, keyboard interrupted", worker._partition) + self.stop() + except Exception as e: + logger.exception("Partition:%r, %r failed:", worker._partition, type(worker)) + self.stop() + logger.info("Partition:%r, %r has finished testing", worker._partition, test_method) + + +if __name__ == '__main__': + parser = ArgumentParser() + runner = StressTestRunner(parser) + runner.run() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py deleted file mode 100644 index 4ac63eef5d7f..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py +++ /dev/null @@ -1,85 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import asyncio -import pytest - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition - - -async def pump(receiver, sleep=None): - messages = 0 - if sleep: - await asyncio.sleep(sleep) - async with receiver: - batch = await receiver.receive(timeout=10) - messages += len(batch) - return messages - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_multiple_async(iot_connection_str): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - receivers = [] - for p in partitions: - receivers.append(client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"), prefetch=10, operation='/messages/events')) - outputs = await asyncio.gather(*[pump(r) for r in receivers]) - - assert isinstance(outputs[0], int) and outputs[0] <= 10 - assert isinstance(outputs[1], 
int) and outputs[1] <= 10 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = await client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_partition_ids_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_partition_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = await client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_after_mgmt_ops_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await receiver.receive(timeout=10) - assert len(received) == 0 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_mgmt_ops_after_receive_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await 
receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py deleted file mode 100644 index 1e3cae9eefa7..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py +++ /dev/null @@ -1,156 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import logging -import asyncio -import sys -import os -import argparse -import time -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub.aio import EventHubClient -from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager -from azure.eventhub import EventData - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub.eventprocessor") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - 
return azure_logger - - -logger = get_logger("eph_test_async.log", logging.INFO) - - -class MyEventProcessor(PartitionProcessor): - async def close(self, reason, partition_context): - logger.info("PartitionProcessor closed (reason {}, id {})".format( - reason, - partition_context.partition_id - )) - - async def process_events(self, events, partition_context): - if events: - event = events[-1] - print("Processing id {}, offset {}, sq_number {})".format( - partition_context.partition_id, - event.offset, - event.sequence_number)) - await partition_context.update_checkpoint(event.offset, event.sequence_number) - - async def process_error(self, error, partition_context): - logger.info("Event Processor Error for partition {}, {!r}".format(partition_context.partition_id, error)) - - -async def wait_and_close(host, duration): - """ - Run EventProcessorHost for 30 seconds then shutdown. - """ - await asyncio.sleep(duration) - await host.stop() - - -async def pump(pid, sender, duration): - deadline = time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * 512) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_eph(live_eventhub): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--container", help="Lease container name", default="nocontextleases") - parser.add_argument("--eventhub", help="Name of EventHub", default=live_eventhub['event_hub']) - parser.add_argument("--namespace", help="Namespace of EventHub", default=live_eventhub['namespace']) - 
parser.add_argument("--suffix", help="Namespace of EventHub", default="servicebus.windows.net") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with", default=live_eventhub['key_name']) - parser.add_argument("--sas-key", help="Shared access key", default=live_eventhub['access_key']) - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if not args.namespace or not args.eventhub: - try: - import pytest - pytest.skip("Must specify '--namespace' and '--eventhub'") - except ImportError: - raise ValueError("Must specify '--namespace' and '--eventhub'") - - # Queue up some events in the Eventhub - conn_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( - live_eventhub['hostname'], - live_eventhub['key_name'], - live_eventhub['access_key'], - live_eventhub['event_hub']) - client = EventHubClient.from_connection_string(conn_str) - pumps = [] - for pid in ["0", "1"]: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, 15)) - results = await asyncio.gather(*pumps, return_exceptions=True) - - assert not any(results) - - # Event loop and host - host = EventProcessor( - client, - live_eventhub['consumer_group'], - MyEventProcessor, - SamplePartitionManager() - ) - - tasks = asyncio.gather( - host.start(), - wait_and_close(host, args.duration), return_exceptions=True) - results = await tasks - assert not any(results) - - -if __name__ == '__main__': - config = {} - config['hostname'] = os.environ['EVENT_HUB_HOSTNAME'] - config['event_hub'] = os.environ['EVENT_HUB_NAME'] - config['key_name'] = os.environ['EVENT_HUB_SAS_POLICY'] - config['access_key'] = os.environ['EVENT_HUB_SAS_KEY'] - config['namespace'] = os.environ['EVENT_HUB_NAMESPACE'] - config['consumer_group'] = "$Default" - config['partition'] = "0" - loop = asyncio.get_event_loop() - loop.run_until_complete(test_long_running_eph(config)) diff --git 
a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py deleted file mode 100644 index 50ababacf738..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import asyncio -import argparse -import time -import os -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("recv_test_async.log", logging.INFO) - - -async def pump(_pid, receiver, _args, _dl): - total = 0 - iteration = 0 - deadline = time.time() + _dl - - try: - 
async with receiver: - while time.time() < deadline: - batch = await receiver.receive(timeout=3) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - _pid, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last offset={}".format( - _pid, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("Partition {} receiver failed: {}".format(_pid, e)) - raise - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_receive_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, auth_timeout=240, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - - else: - try: - import pytest - pytest.skip("Must specify 
either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if not args.partitions: - partitions = await client.get_partition_ids() - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - receiver = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300, - loop=loop) - pumps.append(pump(pid, receiver, args, args.duration)) - await asyncio.gather(*pumps) - - -if __name__ == '__main__': - asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py deleted file mode 100644 index 00279d168d70..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python - -""" -send test -""" - -import logging -import argparse -import time -import os -import asyncio -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventData, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - 
file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test_async.log", logging.INFO) - - -async def get_partitions(args): - eh_data = await args.get_properties() - return eh_data["partition_ids"] - - -async def pump(pid, sender, args, duration): - deadline = time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_partition_send_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=1024) - parser.add_argument("--partitions", help="Comma separated partition IDs") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - parser.add_argument("--logger-name", help="Unique log file ID") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - 
event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - if not args.partitions: - partitions = await client.get_partition_ids() - else: - pid_range = args.partitions.split("-") - if len(pid_range) > 1: - partitions = [str(i) for i in range(int(pid_range[0]), int(pid_range[1]) + 1)] - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, args, args.duration)) - results = await asyncio.gather(*pumps, return_exceptions=True) - assert not results - except Exception as e: - logger.error("EventHubProducer failed: {}".format(e)) - - -if __name__ == '__main__': - asyncio.run(test_long_running_partition_send_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/eventprocessor_tests/test_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/eventprocessor/test_eventprocessor.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/eventprocessor_tests/test_eventprocessor.py rename to sdk/eventhub/azure-eventhubs/tests/eventprocessor/test_eventprocessor.py diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py similarity index 94% rename from 
sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py index 4406da855f59..b7d7142a6f51 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py @@ -218,19 +218,6 @@ async def test_create_batch_with_invalid_hostname_async(invalid_hostname): await sender.close() -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_create_batch_with_none_async(connection_str): - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() - batch_event_data = await sender.create_batch(max_size=300, partition_key="key") - try: - with pytest.raises(ValueError): - batch_event_data.try_add(EventData(None)) - finally: - await sender.close() - - @pytest.mark.liveTest @pytest.mark.asyncio async def test_create_batch_with_too_large_size_async(connection_str): diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_properties_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_properties_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py similarity index 87% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py index 2a2e4836c2d5..55e2fa60ff91 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py @@ -156,6 +156,12 @@ async def 
test_receive_batch_async(connstr_senders): received = await receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert event.sequence_number is not None + assert event.offset + assert event.enqueued_time + async def pump(receiver, sleep=None): messages = 0 @@ -314,3 +320,38 @@ async def test_receive_over_websocket_async(connstr_senders): received = await receiver.receive(max_batch_size=50, timeout=5) assert len(received) == 20 + + +@pytest.mark.asyncio +@pytest.mark.liveTest +async def test_receive_run_time_metric_async(connstr_senders): + from uamqp import __version__ as uamqp_version + from distutils.version import StrictVersion + if StrictVersion(uamqp_version) < StrictVersion('1.2.3'): + pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") + connection_str, senders = connstr_senders + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, + network_tracing=False) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", + event_position=EventPosition('@latest'), prefetch=500, + track_last_enqueued_event_properties=True) + + event_list = [] + for i in range(20): + event_list.append(EventData("Event Number {}".format(i))) + + async with receiver: + received = await receiver.receive(timeout=5) + assert len(received) == 0 + + senders[0].send(event_list) + + await asyncio.sleep(1) + + received = await receiver.receive(max_batch_size=50, timeout=5) + assert len(received) == 20 + assert receiver.last_enqueued_event_properties + assert receiver.last_enqueued_event_properties.get('sequence_number', None) + assert receiver.last_enqueued_event_properties.get('offset', None) + assert receiver.last_enqueued_event_properties.get('enqueued_time', None) + assert receiver.last_enqueued_event_properties.get('retrieval_time', None) diff --git 
a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receiver_iterator_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_receiver_iterator_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py similarity index 50% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py index 05be713e2d8c..24258bc532ae 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py @@ -40,47 +40,3 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep): received.extend(r.receive(timeout=5)) assert len(received) == 2 assert list(received[0].body)[0] == b"A single event" - - -def pump(receiver): - messages = [] - with receiver: - batch = receiver.receive(timeout=1) - messages.extend(batch) - while batch: - batch = receiver.receive(timeout=1) - messages.extend(batch) - return messages - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_send_with_forced_conn_close_async(connstr_receivers, sleep): - pytest.skip("This test is similar to the above one") - connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() - try: - await sender.send(EventData(b"A single event")) - if sleep: - await asyncio.sleep(300) - else: - sender._handler._connection._conn.destroy() - await sender.send(EventData(b"A single event")) - await sender.send(EventData(b"A single event")) - if sleep: - await 
asyncio.sleep(300) - else: - sender._handler._connection._conn.destroy() - await sender.send(EventData(b"A single event")) - await sender.send(EventData(b"A single event")) - finally: - await sender.close() - - received = [] - for r in receivers: - if not sleep: - r._handler._connection._conn.destroy() - received.extend(pump(r)) - assert len(received) == 5 - assert list(received[0].body)[0] == b"A single event" diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py similarity index 97% rename from sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py index aa301bad3119..306e547ac439 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py @@ -255,6 +255,15 @@ async def test_send_with_create_event_batch_async(connstr_receivers): client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) sender = client.create_producer() + event_data_batch = await sender.create_batch(max_size=100000, partition_key="0") + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + await sender.send(event_data_batch) + event_data_batch = await sender.create_batch(max_size=100000) while True: try: diff --git a/sdk/eventhub/azure-eventhubs/tests/test_auth.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_auth.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/test_auth.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_auth.py diff --git a/sdk/eventhub/azure-eventhubs/tests/test_negative.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_negative.py similarity index 92% rename from 
sdk/eventhub/azure-eventhubs/tests/test_negative.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_negative.py index e9d3a9e17f8e..a08751e4f606 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_negative.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_negative.py @@ -21,8 +21,7 @@ @pytest.mark.liveTest -def test_send_with_invalid_hostname(invalid_hostname, connstr_receivers): - _, receivers = connstr_receivers +def test_send_with_invalid_hostname(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): @@ -40,8 +39,7 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname): @pytest.mark.liveTest -def test_send_with_invalid_key(invalid_key, connstr_receivers): - _, receivers = connstr_receivers +def test_send_with_invalid_key(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): @@ -60,8 +58,7 @@ def test_receive_with_invalid_key_sync(invalid_key): @pytest.mark.liveTest -def test_send_with_invalid_policy(invalid_policy, connstr_receivers): - _, receivers = connstr_receivers +def test_send_with_invalid_policy(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): @@ -226,16 +223,6 @@ def test_create_batch_with_invalid_hostname_sync(invalid_hostname): sender.close() -@pytest.mark.liveTest -def test_create_batch_with_none_sync(connection_str): - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() - batch_event_data = sender.create_batch(max_size=300, partition_key="key") - with pytest.raises(ValueError): - batch_event_data.try_add(EventData(None)) - sender.close() - - @pytest.mark.liveTest 
def test_create_batch_with_too_large_size_sync(connection_str): client = EventHubClient.from_connection_string(connection_str, network_tracing=False) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_properties.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_properties.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/test_properties.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_properties.py diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_receive.py similarity index 86% rename from sdk/eventhub/azure-eventhubs/tests/test_receive.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_receive.py index d241a8e6e585..9bd1ea38ee03 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_receive.py @@ -214,6 +214,12 @@ def test_receive_batch(connstr_senders): received = receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert event.sequence_number is not None + assert event.offset + assert event.enqueued_time + @pytest.mark.liveTest def test_receive_batch_with_app_prop_sync(connstr_senders): @@ -272,3 +278,37 @@ def test_receive_over_websocket_sync(connstr_senders): received = receiver.receive(max_batch_size=50, timeout=5) assert len(received) == 20 + + +@pytest.mark.liveTest +def test_receive_run_time_metric(connstr_senders): + from uamqp import __version__ as uamqp_version + from distutils.version import StrictVersion + if StrictVersion(uamqp_version) < StrictVersion('1.2.3'): + pytest.skip("Disabled for uamqp 1.2.2. 
Will enable after uamqp 1.2.3 is released.") + connection_str, senders = connstr_senders + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, + network_tracing=False) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", + event_position=EventPosition('@latest'), prefetch=500, + track_last_enqueued_event_properties=True) + + event_list = [] + for i in range(20): + event_list.append(EventData("Event Number {}".format(i))) + + with receiver: + received = receiver.receive(timeout=5) + assert len(received) == 0 + + senders[0].send(event_list) + + time.sleep(1) + + received = receiver.receive(max_batch_size=50, timeout=5) + assert len(received) == 20 + assert receiver.last_enqueued_event_properties + assert receiver.last_enqueued_event_properties.get('sequence_number', None) + assert receiver.last_enqueued_event_properties.get('offset', None) + assert receiver.last_enqueued_event_properties.get('enqueued_time', None) + assert receiver.last_enqueued_event_properties.get('retrieval_time', None) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receiver_iterator.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_receiver_iterator.py similarity index 100% rename from sdk/eventhub/azure-eventhubs/tests/test_receiver_iterator.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_receiver_iterator.py diff --git a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_reconnect.py similarity index 52% rename from sdk/eventhub/azure-eventhubs/tests/test_reconnect.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_reconnect.py index 0796cee2178d..d9a76238acea 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_reconnect.py @@ -36,34 +36,3 @@ def test_send_with_long_interval_sync(connstr_receivers, sleep): 
assert len(received) == 2 assert list(received[0].body)[0] == b"A single event" - - -@pytest.mark.liveTest -def test_send_with_forced_conn_close_sync(connstr_receivers, sleep): - pytest.skip("This test is similar to the above one") - connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() - with sender: - sender.send(EventData(b"A single event")) - sender._handler._connection._conn.destroy() - if sleep: - time.sleep(300) - else: - sender._handler._connection._conn.destroy() - sender.send(EventData(b"A single event")) - sender.send(EventData(b"A single event")) - if sleep: - time.sleep(300) - else: - sender._handler._connection._conn.destroy() - sender.send(EventData(b"A single event")) - sender.send(EventData(b"A single event")) - - received = [] - for r in receivers: - if not sleep: - r._handler._connection._conn.destroy() - received.extend(r.receive(timeout=1)) - assert len(received) == 5 - assert list(received[0].body)[0] == b"A single event" diff --git a/sdk/eventhub/azure-eventhubs/tests/test_send.py b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_send.py similarity index 96% rename from sdk/eventhub/azure-eventhubs/tests/test_send.py rename to sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_send.py index 1c34c672d99f..9346932b6115 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/synctests/test_send.py @@ -52,7 +52,7 @@ def test_send_and_receive_large_body_size(connstr_receivers): received = [] for r in receivers: - received.extend(r.receive(timeout=4)) + received.extend(r.receive(timeout=10)) assert len(received) == 1 assert len(list(received[0].body)[0]) == payload @@ -257,6 +257,15 @@ def test_send_with_create_event_batch_sync(connstr_receivers): client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, 
network_tracing=False) sender = client.create_producer() + event_data_batch = sender.create_batch(max_size=100000, partition_key="0") + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + sender.send(event_data_batch) + event_data_batch = sender.create_batch(max_size=100000) while True: try: diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py deleted file mode 100644 index 595c822b9cb7..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py +++ /dev/null @@ -1,64 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import pytest - -from azure.eventhub import EventPosition, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - try: - received = receiver.receive(timeout=10) - assert len(received) == 0 - finally: - receiver.close() - - -@pytest.mark.liveTest -def test_iothub_get_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_ids_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert 
partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -def test_iothub_receive_after_mgmt_ops_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - -@pytest.mark.liveTest -def test_iothub_mgmt_ops_after_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py deleted file mode 100644 index 9660a79947bd..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py +++ /dev/null @@ -1,24 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. 
-#-------------------------------------------------------------------------- - -import os -import pytest -import time -import uuid - -from uamqp.message import MessageProperties - -from azure.eventhub import EventData, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_send_single_event(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - sender = client.create_producer(operation='/messages/devicebound') - try: - sender.send(EventData(b"A single event", to_device=device_id)) - finally: - sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py deleted file mode 100644 index 5a6e42a827e3..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. 
-""" - -import logging -import argparse -import time -import os -import sys -import threading -import pytest - -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition -from azure.eventhub import EventHubClient -from azure.eventhub import EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - -logger = get_logger("recv_test.log", logging.INFO) - - -def pump(receiver, duration): - total = 0 - iteration = 0 - deadline = time.time() + duration - with receiver: - try: - while time.time() < deadline: - batch = receiver.receive(timeout=5) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - receiver._partition, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last offset={}".format( - receiver._partition, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("EventHubConsumer failed: {}".format(e)) - raise - - -@pytest.mark.liveTest -def test_long_running_receive(connection_str): - parser = 
argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if args.partitions: - partitions = args.partitions.split(",") - else: - partitions = client.get_partition_ids() - - threads = [] - for pid in partitions: - consumer = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300) - thread = threading.Thread(target=pump, args=(consumer, args.duration)) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - - -if __name__ == '__main__': - test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py 
b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py deleted file mode 100644 index 93e7e85b4287..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -send test -""" - -import argparse -import time -import os -import sys -import threading -import logging -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventHubClient, EventDataBatch, EventData, EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test.log", logging.INFO) - - -def send(sender, args): - # sender = client.create_producer() - deadline = time.time() + args.duration - total = 0 - try: - with sender: - batch = sender.create_batch() - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - try: - 
batch.try_add(data) - total += 1 - except ValueError: - sender.send(batch, timeout=0) - print("Sent total {} of partition {}".format(total, sender._partition)) - batch = sender.create_batch() - except Exception as err: - print("Partition {} send failed {}".format(sender._partition, err)) - raise - print("Sent total {} of partition {}".format(total, sender._partition)) - - -@pytest.mark.liveTest -def test_long_running_send(connection_str): - if sys.platform.startswith('darwin'): - import pytest - pytest.skip("Skipping on OSX") - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=512) - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - partition_ids = client.get_partition_ids() - threads = [] - for pid in partition_ids: - sender = client.create_producer(partition_id=pid) - thread = threading.Thread(target=send, args=(sender, args)) - thread.start() - threads.append(thread) - thread.join() - except 
KeyboardInterrupt: - pass - - -if __name__ == '__main__': - test_long_running_send(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/unittest/test_event_data.py b/sdk/eventhub/azure-eventhubs/tests/unittest/test_event_data.py new file mode 100644 index 000000000000..0cf07b27c252 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/tests/unittest/test_event_data.py @@ -0,0 +1,43 @@ +import pytest +from azure.eventhub import EventData, EventDataBatch +from uamqp import Message + + +@pytest.mark.parametrize("test_input, expected_result", + [("", ""), ("AAA", "AAA"), (None, ValueError), (["a", "b", "c"], "abc"), (b"abc", "abc")]) +def test_constructor(test_input, expected_result): + if isinstance(expected_result, type): + with pytest.raises(expected_result): + EventData(test_input) + else: + event_data = EventData(test_input) + assert event_data.body_as_str() == expected_result + assert event_data.partition_key is None + assert event_data.application_properties is None + assert event_data.enqueued_time is None + assert event_data.offset is None + assert event_data.sequence_number is None + assert event_data.system_properties == {} + with pytest.raises(TypeError): + event_data.body_as_json() + + +def test_body_json(): + event_data = EventData('{"a":"b"}') + jo = event_data.body_as_json() + assert jo["a"] == "b" + + +def test_app_properties(): + app_props = {"a": "b"} + event_data = EventData("") + event_data.application_properties = app_props + assert event_data.application_properties["a"] == "b" + + +def test_evetn_data_batch(): + batch = EventDataBatch(max_size=100, partition_key="par") + batch.try_add(EventData("A")) + assert batch.size == 89 and len(batch) == 1 + with pytest.raises(ValueError): + batch.try_add(EventData("A")) diff --git a/sdk/eventhub/tests.yml b/sdk/eventhub/tests.yml index 14dfbdb02073..7eebea067f9a 100644 --- a/sdk/eventhub/tests.yml +++ b/sdk/eventhub/tests.yml @@ -11,6 +11,19 @@ jobs: - template: 
../../eng/pipelines/templates/jobs/archetype-sdk-tests.yml parameters: ServiceDirectory: eventhub + Matrix: + Linux_Python35: + OSName: 'Linux' + OSVmImage: 'ubuntu-16.04' + PythonVersion: '3.5' + MacOs_Python37: + OSName: 'MacOS' + OSVmImage: 'macOS-10.14' + PythonVersion: '3.7' + Windows_Python27: + OSName: 'Windows' + OSVmImage: 'windows-2019' + PythonVersion: '2.7' EnvVars: AZURE_STORAGE_ACCOUNT: $(python-eh-livetest-event-hub-storage-account) AZURE_STORAGE_ACCESS_KEY: $(python-eh-livetest-event-hub-storage-access-key)