diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index a14da749ee78..4735d1a6d8e9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -18,46 +18,34 @@ def __init__(self): self._client = None self._handler = None self._name = None + self._running = False + self._closed = False def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close(exc_val) + self.close() def _check_closed(self): - if self._error: + if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - def _redirect(self, redirect): - self._redirected = redirect - self._running = False - self._close_connection() - def _open(self): - """ - Open the EventHubConsumer/EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. + """Open the EventHubConsumer/EventHubProducer using the supplied connection. 
""" # pylint: disable=protected-access if not self._running: if self._handler: self._handler.close() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not self._handler.client_ready(): time.sleep(0.05) @@ -66,7 +54,8 @@ def _open(self): self._running = True def _close_handler(self): - self._handler.close() # close the link (sharing connection) or connection (not sharing) + if self._handler: + self._handler.close() # close the link (sharing connection) or connection (not sharing) self._running = False def _close_connection(self): @@ -76,8 +65,6 @@ def _close_connection(self): def _handle_exception(self, exception): if not self._running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") - return _handle_exception(exception, self) - return _handle_exception(exception, self) def _do_retryable_operation(self, operation, timeout=100000, **kwargs): @@ -102,16 +89,11 @@ def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - def close(self, exception=None): - # type:(Exception) -> None + def close(self): + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. 
literalinclude:: ../examples/test_examples_eventhub.py @@ -122,16 +104,8 @@ def close(self, exception=None): :caption: Close down the handler. """ - self._running = False - if self._error: # type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("{} handler is closed.".format(self._name)) if self._handler: self._handler.close() # this will close link if sharing connection. Otherwise close connection + self._running = False + self._closed = True + diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 444edd15a8a1..123c021f7893 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -19,46 +19,35 @@ def __init__(self): self._client = None self._handler = None self._name = None + self._running = False + self._closed = False async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close(exc_val) + await self.close() def _check_closed(self): - if self._error: + if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - async def _redirect(self, redirect): - self._redirected = redirect - self._running = False - await self._close_connection() - async def _open(self): """ Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
""" # pylint: disable=protected-access if not self._running: if self._handler: await self._handler.close_async() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() await self._handler.open_async(connection=await self._client._conn_manager.get_connection( self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) @@ -67,7 +56,8 @@ async def _open(self): self._running = True async def _close_handler(self): - await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) + if self._handler: + await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) self._running = False async def _close_connection(self): @@ -103,16 +93,11 @@ async def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -123,18 +108,7 @@ async def close(self, exception=None): :caption: Close down the handler. 
""" - self._running = False - if self._error: #type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") if self._handler: await self._handler.close_async() + self._running = False + self._closed = True diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 578d0f26c059..7656eff50442 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -58,23 +58,19 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() - def _create_auth(self, username=None, password=None): + def _create_auth(self): """ Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate the session. - :param username: The name of the shared access policy. - :type username: str - :param password: The shared access key. 
- :type password: str """ http_proxy = self._config.http_proxy transport_type = self._config.transport_type auth_timeout = self._config.auth_timeout if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = username or self._auth_config['username'] - password = password or self._auth_config['password'] + username = self._credential.policy + password = self._credential.key if "@sas.root" in username: return authentication.SASLPlain( self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) @@ -117,14 +113,10 @@ async def _try_delay(self, retried_times, last_exception, timeout_time=None, ent raise last_exception async def _management_request(self, mgmt_msg, op_type): - alt_creds = { - "username": self._auth_config.get("iot_username"), - "password": self._auth_config.get("iot_password") - } - retried_times = 0 + last_exception = None while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth(**alt_creds) + mgmt_auth = self._create_auth() mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) try: conn = await self._conn_manager.get_connection(self._host, mgmt_auth) @@ -142,18 +134,8 @@ async def _management_request(self, mgmt_msg, op_type): retried_times += 1 finally: await mgmt_client.close_async() - - async def _iothub_redirect(self): - async with self._lock: - if self._is_iothub and not self._iothub_redirect_info: - if not self._redirect_consumer: - self._redirect_consumer = self.create_consumer(consumer_group='$default', - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - async with self._redirect_consumer: - await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access - self._redirect_consumer = None + log.info("%r returns an exception %r", self._container_id, last_exception) + raise last_exception async def get_properties(self): # type:() -> Dict[str, Any] 
@@ -168,8 +150,6 @@ async def get_properties(self): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub') output = {} @@ -209,8 +189,6 @@ async def get_partition_properties(self, partition): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name, 'partition': partition}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition') @@ -246,9 +224,6 @@ def create_consumer( :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. :type owner_level: int - :param operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. 
:type prefetch: int :type track_last_enqueued_event_properties: bool @@ -272,14 +247,12 @@ def create_consumer( """ owner_level = kwargs.get("owner_level") - operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) loop = kwargs.get("loop") - path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self._address.hostname, path, consumer_group, partition_id) + self._address.hostname, self._address.path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch, @@ -289,7 +262,6 @@ def create_consumer( def create_producer( self, *, partition_id: str = None, - operation: str = None, send_timeout: float = None, loop: asyncio.AbstractEventLoop = None ) -> EventHubProducer: @@ -300,9 +272,6 @@ def create_producer( If omitted, the events will be distributed to available partitions via round-robin. :type partition_id: str - :param operation: An optional operation to be appended to the hostname in the target URL. - The value must start with `/` character. - :type operation: str :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. 
:type send_timeout: float @@ -320,8 +289,6 @@ def create_producer( """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) - if operation: - target = target + operation send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 4ae4a3b33b72..9d576ac378df 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -5,9 +5,10 @@ import asyncio import uuid import logging -from typing import List +from typing import List, Any import time +import uamqp # type: ignore from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore @@ -76,7 +77,6 @@ def __init__( # pylint: disable=super-init-not-called super(EventHubConsumer, self).__init__() self._loop = loop or asyncio.get_event_loop() - self._running = False self._client = client self._source = source self._offset = event_position @@ -87,8 +87,6 @@ def __init__( # pylint: disable=super-init-not-called self._auto_reconnect = auto_reconnect self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 - self._redirected = None - self._error = None self._link_properties = {} partition = self._source.split('/')[-1] self._partition = partition @@ -129,23 +127,21 @@ async def __anext__(self): raise last_exception def _create_handler(self): - alt_creds = { - "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access - "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access - } - source = Source(self._source) if self._offset 
is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access - desired_capabilities = None - if self._track_last_enqueued_event_properties: - symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] - desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released + desired_capabilities = {} + elif self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))} + else: + desired_capabilities = {"desired_capabilities": None} self._handler = ReceiveClientAsync( source, - auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, @@ -155,29 +151,10 @@ def _create_handler(self): client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access - desired_capabilities=desired_capabilities, # pylint:disable=protected-access + **desired_capabilities, # pylint:disable=protected-access loop=self._loop) self._messages_iter = None - async def _redirect(self, redirect): - self._messages_iter = None - await super(EventHubConsumer, self)._redirect(redirect) - - async def _open(self): - """ - Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
- - """ - # pylint: disable=protected-access - self._redirected = self._redirected or self._client._iothub_redirect_info - - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) - self._source = self._redirected.address - await super(EventHubConsumer, self)._open() - async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) @@ -242,7 +219,7 @@ def queue_size(self): return 0 async def receive(self, *, max_batch_size=None, timeout=None): - # type: (int, float) -> List[EventData] + # type: (Any, int, float) -> List[EventData] """ Receive events asynchronously from the EventHub. @@ -275,16 +252,11 @@ async def receive(self, *, max_batch_size=None, timeout=None): return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -295,18 +267,4 @@ async def close(self, exception=None): :caption: Close down the handler. 
""" - self._running = False - if self._error: - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") - if self._handler: - await self._handler.close_async() + await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index ae1cd8084f3d..7bbc3b6153c1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -68,11 +68,6 @@ async def _handle_exception(exception, closable): # pylint:disable=too-many-bra if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): await closable._close_connection() # pylint:disable=protected-access - elif isinstance(exception, errors.LinkRedirect): - log.info("%r link redirect received. 
Redirecting...", name) - redirect = exception - if hasattr(closable, "_redirect"): - await closable._redirect(redirect) # pylint:disable=protected-access elif isinstance(exception, errors.LinkDetach): if hasattr(closable, "_close_handler"): await closable._close_handler() # pylint:disable=protected-access diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 8ef299b0e6da..7a6c28156dd2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -66,7 +66,6 @@ def __init__( # pylint: disable=super-init-not-called super(EventHubProducer, self).__init__() self._loop = loop or asyncio.get_event_loop() self._max_message_size_on_link = None - self._running = False self._client = client self._target = target self._partition = partition @@ -77,7 +76,6 @@ def __init__( # pylint: disable=super-init-not-called self._reconnect_backoff = 1 self._name = "EHProducer-{}".format(uuid.uuid4()) self._unsent_events = None - self._redirected = None self._error = None if partition: self._target += "/Partitions/" + partition @@ -90,7 +88,7 @@ def __init__( # pylint: disable=super-init-not-called def _create_handler(self): self._handler = SendClientAsync( self._target, - auth=self._client._get_auth(), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, @@ -101,18 +99,6 @@ def _create_handler(self): self._client._config.user_agent), # pylint:disable=protected-access loop=self._loop) - async def _open(self): - """ - Open the EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
- - """ - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access - self._target = self._redirected.address - await super(EventHubProducer, self)._open() - async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) @@ -250,16 +236,11 @@ async def send( else: await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -270,4 +251,4 @@ async def close(self, exception=None): :caption: Close down the handler. """ - await super(EventHubProducer, self).close(exception) + await super(EventHubProducer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index b3ae901c3b55..0b8a95cb22f3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -58,7 +58,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def _create_auth(self, username=None, password=None): + def _create_auth(self): """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. 
@@ -74,8 +74,8 @@ def _create_auth(self, username=None, password=None): # TODO: the following code can be refactored to create auth from classes directly instead of using if-else if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = username or self._auth_config['username'] - password = password or self._auth_config['password'] + username = self._credential.policy + password = self._credential.key if "@sas.root" in username: return authentication.SASLPlain( self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) @@ -119,14 +119,10 @@ def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_na raise last_exception def _management_request(self, mgmt_msg, op_type): - alt_creds = { - "username": self._auth_config.get("iot_username"), - "password": self._auth_config.get("iot_password") - } - retried_times = 0 + last_exception = None while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth(**alt_creds) + mgmt_auth = self._create_auth() mgmt_client = uamqp.AMQPClient(self._mgmt_target) try: conn = self._conn_manager.get_connection(self._host, mgmt_auth) #pylint:disable=assignment-from-none @@ -144,18 +140,9 @@ def _management_request(self, mgmt_msg, op_type): retried_times += 1 finally: mgmt_client.close() + log.info("%r returns an exception %r", self._container_id, last_exception) + raise last_exception - def _iothub_redirect(self): - with self._lock: - if self._is_iothub and not self._iothub_redirect_info: - if not self._redirect_consumer: - self._redirect_consumer = self.create_consumer(consumer_group='$default', - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - with self._redirect_consumer: - self._redirect_consumer._open_with_retry() # pylint: disable=protected-access - self._redirect_consumer = None def get_properties(self): # type:() -> Dict[str, Any] @@ -170,8 +157,6 @@ def get_properties(self): 
:rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name}) response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub') output = {} @@ -211,8 +196,6 @@ def get_partition_properties(self, partition): :rtype: dict :raises: ~azure.eventhub.ConnectError """ - if self._is_iothub and not self._iothub_redirect_info: - self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name, 'partition': partition}) response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition') @@ -244,9 +227,6 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. :type owner_level: int - :param operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. 
:type prefetch: int :type track_last_enqueued_event_properties: bool @@ -269,21 +249,19 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs """ owner_level = kwargs.get("owner_level") - operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) - path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self._address.hostname, path, consumer_group, partition_id) + self._address.hostname, self._address.path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch, track_last_enqueued_event_properties=track_last_enqueued_event_properties) return handler - def create_producer(self, partition_id=None, operation=None, send_timeout=None): - # type: (str, str, float) -> EventHubProducer + def create_producer(self, partition_id=None, send_timeout=None): + # type: (str, float) -> EventHubProducer """ Create an producer to send EventData object to an EventHub. 
@@ -310,8 +288,6 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None): """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) - if operation: - target = target + operation send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 62ea791a5894..25376b1b4334 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -6,13 +6,14 @@ import logging import sys +import platform import uuid import time -import functools from abc import abstractmethod -from typing import Dict, Union, Any, TYPE_CHECKING +from typing import Union, Any, TYPE_CHECKING -from azure.eventhub import __version__, EventPosition +from uamqp import types +from azure.eventhub import __version__ from azure.eventhub.configuration import _Configuration from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address @@ -139,53 +140,18 @@ def __init__(self, host, event_hub_path, credential, **kwargs): self._address = _Address() self._address.hostname = host self._address.path = "/" + event_hub_path if event_hub_path else "" - self._auth_config = {} # type:Dict[str,str] self._credential = credential - if isinstance(credential, EventHubSharedKeyCredential): - self._username = credential.policy - self._password = credential.key - self._auth_config['username'] = self._username - self._auth_config['password'] = self._password - self._keep_alive = kwargs.get("keep_alive", 30) self._auto_reconnect = kwargs.get("auto_reconnect", True) self._mgmt_target = "amqps://{}/{}".format(self._host, self.eh_name) self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) - self._get_auth = functools.partial(self._create_auth) self._config = 
_Configuration(**kwargs) self._debug = self._config.network_tracing - self._is_iothub = False - self._iothub_redirect_info = None - self._redirect_consumer = None log.info("%r: Created the Event Hub client", self._container_id) - @classmethod - def _from_iothub_connection_string(cls, conn_str, **kwargs): - address, policy, key, _ = _parse_conn_str(conn_str) - hub_name = address.split('.')[0] - username = "{}@sas.root.{}".format(policy, hub_name) - password = _generate_sas_token(address, policy, key) - left_slash_pos = address.find("//") - if left_slash_pos != -1: - host = address[left_slash_pos + 2:] - else: - host = address - client = cls(host, "", EventHubSharedKeyCredential(username, password), **kwargs) - client._auth_config = { # pylint: disable=protected-access - 'iot_username': policy, - 'iot_password': key, - 'username': username, - 'password': password} - client._is_iothub = True # pylint: disable=protected-access - client._redirect_consumer = client.create_consumer(consumer_group='$default', # pylint: disable=protected-access, no-member - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - return client - @abstractmethod - def _create_auth(self, username=None, password=None): + def _create_auth(self): pass def _create_properties(self, user_agent=None): # pylint: disable=no-self-use @@ -196,13 +162,15 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use :rtype: dict """ properties = {} - properties["product"] = "eventhub.python" - properties["version"] = __version__ - properties["framework"] = "Python {}.{}.{}".format(*sys.version_info[0:3]) - properties["platform"] = sys.platform - - final_user_agent = 'azsdk-python-eventhub/{} ({}; {})'.format( - __version__, properties["framework"], sys.platform) + product = "azure-eventhub" + properties[types.AMQPSymbol("product")] = product + properties[types.AMQPSymbol("version")] = __version__ + framework = "Python {}.{}.{}, 
{}".format(*sys.version_info[0:3], platform.python_implementation()) + properties[types.AMQPSymbol("framework")] = framework + platform_str = platform.platform() + properties[types.AMQPSymbol("platform")] = platform_str + + final_user_agent = '{}/{} ({}, {})'.format(product, __version__, framework, platform_str) if user_agent: final_user_agent = '{}, {}'.format(final_user_agent, user_agent) @@ -210,8 +178,7 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use raise ValueError("The user-agent string cannot be more than {} in length." "Current user_agent string is: {} with length: {}".format( MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent))) - - properties["user-agent"] = final_user_agent + properties[types.AMQPSymbol("user-agent")] = final_user_agent return properties def _add_span_request_attributes(self, span): @@ -219,22 +186,11 @@ def _add_span_request_attributes(self, span): span.add_attribute("message_bus.destination", self._address.path) span.add_attribute("peer.address", self._address.hostname) - def _process_redirect_uri(self, redirect): - redirect_uri = redirect.address.decode('utf-8') - auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") - self._address = urlparse(auth_uri) - self._host = self._address.hostname - self.eh_name = self._address.path.lstrip('/') - self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) - self._mgmt_target = redirect_uri - if self._is_iothub: - self._iothub_redirect_info = redirect - @classmethod def from_connection_string(cls, conn_str, **kwargs): - """Create an EventHubClient from an EventHub/IotHub connection string. + """Create an EventHubClient from an EventHub connection string. 
- :param conn_str: The connection string of an eventhub or IoT hub + :param conn_str: The connection string of an eventhub :type conn_str: str :param event_hub_path: The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string. @@ -279,15 +235,11 @@ def from_connection_string(cls, conn_str, **kwargs): """ event_hub_path = kwargs.pop("event_hub_path", None) - is_iot_conn_str = conn_str.lstrip().lower().startswith("hostname") - if not is_iot_conn_str: # pylint:disable=no-else-return - address, policy, key, entity = _parse_conn_str(conn_str) - entity = event_hub_path or entity - left_slash_pos = address.find("//") - if left_slash_pos != -1: - host = address[left_slash_pos + 2:] - else: - host = address - return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) + address, policy, key, entity = _parse_conn_str(conn_str) + entity = event_hub_path or entity + left_slash_pos = address.find("//") + if left_slash_pos != -1: + host = address[left_slash_pos + 2:] else: - return cls._from_iothub_connection_string(conn_str, **kwargs) + host = address + return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 6a4ce46e75b9..5ebb5a430b23 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -58,20 +58,17 @@ class EventData(object): PROP_PARTITION_KEY = b"x-opt-partition-key" PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY) PROP_TIMESTAMP = b"x-opt-enqueued-time" - PROP_DEVICE_ID = b"iothub-connection-device-id" PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number" PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset" PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc" PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc" - 
def __init__(self, body=None, to_device=None): + def __init__(self, body=None): """ Initialize EventData. :param body: The data to send in a single message. :type body: str, bytes or list - :param to_device: An IoT device to route to. - :type to_device: str """ self._annotations = {} @@ -79,8 +76,6 @@ def __init__(self, body=None, to_device=None): self._app_properties = {} self._msg_properties = MessageProperties() self._runtime_info = {} - if to_device: - self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) if body and isinstance(body, list): self.message = Message(body[0], properties=self._msg_properties) for more in body[1:]: @@ -102,8 +97,6 @@ def __str__(self): dic['offset'] = str(self.offset) if self.enqueued_time: dic['enqueued_time'] = str(self.enqueued_time) - if self.device_id: - dic['device_id'] = str(self.device_id) if self.partition_key: dic['partition_key'] = str(self.partition_key) return str(dic) @@ -154,7 +147,6 @@ def _trace_link_message(self, parent_span=None): @staticmethod def _from_message(message): - # pylint:disable=protected-access event_data = EventData(body='') event_data.message = message event_data._msg_properties = message.properties @@ -207,16 +199,6 @@ def enqueued_time(self): return datetime.datetime.utcfromtimestamp(float(timestamp)/1000) return None - @property - def device_id(self): - """ - The device ID of the event data object. This is only used for - IoT Hub implementations. 
- - :rtype: bytes - """ - return self._annotations.get(EventData.PROP_DEVICE_ID, None) - @property def partition_key(self): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 4c6f2eca58a9..3f4e11d982c9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -9,6 +9,7 @@ import time from typing import List +import uamqp # type: ignore from uamqp import types, errors, utils # type: ignore from uamqp import ReceiveClient, Source # type: ignore @@ -72,7 +73,6 @@ def __init__(self, client, source, **kwargs): track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) super(EventHubConsumer, self).__init__() - self._running = False self._client = client self._source = source self._offset = event_position @@ -84,7 +84,6 @@ def __init__(self, client, source, **kwargs): self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 self._link_properties = {} - self._redirected = None self._error = None partition = self._source.split('/')[-1] self._partition = partition @@ -125,23 +124,21 @@ def __next__(self): raise last_exception def _create_handler(self): - alt_creds = { - "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access - "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access - } - source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access - desired_capabilities = None - if self._track_last_enqueued_event_properties: - symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] - desired_capabilities = 
utils.data_factory(types.AMQPArray(symbol_array)) + if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released + desired_capabilities = {} + elif self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))} + else: + desired_capabilities = {"desired_capabilities": None} self._handler = ReceiveClient( source, - auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, @@ -151,27 +148,8 @@ def _create_handler(self): client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access - desired_capabilities=desired_capabilities) # pylint:disable=protected-access - self._messages_iter = None - - def _redirect(self, redirect): + **desired_capabilities) # pylint:disable=protected-access self._messages_iter = None - super(EventHubConsumer, self)._redirect(redirect) - - def _open(self): - """ - Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
- - """ - # pylint: disable=protected-access - self._redirected = self._redirected or self._client._iothub_redirect_info - - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) - self._source = self._redirected.address - super(EventHubConsumer, self)._open() def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) @@ -269,16 +247,11 @@ def receive(self, max_batch_size=None, timeout=None): return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - def close(self, exception=None): - # type:(Exception) -> None + def close(self): + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -289,9 +262,6 @@ def close(self, exception=None): :caption: Close down the handler. """ - if self._messages_iter: - self._messages_iter.close() - self._messages_iter = None - super(EventHubConsumer, self).close(exception) + super(EventHubConsumer, self).close() next = __next__ # for python2.7 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 129cf14a3842..755925bca743 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -187,11 +187,6 @@ def _handle_exception(exception, closable): # pylint:disable=too-many-branches, if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access - elif isinstance(exception, errors.LinkRedirect): - log.info("%r link redirect received. 
Redirecting...", name) - redirect = exception - if hasattr(closable, "_redirect"): - closable._redirect(redirect) # pylint:disable=protected-access elif isinstance(exception, errors.LinkDetach): if hasattr(closable, "_close_handler"): closable._close_handler() # pylint:disable=protected-access diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 6e562bbdf051..6364f271dd5a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -82,12 +82,10 @@ def __init__(self, client, target, **kwargs): super(EventHubProducer, self).__init__() self._max_message_size_on_link = None - self._running = False self._client = client self._target = target self._partition = partition self._timeout = send_timeout - self._redirected = None self._error = None self._keep_alive = keep_alive self._auto_reconnect = auto_reconnect @@ -106,7 +104,7 @@ def __init__(self, client, target, **kwargs): def _create_handler(self): self._handler = SendClient( self._target, - auth=self._client._get_auth(), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, @@ -115,18 +113,6 @@ def _create_handler(self): link_properties=self._link_properties, properties=self._client._create_properties(self._client._config.user_agent)) # pylint: disable=protected-access - def _open(self): - """ - Open the EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. 
- - """ - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access - self._target = self._redirected.address - super(EventHubProducer, self)._open() - def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) @@ -261,16 +247,11 @@ def send(self, event_data, partition_key=None, timeout=None): else: self._send_event_data_with_retry(timeout=timeout) - def close(self, exception=None): # pylint:disable=useless-super-delegation - # type:(Exception) -> None + def close(self): # pylint:disable=useless-super-delegation + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -281,4 +262,4 @@ def close(self, exception=None): # pylint:disable=useless-super-delegation :caption: Close down the handler. 
""" - super(EventHubProducer, self).close(exception) + super(EventHubProducer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index ed0212e85562..78174fc2699b 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -152,22 +152,6 @@ def invalid_policy(live_eventhub_config): live_eventhub_config['event_hub']) -@pytest.fixture() -def iot_connection_str(): - try: - return os.environ['IOTHUB_CONNECTION_STR'] - except KeyError: - pytest.skip("No IotHub connection string found.") - - -@pytest.fixture() -def device_id(): - try: - return os.environ['IOTHUB_DEVICE'] - except KeyError: - pytest.skip("No Iothub device ID found.") - - @pytest.fixture() def aad_credential(): try: diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py b/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py deleted file mode 100644 index ecc935669d13..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. 
-""" -import os - -from azure.eventhub import EventHubClient, EventPosition - -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), - operation='/messages/events') - -with consumer: - received = consumer.receive(timeout=5) - print(received) diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py b/sdk/eventhub/azure-eventhubs/examples/iothub_send.py deleted file mode 100644 index c2f8f3379259..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. 
-""" -import os -from azure.eventhub import EventData, EventHubClient - -iot_device_id = os.environ['IOTHUB_DEVICE'] -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -producer = client.create_producer(operation='/messages/devicebound') -with producer: - producer.send(EventData(b"A single event", to_device=iot_device_id)) diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index 52145f9222f0..8ff334ef971f 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -27,16 +27,6 @@ def create_eventhub_client(live_eventhub_config): return client -def create_eventhub_client_from_iothub_connection_string(live_eventhub_config): - # [START create_eventhub_client_iot_connstr] - import os - from azure.eventhub import EventHubClient - - iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - client = EventHubClient.from_connection_string(iot_connection_str) - # [END create_eventhub_client_iot_connstr] - - def test_example_eventhub_sync_send_and_receive(live_eventhub_config): # [START create_eventhub_client_connstr] import os diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py new file mode 100644 index 000000000000..59b8fe76fec8 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -0,0 +1,266 @@ +import sys +import os +import logging +import threading +import time +import asyncio +from logging.handlers import RotatingFileHandler +from argparse import ArgumentParser +from azure.eventhub import EventHubClient, EventPosition, EventData, \ + EventHubConsumer, EventHubProducer, EventHubSharedKeyCredential, EventDataBatch +from azure.eventhub.aio import EventHubClient as 
import sys
import os
import logging
import threading
import time
import asyncio
from logging.handlers import RotatingFileHandler
from argparse import ArgumentParser

from azure.eventhub import EventHubClient, EventPosition, EventData, \
    EventHubConsumer, EventHubProducer, EventHubSharedKeyCredential, EventDataBatch
from azure.eventhub.aio import EventHubClient as EventHubClientAsync
from azure.identity import ClientSecretCredential


def stress_receive_sync(receiver, args, logger):
    """Receive a single batch synchronously and return the event count."""
    batch = receiver.receive(timeout=5)
    return len(batch)


async def stress_receive_async(receiver, args, logger):
    """Receive a single batch asynchronously and return the event count."""
    batch = await receiver.receive(timeout=5)
    return len(batch)


def stress_receive_iterator_sync(receiver, args, logger):
    """Iterate the receiver until ``args.duration`` elapses; return total events seen.

    :param receiver: an opened EventHubConsumer.
    :param args: parsed CLI namespace (uses ``duration`` and ``output_interval``).
    :param logger: destination for periodic progress lines.
    """
    deadline = time.time() + args.duration
    total_count = 0
    logging_count = 0
    try:
        for _ in receiver:
            total_count += 1
            logging_count += 1
            if logging_count >= args.output_interval:
                logger.info("Partition:%r, received:%r", receiver._partition, total_count)
                logging_count -= args.output_interval
            if time.time() > deadline:
                break
    except Exception:
        # BUGFIX: the original used ``finally: return total_count`` which silently
        # swallowed *every* exception, including KeyboardInterrupt, so Ctrl-C never
        # reached run_test_method()'s handler.  Log failures and fall through;
        # KeyboardInterrupt now propagates to the caller as intended.
        logger.exception("Partition:%r receive iterator failed", receiver._partition)
    return total_count


async def stress_receive_iterator_async(receiver, args, logger):
    """Async counterpart of :func:`stress_receive_iterator_sync`."""
    deadline = time.time() + args.duration
    total_count = 0
    logging_count = 0
    try:
        async for _ in receiver:
            total_count += 1
            logging_count += 1
            if logging_count >= args.output_interval:
                logger.info("Partition:%r, received:%r", receiver._partition, total_count)
                logging_count -= args.output_interval
            if time.time() > deadline:
                break
    except Exception:
        # BUGFIX: see stress_receive_iterator_sync -- no bare return-in-finally.
        logger.exception("Partition:%r receive iterator failed", receiver._partition)
    return total_count


def stress_send_sync(producer, args, logger):
    """Fill one EventDataBatch to capacity and send it; return events sent.

    ``try_add`` raises ValueError once the batch would exceed the link's
    maximum message size -- that is the normal "batch is full" signal.
    """
    batch = producer.create_batch()
    try:
        while True:
            event_data = EventData(body=b"D" * args.payload)
            batch.try_add(event_data)
    except ValueError:
        producer.send(batch)
    return len(batch)


async def stress_send_async(producer, args, logger):
    """Async counterpart of :func:`stress_send_sync`."""
    batch = await producer.create_batch()
    try:
        while True:
            event_data = EventData(body=b"D" * args.payload)
            batch.try_add(event_data)
    except ValueError:
        await producer.send(batch)
    return len(batch)


def get_logger(filename, method_name, level=logging.INFO, print_console=False):
    """Configure and return a logger for one stress method.

    Also attaches the same handlers to the ``azure.eventhub`` and ``uamqp``
    loggers so SDK/transport output lands in the same file.  Handlers are only
    added when a logger has none yet, to avoid duplicates on repeated calls.

    :param filename: rotating log file path, or a falsy value to skip file output.
    :param method_name: logger name (the stress method being run).
    :param level: logging level applied to all three loggers.
    :param print_console: when True, also mirror output to stdout.
    """
    stress_logger = logging.getLogger(method_name)
    stress_logger.setLevel(level)
    azure_logger = logging.getLogger("azure.eventhub")
    azure_logger.setLevel(level)
    uamqp_logger = logging.getLogger("uamqp")
    uamqp_logger.setLevel(level)

    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    if print_console:
        console_handler = logging.StreamHandler(stream=sys.stdout)
        console_handler.setFormatter(formatter)
        if not azure_logger.handlers:
            azure_logger.addHandler(console_handler)
        if not uamqp_logger.handlers:
            uamqp_logger.addHandler(console_handler)
        if not stress_logger.handlers:
            stress_logger.addHandler(console_handler)

    if filename:
        file_handler = RotatingFileHandler(filename, maxBytes=20 * 1024 * 1024, backupCount=3)
        file_handler.setFormatter(formatter)
        azure_logger.addHandler(file_handler)
        uamqp_logger.addHandler(file_handler)
        stress_logger.addHandler(file_handler)

    return stress_logger


def _str2bool(value):
    """Parse a CLI boolean flag value leniently ('true'/'1'/'yes' -> True)."""
    return str(value).lower() in ("1", "true", "yes", "y")


class StressTestRunner(object):
    """Drives one stress method (send/receive, sync/async) across partitions."""

    def __init__(self, argument_parser):
        self.argument_parser = argument_parser
        self.argument_parser.add_argument("-m", "--method", required=True)
        self.argument_parser.add_argument("--output_interval", type=float, default=1000)
        self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
        self.argument_parser.add_argument("--consumer", help="Consumer group name", default="$default")
        self.argument_parser.add_argument("--offset", help="Starting offset", default="-1")
        self.argument_parser.add_argument("-p", "--partitions", help="Comma separated partition IDs", default="0")
        self.argument_parser.add_argument("--conn-str", help="EventHub connection string",
                                          default=os.environ.get('EVENT_HUB_PERF_CONN_STR'))
        self.argument_parser.add_argument("--eventhub", help="Name of EventHub")
        self.argument_parser.add_argument("--address", help="Address URI to the EventHub entity")
        self.argument_parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
        self.argument_parser.add_argument("--sas-key", help="Shared access key")
        self.argument_parser.add_argument("--aad_client_id", help="AAD client id")
        self.argument_parser.add_argument("--aad_secret", help="AAD secret")
        self.argument_parser.add_argument("--aad_tenant_id", help="AAD tenant id")
        self.argument_parser.add_argument("--payload", help="payload size", type=int, default=1024)
        # BUGFIX: was ``type=bool`` -- argparse would turn *any* non-empty string
        # (including "False") into True.  _str2bool parses the value properly.
        self.argument_parser.add_argument("--print_console", help="print to console",
                                          type=_str2bool, default=False)
        # BUGFIX: was ``parser.parse_known_args()`` -- a reference to the
        # module-global created only under ``__main__``; use the parser we own.
        self.args, _ = self.argument_parser.parse_known_args()

        self.running = False

    def create_client(self, client_class):
        """Build a sync or async EventHubClient from whichever credential set was supplied.

        Precedence: connection string > address + SAS > AAD client secret.
        :raises ValueError: if no usable credential arguments were provided.
        """
        if self.args.conn_str:
            client = client_class.from_connection_string(
                self.args.conn_str,
                event_hub_path=self.args.eventhub, network_tracing=False)
        elif self.args.address:
            client = client_class(host=self.args.address,
                                  event_hub_path=self.args.eventhub,
                                  credential=EventHubSharedKeyCredential(self.args.sas_policy, self.args.sas_key),
                                  auth_timeout=240,
                                  network_tracing=False)
        elif self.args.aad_client_id:
            # BUGFIX: was ``self.args.tenant_id`` -- the flag is registered as
            # --aad_tenant_id, so that attribute does not exist on the namespace.
            # NOTE(review): positional order (client_id, secret, tenant_id) matches
            # the azure-identity preview in use here -- confirm against the pinned
            # azure-identity version before upgrading.
            client = client_class(host=self.args.address,
                                  event_hub_path=self.args.eventhub,
                                  credential=ClientSecretCredential(
                                      self.args.aad_client_id, self.args.aad_secret, self.args.aad_tenant_id
                                  ),
                                  network_tracing=False)
        else:
            raise ValueError("Argument error. Must have one of connection string, sas and aad credentials")

        return client

    def run(self):
        """Dispatch to the sync or async driver based on the method name."""
        method_name = self.args.method
        if "async" in method_name:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(self.run_async())
        else:
            self.run_sync()

    def run_sync(self):
        """Run the chosen sync stress method on every partition, one thread each."""
        method_name = self.args.method
        logger = get_logger("{}.log".format(method_name), method_name,
                            level=logging.INFO, print_console=self.args.print_console)
        test_method = globals()[method_name]
        client = self.create_client(EventHubClient)
        self.running = True
        if self.args.partitions.lower() != "all":
            partitions = self.args.partitions.split(",")
        else:
            partitions = client.get_partition_ids()
        threads = []
        for pid in partitions:
            if "receive" in method_name:
                worker = client.create_consumer(consumer_group=self.args.consumer,
                                                partition_id=pid,
                                                event_position=EventPosition(self.args.offset),
                                                prefetch=300)
            else:  # "send" in method_name
                worker = client.create_producer(partition_id=pid)
            thread = threading.Thread(target=self.run_test_method, args=(test_method, worker, logger))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

    def stop(self):
        """Signal every worker loop to exit at its next iteration."""
        self.running = False

    def run_test_method(self, test_method, worker, logger):
        """Repeatedly invoke *test_method* on *worker* until deadline or stop()."""
        deadline = time.time() + self.args.duration
        with worker:
            total_processed = 0
            iter_processed = 0
            while self.running and time.time() < deadline:
                try:
                    processed = test_method(worker, self.args, logger)
                    total_processed += processed
                    iter_processed += processed
                    if iter_processed >= self.args.output_interval:
                        logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed)
                        iter_processed -= self.args.output_interval
                except KeyboardInterrupt:
                    logger.info("Partition:%r, keyboard interrupted", worker._partition)
                    self.stop()
                except Exception:
                    # logger.exception records the active traceback; stop all workers.
                    logger.exception("Partition:%r, %r failed:", worker._partition, type(worker))
                    self.stop()
            logger.info("Partition:%r, %r has finished testing", worker._partition, test_method)

    async def run_async(self):
        """Run the chosen async stress method on every partition, one task each."""
        method_name = self.args.method
        logger = get_logger("{}.log".format(method_name), method_name,
                            level=logging.INFO, print_console=self.args.print_console)
        test_method = globals()[method_name]
        client = self.create_client(EventHubClientAsync)
        self.running = True
        if self.args.partitions.lower() != "all":
            partitions = self.args.partitions.split(",")
        else:
            partitions = await client.get_partition_ids()
        tasks = []
        for pid in partitions:
            if "receive" in method_name:
                worker = client.create_consumer(consumer_group=self.args.consumer,
                                                partition_id=pid,
                                                event_position=EventPosition(self.args.offset),
                                                prefetch=300)
            else:  # "send" in method_name
                worker = client.create_producer(partition_id=pid)
            task = self.run_test_method_async(test_method, worker, logger)
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def run_test_method_async(self, test_method, worker, logger):
        """Async counterpart of :meth:`run_test_method`."""
        deadline = time.time() + self.args.duration
        async with worker:
            total_processed = 0
            iter_processed = 0
            while self.running and time.time() < deadline:
                try:
                    processed = await test_method(worker, self.args, logger)
                    total_processed += processed
                    iter_processed += processed
                    if iter_processed >= self.args.output_interval:
                        logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed)
                        iter_processed -= self.args.output_interval
                except KeyboardInterrupt:
                    logger.info("Partition:%r, keyboard interrupted", worker._partition)
                    self.stop()
                except Exception:
                    logger.exception("Partition:%r, %r failed:", worker._partition, type(worker))
                    self.stop()
            logger.info("Partition:%r, %r has finished testing", worker._partition, test_method)


if __name__ == '__main__':
    parser = ArgumentParser()
    runner = StressTestRunner(parser)
    runner.run()
a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py deleted file mode 100644 index 4ac63eef5d7f..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py +++ /dev/null @@ -1,85 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import asyncio -import pytest - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition - - -async def pump(receiver, sleep=None): - messages = 0 - if sleep: - await asyncio.sleep(sleep) - async with receiver: - batch = await receiver.receive(timeout=10) - messages += len(batch) - return messages - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_multiple_async(iot_connection_str): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - receivers = [] - for p in partitions: - receivers.append(client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"), prefetch=10, operation='/messages/events')) - outputs = await asyncio.gather(*[pump(r) for r in receivers]) - - assert isinstance(outputs[0], int) and outputs[0] <= 10 - assert isinstance(outputs[1], int) and outputs[1] <= 10 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = await client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def 
test_iothub_get_partition_ids_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_partition_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = await client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_after_mgmt_ops_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await receiver.receive(timeout=10) - assert len(received) == 0 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_mgmt_ops_after_receive_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py deleted file mode 100644 index 1e3cae9eefa7..000000000000 --- 
a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py +++ /dev/null @@ -1,156 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import logging -import asyncio -import sys -import os -import argparse -import time -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub.aio import EventHubClient -from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager -from azure.eventhub import EventData - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub.eventprocessor") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("eph_test_async.log", logging.INFO) - - -class MyEventProcessor(PartitionProcessor): - async def close(self, reason, partition_context): - logger.info("PartitionProcessor closed (reason {}, id {})".format( - reason, - partition_context.partition_id - )) - - async def process_events(self, events, partition_context): - if events: - event = events[-1] - 
print("Processing id {}, offset {}, sq_number {})".format( - partition_context.partition_id, - event.offset, - event.sequence_number)) - await partition_context.update_checkpoint(event.offset, event.sequence_number) - - async def process_error(self, error, partition_context): - logger.info("Event Processor Error for partition {}, {!r}".format(partition_context.partition_id, error)) - - -async def wait_and_close(host, duration): - """ - Run EventProcessorHost for 30 seconds then shutdown. - """ - await asyncio.sleep(duration) - await host.stop() - - -async def pump(pid, sender, duration): - deadline = time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * 512) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_eph(live_eventhub): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--container", help="Lease container name", default="nocontextleases") - parser.add_argument("--eventhub", help="Name of EventHub", default=live_eventhub['event_hub']) - parser.add_argument("--namespace", help="Namespace of EventHub", default=live_eventhub['namespace']) - parser.add_argument("--suffix", help="Namespace of EventHub", default="servicebus.windows.net") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with", default=live_eventhub['key_name']) - parser.add_argument("--sas-key", help="Shared access key", default=live_eventhub['access_key']) - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() 
- if not args.namespace or not args.eventhub: - try: - import pytest - pytest.skip("Must specify '--namespace' and '--eventhub'") - except ImportError: - raise ValueError("Must specify '--namespace' and '--eventhub'") - - # Queue up some events in the Eventhub - conn_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( - live_eventhub['hostname'], - live_eventhub['key_name'], - live_eventhub['access_key'], - live_eventhub['event_hub']) - client = EventHubClient.from_connection_string(conn_str) - pumps = [] - for pid in ["0", "1"]: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, 15)) - results = await asyncio.gather(*pumps, return_exceptions=True) - - assert not any(results) - - # Event loop and host - host = EventProcessor( - client, - live_eventhub['consumer_group'], - MyEventProcessor, - SamplePartitionManager() - ) - - tasks = asyncio.gather( - host.start(), - wait_and_close(host, args.duration), return_exceptions=True) - results = await tasks - assert not any(results) - - -if __name__ == '__main__': - config = {} - config['hostname'] = os.environ['EVENT_HUB_HOSTNAME'] - config['event_hub'] = os.environ['EVENT_HUB_NAME'] - config['key_name'] = os.environ['EVENT_HUB_SAS_POLICY'] - config['access_key'] = os.environ['EVENT_HUB_SAS_KEY'] - config['namespace'] = os.environ['EVENT_HUB_NAMESPACE'] - config['consumer_group'] = "$Default" - config['partition'] = "0" - loop = asyncio.get_event_loop() - loop.run_until_complete(test_long_running_eph(config)) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py deleted file mode 100644 index 50ababacf738..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# 
-------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import asyncio -import argparse -import time -import os -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("recv_test_async.log", logging.INFO) - - -async def pump(_pid, receiver, _args, _dl): - total = 0 - iteration = 0 - deadline = time.time() + _dl - - try: - async with receiver: - while time.time() < deadline: - batch = await receiver.receive(timeout=3) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - _pid, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last 
offset={}".format( - _pid, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("Partition {} receiver failed: {}".format(_pid, e)) - raise - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_receive_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, auth_timeout=240, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if not args.partitions: - partitions = await client.get_partition_ids() - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - receiver = client.create_consumer(consumer_group="$default", - 
partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300, - loop=loop) - pumps.append(pump(pid, receiver, args, args.duration)) - await asyncio.gather(*pumps) - - -if __name__ == '__main__': - asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py deleted file mode 100644 index 00279d168d70..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python - -""" -send test -""" - -import logging -import argparse -import time -import os -import asyncio -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventData, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test_async.log", logging.INFO) - - -async def get_partitions(args): - eh_data = await args.get_properties() - return eh_data["partition_ids"] - - -async def pump(pid, sender, args, duration): - deadline = 
time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_partition_send_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=1024) - parser.add_argument("--partitions", help="Comma separated partition IDs") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - parser.add_argument("--logger-name", help="Unique log file ID") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - if not args.partitions: - partitions = await 
client.get_partition_ids() - else: - pid_range = args.partitions.split("-") - if len(pid_range) > 1: - partitions = [str(i) for i in range(int(pid_range[0]), int(pid_range[1]) + 1)] - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, args, args.duration)) - results = await asyncio.gather(*pumps, return_exceptions=True) - assert not results - except Exception as e: - logger.error("EventHubProducer failed: {}".format(e)) - - -if __name__ == '__main__': - asyncio.run(test_long_running_partition_send_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index a9744f259c8c..03987fbf3b77 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -319,6 +319,7 @@ async def test_receive_over_websocket_async(connstr_senders): @pytest.mark.asyncio @pytest.mark.liveTest async def test_receive_run_time_metric_async(connstr_senders): + pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py deleted file mode 100644 index 595c822b9cb7..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py +++ /dev/null @@ -1,64 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. 
-#-------------------------------------------------------------------------- - -import pytest - -from azure.eventhub import EventPosition, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - try: - received = receiver.receive(timeout=10) - assert len(received) == 0 - finally: - receiver.close() - - -@pytest.mark.liveTest -def test_iothub_get_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_ids_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -def test_iothub_receive_after_mgmt_ops_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - 
-@pytest.mark.liveTest -def test_iothub_mgmt_ops_after_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py deleted file mode 100644 index 9660a79947bd..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py +++ /dev/null @@ -1,24 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. 
-#-------------------------------------------------------------------------- - -import os -import pytest -import time -import uuid - -from uamqp.message import MessageProperties - -from azure.eventhub import EventData, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_send_single_event(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - sender = client.create_producer(operation='/messages/devicebound') - try: - sender.send(EventData(b"A single event", to_device=device_id)) - finally: - sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py deleted file mode 100644 index 5a6e42a827e3..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. 
-""" - -import logging -import argparse -import time -import os -import sys -import threading -import pytest - -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition -from azure.eventhub import EventHubClient -from azure.eventhub import EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - -logger = get_logger("recv_test.log", logging.INFO) - - -def pump(receiver, duration): - total = 0 - iteration = 0 - deadline = time.time() + duration - with receiver: - try: - while time.time() < deadline: - batch = receiver.receive(timeout=5) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - receiver._partition, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last offset={}".format( - receiver._partition, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("EventHubConsumer failed: {}".format(e)) - raise - - -@pytest.mark.liveTest -def test_long_running_receive(connection_str): - parser = 
argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if args.partitions: - partitions = args.partitions.split(",") - else: - partitions = client.get_partition_ids() - - threads = [] - for pid in partitions: - consumer = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300) - thread = threading.Thread(target=pump, args=(consumer, args.duration)) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - - -if __name__ == '__main__': - test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py 
b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py deleted file mode 100644 index 93e7e85b4287..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -send test -""" - -import argparse -import time -import os -import sys -import threading -import logging -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventHubClient, EventDataBatch, EventData, EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test.log", logging.INFO) - - -def send(sender, args): - # sender = client.create_producer() - deadline = time.time() + args.duration - total = 0 - try: - with sender: - batch = sender.create_batch() - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - try: - 
batch.try_add(data) - total += 1 - except ValueError: - sender.send(batch, timeout=0) - print("Sent total {} of partition {}".format(total, sender._partition)) - batch = sender.create_batch() - except Exception as err: - print("Partition {} send failed {}".format(sender._partition, err)) - raise - print("Sent total {} of partition {}".format(total, sender._partition)) - - -@pytest.mark.liveTest -def test_long_running_send(connection_str): - if sys.platform.startswith('darwin'): - import pytest - pytest.skip("Skipping on OSX") - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=512) - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - partition_ids = client.get_partition_ids() - threads = [] - for pid in partition_ids: - sender = client.create_producer(partition_id=pid) - thread = threading.Thread(target=send, args=(sender, args)) - thread.start() - threads.append(thread) - thread.join() - except 
KeyboardInterrupt: - pass - - -if __name__ == '__main__': - test_long_running_send(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index c9da841f71b2..eed03ae62bfe 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -276,6 +276,7 @@ def test_receive_over_websocket_sync(connstr_senders): @pytest.mark.liveTest def test_receive_run_time_metric(connstr_senders): + pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False)