From e3a1c907326b595eff36f0379e78b2b2ac092f7a Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 23 Sep 2019 18:26:06 -0700 Subject: [PATCH 01/18] Remove iothub/link-redirect related code --- .../eventhub/_consumer_producer_mixin.py | 51 +++-------- .../aio/_consumer_producer_mixin_async.py | 47 +++------- .../azure/eventhub/aio/client_async.py | 33 +------ .../azure/eventhub/aio/consumer_async.py | 63 +++----------- .../azure/eventhub/aio/error_async.py | 5 -- .../azure/eventhub/aio/producer_async.py | 16 +--- .../azure-eventhubs/azure/eventhub/client.py | 28 +----- .../azure/eventhub/client_abstract.py | 70 +++------------ .../azure-eventhubs/azure/eventhub/common.py | 15 +--- .../azure/eventhub/consumer.py | 40 ++------- .../azure-eventhubs/azure/eventhub/error.py | 5 -- .../azure/eventhub/producer.py | 28 ++---- sdk/eventhub/azure-eventhubs/conftest.py | 16 ---- .../azure-eventhubs/examples/iothub_recv.py | 23 ----- .../azure-eventhubs/examples/iothub_send.py | 20 ----- .../examples/test_examples_eventhub.py | 10 --- .../asynctests/test_iothub_receive_async.py | 85 ------------------- .../tests/test_iothub_receive.py | 64 -------------- .../azure-eventhubs/tests/test_iothub_send.py | 24 ------ 19 files changed, 62 insertions(+), 581 deletions(-) delete mode 100644 sdk/eventhub/azure-eventhubs/examples/iothub_recv.py delete mode 100644 sdk/eventhub/azure-eventhubs/examples/iothub_send.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index a14da749ee78..e0d3202f6feb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -18,46 +18,34 @@ def __init__(self): self._client = None self._handler = None self._name = None + self._running = False + self._closed = False def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close(exc_val) + self.close() def _check_closed(self): - if self._error: + if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - def _redirect(self, redirect): - self._redirected = redirect - self._running = False - self._close_connection() - def _open(self): - """ - Open the EventHubConsumer/EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. + """Open the EventHubConsumer/EventHubProducer using the supplied connection. """ # pylint: disable=protected-access if not self._running: if self._handler: self._handler.close() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not self._handler.client_ready(): time.sleep(0.05) @@ -76,8 +64,6 @@ def _close_connection(self): def _handle_exception(self, exception): if not self._running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") - return _handle_exception(exception, self) - return _handle_exception(exception, self) def _do_retryable_operation(self, operation, timeout=100000, **kwargs): @@ -102,16 +88,11 @@ def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - def close(self, exception=None): - # type:(Exception) -> None + def close(self): + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -122,16 +103,8 @@ def close(self, exception=None): :caption: Close down the handler. """ - self._running = False - if self._error: # type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("{} handler is closed.".format(self._name)) if self._handler: self._handler.close() # this will close link if sharing connection. Otherwise close connection + self._running = False + self._closed = True + diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 444edd15a8a1..628a8f4755d3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -19,46 +19,35 @@ def __init__(self): self._client = None self._handler = None self._name = None + self._running = False + self._closed = False async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close(exc_val) + await self.close() def _check_closed(self): - if self._error: + if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) def _create_handler(self): pass - async def _redirect(self, redirect): - self._redirected = redirect - self._running = False - await self._close_connection() - async def _open(self): """ Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. """ # pylint: disable=protected-access if not self._running: if self._handler: await self._handler.close_async() - if self._redirected: - alt_creds = { - "username": self._client._auth_config.get("iot_username"), - "password": self._client._auth_config.get("iot_password")} - else: - alt_creds = {} self._create_handler() await self._handler.open_async(connection=await self._client._conn_manager.get_connection( self._client._address.hostname, - self._client._get_auth(**alt_creds) + self._client._create_auth() )) while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) @@ -103,16 +92,11 @@ async def _do_retryable_operation(self, operation, timeout=100000, **kwargs): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -123,18 +107,7 @@ async def close(self, exception=None): :caption: Close down the handler. """ - self._running = False - if self._error: #type: ignore - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") if self._handler: await self._handler.close_async() + self._running = False + self._closed = True diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 88b693d157ec..2499cb4efa40 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -58,23 +58,19 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() - def _create_auth(self, username=None, password=None): + def _create_auth(self): """ Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate the session. - :param username: The name of the shared access policy. - :type username: str - :param password: The shared access key. - :type password: str """ http_proxy = self._config.http_proxy transport_type = self._config.transport_type auth_timeout = self._config.auth_timeout if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = username or self._auth_config['username'] - password = password or self._auth_config['password'] + username = self._credential.policy + password = self._credential.key if "@sas.root" in username: return authentication.SASLPlain( self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) @@ -117,14 +113,9 @@ async def _try_delay(self, retried_times, last_exception, timeout_time=None, ent raise last_exception async def _management_request(self, mgmt_msg, op_type): - alt_creds = { - "username": self._auth_config.get("iot_username"), - "password": self._auth_config.get("iot_password") - } - retried_times = 0 while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth(**alt_creds) + mgmt_auth = self._create_auth() mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) try: conn = await self._conn_manager.get_connection(self._host, mgmt_auth) @@ -143,18 +134,6 @@ async def _management_request(self, mgmt_msg, op_type): finally: await mgmt_client.close_async() - async def _iothub_redirect(self): - async with self._lock: - if self._is_iothub and not self._iothub_redirect_info: - if not self._redirect_consumer: - self._redirect_consumer = self.create_consumer(consumer_group='$default', - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - async with self._redirect_consumer: - await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access - self._redirect_consumer = None - async def get_properties(self): # type:() -> Dict[str, Any] """ @@ -168,8 +147,6 @@ async def get_properties(self): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub') output = {} @@ -209,8 +186,6 @@ async def get_partition_properties(self, partition): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name, 'partition': partition}) response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition') diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index efad6a3cb7db..05533bbca5a8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -5,7 +5,7 @@ import asyncio import uuid import logging -from typing import List +from typing import List, Any import time from uamqp import errors, types # type: ignore @@ -66,7 +66,6 @@ def __init__( # pylint: disable=super-init-not-called super(EventHubConsumer, self).__init__() self._loop = loop or asyncio.get_event_loop() - self._running = False self._client = client self._source = source self._offset = event_position @@ -77,8 +76,6 @@ def __init__( # pylint: disable=super-init-not-called self._auto_reconnect = auto_reconnect self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 - self._redirected = None - self._error = None self._link_properties = {} partition = self._source.split('/')[-1] self._partition = partition @@ -114,17 +111,12 @@ async def __anext__(self): raise last_exception def _create_handler(self): - alt_creds = { - "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access - "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access - } - source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access self._handler = ReceiveClientAsync( source, - auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, @@ -137,25 +129,6 @@ def _create_handler(self): loop=self._loop) self._messages_iter = None - async def _redirect(self, redirect): - self._messages_iter = None - await super(EventHubConsumer, self)._redirect(redirect) - - async def _open(self): - """ - Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. - - """ - # pylint: disable=protected-access - self._redirected = self._redirected or self._client._iothub_redirect_info - - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) - self._source = self._redirected.address - await super(EventHubConsumer, self)._open() - async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) @@ -199,7 +172,7 @@ def queue_size(self): return 0 async def receive(self, *, max_batch_size=None, timeout=None): - # type: (int, float) -> List[EventData] + # type: (Any, int, float) -> List[EventData] """ Receive events asynchronously from the EventHub. @@ -232,16 +205,11 @@ async def receive(self, *, max_batch_size=None, timeout=None): return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -252,18 +220,7 @@ async def close(self, exception=None): :caption: Close down the handler. """ - self._running = False - if self._error: - return - if isinstance(exception, errors.LinkRedirect): - self._redirected = exception - elif isinstance(exception, EventHubError): - self._error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self._error = ConnectError(str(exception), exception) - elif exception: - self._error = EventHubError(str(exception)) - else: - self._error = EventHubError("This receive handler is now closed.") - if self._handler: - await self._handler.close_async() + if self._messages_iter: + await self._messages_iter.close() + self._messages_iter = None + await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index ae1cd8084f3d..7bbc3b6153c1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -68,11 +68,6 @@ async def _handle_exception(exception, closable): # pylint:disable=too-many-bra if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): await closable._close_connection() # pylint:disable=protected-access - elif isinstance(exception, errors.LinkRedirect): - log.info("%r link redirect received. Redirecting...", name) - redirect = exception - if hasattr(closable, "_redirect"): - await closable._redirect(redirect) # pylint:disable=protected-access elif isinstance(exception, errors.LinkDetach): if hasattr(closable, "_close_handler"): await closable._close_handler() # pylint:disable=protected-access diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index ec4e39c87116..c1dfed7e1e7b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -63,7 +63,6 @@ def __init__( # pylint: disable=super-init-not-called super(EventHubProducer, self).__init__() self._loop = loop or asyncio.get_event_loop() self._max_message_size_on_link = None - self._running = False self._client = client self._target = target self._partition = partition @@ -74,7 +73,6 @@ def __init__( # pylint: disable=super-init-not-called self._reconnect_backoff = 1 self._name = "EHProducer-{}".format(uuid.uuid4()) self._unsent_events = None - self._redirected = None self._error = None if partition: self._target += "/Partitions/" + partition @@ -87,7 +85,7 @@ def __init__( # pylint: disable=super-init-not-called def _create_handler(self): self._handler = SendClientAsync( self._target, - auth=self._client._get_auth(), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, @@ -98,18 +96,6 @@ def _create_handler(self): self._client._config.user_agent), # pylint:disable=protected-access loop=self._loop) - async def _open(self): - """ - Open the EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. - - """ - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access - self._target = self._redirected.address - await super(EventHubProducer, self)._open() - async def _open_with_retry(self): return await self._do_retryable_operation(self._open, operation_need_param=False) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 06d264b5b9ac..054dc0f2e712 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -58,7 +58,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def _create_auth(self, username=None, password=None): + def _create_auth(self): """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. @@ -74,8 +74,8 @@ def _create_auth(self, username=None, password=None): # TODO: the following code can be refactored to create auth from classes directly instead of using if-else if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = username or self._auth_config['username'] - password = password or self._auth_config['password'] + username = self._credential.policy + password = self._credential.key if "@sas.root" in username: return authentication.SASLPlain( self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) @@ -119,14 +119,9 @@ def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_na raise last_exception def _management_request(self, mgmt_msg, op_type): - alt_creds = { - "username": self._auth_config.get("iot_username"), - "password": self._auth_config.get("iot_password") - } - retried_times = 0 while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth(**alt_creds) + mgmt_auth = self._create_auth() mgmt_client = uamqp.AMQPClient(self._mgmt_target) try: conn = self._conn_manager.get_connection(self._host, mgmt_auth) #pylint:disable=assignment-from-none @@ -145,17 +140,6 @@ def _management_request(self, mgmt_msg, op_type): finally: mgmt_client.close() - def _iothub_redirect(self): - with self._lock: - if self._is_iothub and not self._iothub_redirect_info: - if not self._redirect_consumer: - self._redirect_consumer = self.create_consumer(consumer_group='$default', - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - with self._redirect_consumer: - self._redirect_consumer._open_with_retry() # pylint: disable=protected-access - self._redirect_consumer = None def get_properties(self): # type:() -> Dict[str, Any] @@ -170,8 +154,6 @@ def get_properties(self): :rtype: dict :raises: ~azure.eventhub.EventHubError """ - if self._is_iothub and not self._iothub_redirect_info: - self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name}) response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub') output = {} @@ -211,8 +193,6 @@ def get_partition_properties(self, partition): :rtype: dict :raises: ~azure.eventhub.ConnectError """ - if self._is_iothub and not self._iothub_redirect_info: - self._iothub_redirect() mgmt_msg = Message(application_properties={'name': self.eh_name, 'partition': partition}) response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition') diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index c6879730266c..504de024f80f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -139,53 +139,18 @@ def __init__(self, host, event_hub_path, credential, **kwargs): self._address = _Address() self._address.hostname = host self._address.path = "/" + event_hub_path if event_hub_path else "" - self._auth_config = {} # type:Dict[str,str] self._credential = credential - if isinstance(credential, EventHubSharedKeyCredential): - self._username = credential.policy - self._password = credential.key - self._auth_config['username'] = self._username - self._auth_config['password'] = self._password - self._keep_alive = kwargs.get("keep_alive", 30) self._auto_reconnect = kwargs.get("auto_reconnect", True) self._mgmt_target = "amqps://{}/{}".format(self._host, self.eh_name) self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) - self._get_auth = functools.partial(self._create_auth) self._config = _Configuration(**kwargs) self._debug = self._config.network_tracing - self._is_iothub = False - self._iothub_redirect_info = None - self._redirect_consumer = None log.info("%r: Created the Event Hub client", self._container_id) - @classmethod - def _from_iothub_connection_string(cls, conn_str, **kwargs): - address, policy, key, _ = _parse_conn_str(conn_str) - hub_name = address.split('.')[0] - username = "{}@sas.root.{}".format(policy, hub_name) - password = _generate_sas_token(address, policy, key) - left_slash_pos = address.find("//") - if left_slash_pos != -1: - host = address[left_slash_pos + 2:] - else: - host = address - client = cls(host, "", EventHubSharedKeyCredential(username, password), **kwargs) - client._auth_config = { # pylint: disable=protected-access - 'iot_username': policy, - 'iot_password': key, - 'username': username, - 'password': password} - client._is_iothub = True # pylint: disable=protected-access - client._redirect_consumer = client.create_consumer(consumer_group='$default', # pylint: disable=protected-access, no-member - partition_id='0', - event_position=EventPosition('-1'), - operation='/messages/events') - return client - @abstractmethod - def _create_auth(self, username=None, password=None): + def _create_auth(self): pass def _create_properties(self, user_agent=None): # pylint: disable=no-self-use @@ -214,22 +179,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use properties["user-agent"] = final_user_agent return properties - def _process_redirect_uri(self, redirect): - redirect_uri = redirect.address.decode('utf-8') - auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") - self._address = urlparse(auth_uri) - self._host = self._address.hostname - self.eh_name = self._address.path.lstrip('/') - self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) - self._mgmt_target = redirect_uri - if self._is_iothub: - self._iothub_redirect_info = redirect - @classmethod def from_connection_string(cls, conn_str, **kwargs): - """Create an EventHubClient from an EventHub/IotHub connection string. + """Create an EventHubClient from an EventHub connection string. - :param conn_str: The connection string of an eventhub or IoT hub + :param conn_str: The connection string of an eventhub :type conn_str: str :param event_hub_path: The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string. @@ -274,15 +228,11 @@ def from_connection_string(cls, conn_str, **kwargs): """ event_hub_path = kwargs.pop("event_hub_path", None) - is_iot_conn_str = conn_str.lstrip().lower().startswith("hostname") - if not is_iot_conn_str: # pylint:disable=no-else-return - address, policy, key, entity = _parse_conn_str(conn_str) - entity = event_hub_path or entity - left_slash_pos = address.find("//") - if left_slash_pos != -1: - host = address[left_slash_pos + 2:] - else: - host = address - return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) + address, policy, key, entity = _parse_conn_str(conn_str) + entity = event_hub_path or entity + left_slash_pos = address.find("//") + if left_slash_pos != -1: + host = address[left_slash_pos + 2:] else: - return cls._from_iothub_connection_string(conn_str, **kwargs) + host = address + return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5923d7f57972..0ae647236380 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -55,16 +55,13 @@ class EventData(object): PROP_PARTITION_KEY = b"x-opt-partition-key" PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY) PROP_TIMESTAMP = b"x-opt-enqueued-time" - PROP_DEVICE_ID = b"iothub-connection-device-id" - def __init__(self, body=None, to_device=None): + def __init__(self, body=None): """ Initialize EventData. :param body: The data to send in a single message. :type body: str, bytes or list - :param to_device: An IoT device to route to. - :type to_device: str """ self._annotations = {} @@ -156,16 +153,6 @@ def enqueued_time(self): return datetime.datetime.utcfromtimestamp(float(timestamp)/1000) return None - @property - def device_id(self): - """ - The device ID of the event data object. This is only used for - IoT Hub implementations. - - :rtype: bytes - """ - return self._annotations.get(EventData.PROP_DEVICE_ID, None) - @property def partition_key(self): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 604d9c7d7b82..b909d81a91ce 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -74,7 +74,6 @@ def __init__(self, client, source, **kwargs): self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 self._link_properties = {} - self._redirected = None self._error = None partition = self._source.split('/')[-1] self._partition = partition @@ -110,17 +109,12 @@ def __next__(self): raise last_exception def _create_handler(self): - alt_creds = { - "username": self._client._auth_config.get("iot_username") if self._redirected else None, # pylint:disable=protected-access - "password": self._client._auth_config.get("iot_password") if self._redirected else None # pylint:disable=protected-access - } - source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access self._handler = ReceiveClient( source, - auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, @@ -132,25 +126,6 @@ def _create_handler(self): self._client._config.user_agent)) # pylint:disable=protected-access self._messages_iter = None - def _redirect(self, redirect): - self._messages_iter = None - super(EventHubConsumer, self)._redirect(redirect) - - def _open(self): - """ - Open the EventHubConsumer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. - - """ - # pylint: disable=protected-access - self._redirected = self._redirected or self._client._iothub_redirect_info - - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) - self._source = self._redirected.address - super(EventHubConsumer, self)._open() - def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) @@ -226,16 +201,11 @@ def receive(self, max_batch_size=None, timeout=None): return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - def close(self, exception=None): - # type:(Exception) -> None + def close(self): + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -249,6 +219,6 @@ def close(self, exception=None): if self._messages_iter: self._messages_iter.close() self._messages_iter = None - super(EventHubConsumer, self).close(exception) + super(EventHubConsumer, self).close() next = __next__ # for python2.7 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 129cf14a3842..755925bca743 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -187,11 +187,6 @@ def _handle_exception(exception, closable): # pylint:disable=too-many-branches, if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access - elif isinstance(exception, errors.LinkRedirect): - log.info("%r link redirect received. Redirecting...", name) - redirect = exception - if hasattr(closable, "_redirect"): - closable._redirect(redirect) # pylint:disable=protected-access elif isinstance(exception, errors.LinkDetach): if hasattr(closable, "_close_handler"): closable._close_handler() # pylint:disable=protected-access diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index cab9638f2acc..b1339908ec0e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -77,7 +77,6 @@ def __init__(self, client, target, **kwargs): self._target = target self._partition = partition self._timeout = send_timeout - self._redirected = None self._error = None self._keep_alive = keep_alive self._auto_reconnect = auto_reconnect @@ -96,7 +95,7 @@ def __init__(self, client, target, **kwargs): def _create_handler(self): self._handler = SendClient( self._target, - auth=self._client._get_auth(), # pylint:disable=protected-access + auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, @@ -105,18 +104,6 @@ def _create_handler(self): link_properties=self._link_properties, properties=self._client._create_properties(self._client._config.user_agent)) # pylint: disable=protected-access - def _open(self): - """ - Open the EventHubProducer using the supplied connection. - If the handler has previously been redirected, the redirect - context will be used to create a new handler before opening it. - - """ - if not self._running and self._redirected: - self._client._process_redirect_uri(self._redirected) # pylint: disable=protected-access - self._target = self._redirected.address - super(EventHubProducer, self)._open() - def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) @@ -237,16 +224,11 @@ def send(self, event_data, partition_key=None, timeout=None): self._unsent_events = [wrapper_event_data.message] self._send_event_data_with_retry(timeout=timeout) - def close(self, exception=None): # pylint:disable=useless-super-delegation - # type:(Exception) -> None + def close(self): # pylint:disable=useless-super-delegation + # type:() -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -257,4 +239,4 @@ def close(self, exception=None): # pylint:disable=useless-super-delegation :caption: Close down the handler. """ - super(EventHubProducer, self).close(exception) + super(EventHubProducer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index ed0212e85562..78174fc2699b 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -152,22 +152,6 @@ def invalid_policy(live_eventhub_config): live_eventhub_config['event_hub']) -@pytest.fixture() -def iot_connection_str(): - try: - return os.environ['IOTHUB_CONNECTION_STR'] - except KeyError: - pytest.skip("No IotHub connection string found.") - - -@pytest.fixture() -def device_id(): - try: - return os.environ['IOTHUB_DEVICE'] - except KeyError: - pytest.skip("No Iothub device ID found.") - - @pytest.fixture() def aad_credential(): try: diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py b/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py deleted file mode 100644 index ecc935669d13..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. -""" -import os - -from azure.eventhub import EventHubClient, EventPosition - -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), - operation='/messages/events') - -with consumer: - received = consumer.receive(timeout=5) - print(received) diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py b/sdk/eventhub/azure-eventhubs/examples/iothub_send.py deleted file mode 100644 index c2f8f3379259..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an IoT Hub partition. -""" -import os -from azure.eventhub import EventData, EventHubClient - -iot_device_id = os.environ['IOTHUB_DEVICE'] -iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - -client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) -producer = client.create_producer(operation='/messages/devicebound') -with producer: - producer.send(EventData(b"A single event", to_device=iot_device_id)) diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index 52145f9222f0..8ff334ef971f 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -27,16 +27,6 @@ def create_eventhub_client(live_eventhub_config): return client -def create_eventhub_client_from_iothub_connection_string(live_eventhub_config): - # [START create_eventhub_client_iot_connstr] - import os - from azure.eventhub import EventHubClient - - iot_connection_str = os.environ['IOTHUB_CONNECTION_STR'] - client = EventHubClient.from_connection_string(iot_connection_str) - # [END create_eventhub_client_iot_connstr] - - def test_example_eventhub_sync_send_and_receive(live_eventhub_config): # [START create_eventhub_client_connstr] import os diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py deleted file mode 100644 index 4ac63eef5d7f..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py +++ /dev/null @@ -1,85 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import asyncio -import pytest - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition - - -async def pump(receiver, sleep=None): - messages = 0 - if sleep: - await asyncio.sleep(sleep) - async with receiver: - batch = await receiver.receive(timeout=10) - messages += len(batch) - return messages - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_multiple_async(iot_connection_str): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - receivers = [] - for p in partitions: - receivers.append(client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"), prefetch=10, operation='/messages/events')) - outputs = await asyncio.gather(*[pump(r) for r in receivers]) - - assert isinstance(outputs[0], int) and outputs[0] <= 10 - assert isinstance(outputs[1], int) and outputs[1] <= 10 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = await client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_partition_ids_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_get_partition_properties_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = await client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_receive_after_mgmt_ops_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await receiver.receive(timeout=10) - assert len(received) == 0 - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_iothub_mgmt_ops_after_receive_async(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - async with receiver: - received = await receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = await client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py deleted file mode 100644 index 595c822b9cb7..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_receive.py +++ /dev/null @@ -1,64 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import pytest - -from azure.eventhub import EventPosition, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - try: - received = receiver.receive(timeout=10) - assert len(received) == 0 - finally: - receiver.close() - - -@pytest.mark.liveTest -def test_iothub_get_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - properties = client.get_properties() - assert properties["partition_ids"] == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_ids_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - - -@pytest.mark.liveTest -def test_iothub_get_partition_properties_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partition_properties = client.get_partition_properties("0") - assert partition_properties["id"] == "0" - - -@pytest.mark.liveTest -def test_iothub_receive_after_mgmt_ops_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] - receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - -@pytest.mark.liveTest -def test_iothub_mgmt_ops_after_receive_sync(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') - with receiver: - received = receiver.receive(timeout=10) - assert len(received) == 0 - - partitions = client.get_partition_ids() - assert partitions == ["0", "1", "2", "3"] diff --git a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py b/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py deleted file mode 100644 index 9660a79947bd..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_iothub_send.py +++ /dev/null @@ -1,24 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import os -import pytest -import time -import uuid - -from uamqp.message import MessageProperties - -from azure.eventhub import EventData, EventHubClient - - -@pytest.mark.liveTest -def test_iothub_send_single_event(iot_connection_str, device_id): - client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False) - sender = client.create_producer(operation='/messages/devicebound') - try: - sender.send(EventData(b"A single event", to_device=device_id)) - finally: - sender.close() From e6b12464547a03ed4dde85d81e09f011390a02cc Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 23 Sep 2019 18:41:42 -0700 Subject: [PATCH 02/18] Remove self._running from consumer and producer --- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 1 - sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 1 - 2 files changed, 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index b909d81a91ce..21753c1d9acd 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -62,7 +62,6 @@ def __init__(self, client, source, **kwargs): auto_reconnect = kwargs.get("auto_reconnect", True) super(EventHubConsumer, self).__init__() - self._running = False self._client = client self._source = source self._offset = event_position diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index b1339908ec0e..4b824bb78b72 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -72,7 +72,6 @@ def __init__(self, client, target, **kwargs): super(EventHubProducer, self).__init__() self._max_message_size_on_link = None - self._running = False self._client = client self._target = target self._partition = partition From 7208b584968677a124abd6a1aa9be2ef0ae48ccb Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 23 Sep 2019 18:52:38 -0700 Subject: [PATCH 03/18] Remove IoT related params "operation" and "device" --- .../azure/eventhub/aio/client_async.py | 13 +------------ .../azure-eventhubs/azure/eventhub/client.py | 13 +++---------- .../azure-eventhubs/azure/eventhub/common.py | 4 ---- 3 files changed, 4 insertions(+), 26 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 2499cb4efa40..d7418bbecdb9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -221,9 +221,6 @@ def create_consumer( :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. :type owner_level: int - :param operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param loop: An event loop. If not specified the default event loop will be used. @@ -239,13 +236,11 @@ def create_consumer( """ owner_level = kwargs.get("owner_level") - operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch loop = kwargs.get("loop") - path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self._address.hostname, path, consumer_group, partition_id) + self._address.hostname, self._address.path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch, loop=loop) @@ -254,7 +249,6 @@ def create_consumer( def create_producer( self, *, partition_id: str = None, - operation: str = None, send_timeout: float = None, loop: asyncio.AbstractEventLoop = None ) -> EventHubProducer: @@ -265,9 +259,6 @@ def create_producer( If omitted, the events will be distributed to available partitions via round-robin. :type partition_id: str - :param operation: An optional operation to be appended to the hostname in the target URL. - The value must start with `/` character. - :type operation: str :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: float @@ -285,8 +276,6 @@ def create_producer( """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) - if operation: - target = target + operation send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 054dc0f2e712..3871292df64f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -224,9 +224,6 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. :type owner_level: int - :param operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :rtype: ~azure.eventhub.consumer.EventHubConsumer @@ -241,19 +238,17 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs """ owner_level = kwargs.get("owner_level") - operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch - path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self._address.hostname, path, consumer_group, partition_id) + self._address.hostname, self._address.path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, prefetch=prefetch) return handler - def create_producer(self, partition_id=None, operation=None, send_timeout=None): - # type: (str, str, float) -> EventHubProducer + def create_producer(self, partition_id=None, send_timeout=None): + # type: (str, str) -> EventHubProducer """ Create an producer to send EventData object to an EventHub. @@ -280,8 +275,6 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None): """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) - if operation: - target = target + operation send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 0ae647236380..9ae585cc6b41 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -67,8 +67,6 @@ def __init__(self, body=None): self._annotations = {} self._app_properties = {} self._msg_properties = MessageProperties() - if to_device: - self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) if body and isinstance(body, list): self.message = Message(body[0], properties=self._msg_properties) for more in body[1:]: @@ -90,8 +88,6 @@ def __str__(self): dic['offset'] = str(self.offset) if self.enqueued_time: dic['enqueued_time'] = str(self.enqueued_time) - if self.device_id: - dic['device_id'] = str(self.device_id) if self.partition_key: dic['partition_key'] = str(self.partition_key) return str(dic) From 1785ec1fe55ac516381bead9efda81c998ecd153 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 24 Sep 2019 12:12:08 -0700 Subject: [PATCH 04/18] Remove exception from close() --- .../azure/eventhub/aio/producer_async.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index c1dfed7e1e7b..afefca294474 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -218,16 +218,11 @@ async def send( self._unsent_events = [wrapper_event_data.message] await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor - async def close(self, exception=None): - # type: (Exception) -> None + async def close(self): + # type: () -> None """ Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception + this will be a no op. Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py @@ -238,4 +233,4 @@ async def close(self, exception=None): :caption: Close down the handler. """ - await super(EventHubProducer, self).close(exception) + await super(EventHubProducer, self).close() From f28cb296460918129d0c00e6ac22985ee887e256 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 24 Sep 2019 23:52:10 -0700 Subject: [PATCH 05/18] add iterator long running test --- ...test_longrunning_receive_iterator_async.py | 126 ++++++++++++++++++ .../test_longrunning_receive_iterator.py | 126 ++++++++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py create mode 100644 sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py new file mode 100644 index 000000000000..ed9d07166212 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +receive test. +""" + +import logging +import asyncio +import argparse +import time +import os +import sys +import pytest +from logging.handlers import RotatingFileHandler + +from azure.eventhub import EventPosition, EventHubSharedKeyCredential +from azure.eventhub.aio import EventHubClient + + +def get_logger(filename, level=logging.INFO): + azure_logger = logging.getLogger("azure.eventhub") + azure_logger.setLevel(level) + uamqp_logger = logging.getLogger("uamqp") + uamqp_logger.setLevel(logging.INFO) + + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not azure_logger.handlers: + azure_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + + if filename: + file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) + file_handler.setFormatter(formatter) + azure_logger.addHandler(file_handler) + uamqp_logger.addHandler(file_handler) + + return azure_logger + + +logger = get_logger("recv_iterator_test_async.log", logging.INFO) + + +async def pump(_pid, receiver, _args, _dl): + total = 0 + iteration = 0 + deadline = time.time() + _dl + + try: + async with receiver: + async for e in receiver: + total += 1 + iteration += 1 + if iteration == 1000: + print("{}: total received {}, last sn={}, last offset={}".format( + receiver._partition, + total, + e.sequence_number, + e.offset)) + iteration = 0 + if time.time() >= deadline: + break + print("{}: Total received {}".format(receiver._partition, total)) + except Exception as e: + print("Partition {} receiver failed: {}".format(_pid, e)) + raise + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_long_running_receive_async(connection_str): + parser = argparse.ArgumentParser() + parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + parser.add_argument("--consumer", help="Consumer group name", default="$default") + parser.add_argument("--partitions", help="Comma seperated partition IDs") + parser.add_argument("--offset", help="Starting offset", default="-1") + parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) + parser.add_argument("--eventhub", help="Name of EventHub") + parser.add_argument("--address", help="Address URI to the EventHub entity") + parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + parser.add_argument("--sas-key", help="Shared access key") + + loop = asyncio.get_event_loop() + args, _ = parser.parse_known_args() + if args.conn_str: + client = EventHubClient.from_connection_string( + args.conn_str, + event_hub_path=args.eventhub, auth_timeout=240, network_tracing=False) + elif args.address: + client = EventHubClient(host=args.address, + event_hub_path=args.eventhub, + credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), + auth_timeout=240, + network_tracing=False) + + else: + try: + import pytest + pytest.skip("Must specify either '--conn-str' or '--address'") + except ImportError: + raise ValueError("Must specify either '--conn-str' or '--address'") + + if not args.partitions: + partitions = await client.get_partition_ids() + else: + partitions = args.partitions.split(",") + pumps = [] + for pid in partitions: + receiver = client.create_consumer(consumer_group="$default", + partition_id=pid, + event_position=EventPosition(args.offset), + prefetch=300, + loop=loop) + pumps.append(pump(pid, receiver, args, args.duration)) + await asyncio.gather(*pumps) + + +if __name__ == '__main__': + asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py new file mode 100644 index 000000000000..d997c0bfdab2 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +receive test. +""" + +import logging +import argparse +import time +import os +import sys +import threading +import pytest + +from logging.handlers import RotatingFileHandler + +from azure.eventhub import EventPosition +from azure.eventhub import EventHubClient +from azure.eventhub import EventHubSharedKeyCredential + + +def get_logger(filename, level=logging.INFO): + azure_logger = logging.getLogger("azure.eventhub") + azure_logger.setLevel(level) + uamqp_logger = logging.getLogger("uamqp") + uamqp_logger.setLevel(logging.INFO) + + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not azure_logger.handlers: + azure_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + + if filename: + file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) + file_handler.setFormatter(formatter) + azure_logger.addHandler(file_handler) + uamqp_logger.addHandler(file_handler) + + return azure_logger + +logger = get_logger("recv_iterator_test.log", logging.INFO) + + +def pump(receiver, duration): + total = 0 + iteration = 0 + deadline = time.time() + duration + with receiver: + try: + for e in receiver: + total += 1 + iteration += 1 + if iteration == 1000: + print("{}: total received {}, last sn={}, last offset={}".format( + receiver._partition, + total, + e.sequence_number, + e.offset)) + iteration = 0 + if time.time() >= deadline: + break + print("{}: Total received {}".format(receiver._partition, total)) + except Exception as e: + print("EventHubConsumer failed: {}".format(e)) + raise + + +@pytest.mark.liveTest +def test_long_running_receive(connection_str): + parser = argparse.ArgumentParser() + parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + parser.add_argument("--consumer", help="Consumer group name", default="$default") + parser.add_argument("--partitions", help="Comma seperated partition IDs") + parser.add_argument("--offset", help="Starting offset", default="-1") + parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) + parser.add_argument("--eventhub", help="Name of EventHub") + parser.add_argument("--address", help="Address URI to the EventHub entity") + parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + parser.add_argument("--sas-key", help="Shared access key") + + args, _ = parser.parse_known_args() + if args.conn_str: + client = EventHubClient.from_connection_string( + args.conn_str, + event_hub_path=args.eventhub, network_tracing=False) + elif args.address: + client = EventHubClient(host=args.address, + event_hub_path=args.eventhub, + credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), + auth_timeout=240, + network_tracing=False) + else: + try: + import pytest + pytest.skip("Must specify either '--conn-str' or '--address'") + except ImportError: + raise ValueError("Must specify either '--conn-str' or '--address'") + + if args.partitions: + partitions = args.partitions.split(",") + else: + partitions = client.get_partition_ids() + + threads = [] + for pid in partitions: + consumer = client.create_consumer(consumer_group="$default", + partition_id=pid, + event_position=EventPosition(args.offset), + prefetch=300) + thread = threading.Thread(target=pump, args=(consumer, args.duration)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + + +if __name__ == '__main__': + test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) From 566c415703d5cf4477b72e52fbce0f021b4bdafb Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 25 Sep 2019 00:01:45 -0700 Subject: [PATCH 06/18] small bug fix --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 5 ++--- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 05533bbca5a8..355b7bff99e3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -220,7 +220,6 @@ async def close(self): :caption: Close down the handler. """ - if self._messages_iter: - await self._messages_iter.close() - self._messages_iter = None + if self._handler: + await self._handler.close_async() await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 21753c1d9acd..c03e2e6867c9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -215,9 +215,8 @@ def close(self): :caption: Close down the handler. """ - if self._messages_iter: - self._messages_iter.close() - self._messages_iter = None + if self._handler: + self._handler.close() super(EventHubConsumer, self).close() next = __next__ # for python2.7 From 047b1b6738a631a4adb173deb3577dbde57a9794 Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 25 Sep 2019 00:06:40 -0700 Subject: [PATCH 07/18] small bug fix --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 2 -- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 355b7bff99e3..7f7a933433ce 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -220,6 +220,4 @@ async def close(self): :caption: Close down the handler. """ - if self._handler: - await self._handler.close_async() await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index c03e2e6867c9..39e5e6a0be8d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -215,8 +215,6 @@ def close(self): :caption: Close down the handler. """ - if self._handler: - self._handler.close() super(EventHubConsumer, self).close() next = __next__ # for python2.7 From 27160e663d2858e0790257f24eaf883b2db5d526 Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 26 Sep 2019 19:35:33 -0700 Subject: [PATCH 08/18] Fix connection properties bug and format --- .../azure/eventhub/client_abstract.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 504de024f80f..d1e518cfb6b6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -6,13 +6,14 @@ import logging import sys +import platform import uuid import time -import functools from abc import abstractmethod -from typing import Dict, Union, Any, TYPE_CHECKING +from typing import Union, Any, TYPE_CHECKING -from azure.eventhub import __version__, EventPosition +from uamqp import types +from azure.eventhub import __version__ from azure.eventhub.configuration import _Configuration from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address @@ -161,13 +162,15 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use :rtype: dict """ properties = {} - properties["product"] = "eventhub.python" - properties["version"] = __version__ - properties["framework"] = "Python {}.{}.{}".format(*sys.version_info[0:3]) - properties["platform"] = sys.platform - - final_user_agent = 'azsdk-python-eventhub/{} ({}; {})'.format( - __version__, properties["framework"], sys.platform) + product = "azsdk-python-eventhubs" + properties[types.AMQPSymbol("product")] = product + properties[types.AMQPSymbol("version")] = __version__ + framework = "Python {}.{}.{}, {}".format(*sys.version_info[0:3], platform.python_implementation()) + properties[types.AMQPSymbol("framework")] = framework + platform_str = platform.platform() + properties[types.AMQPSymbol("platform")] = platform_str + + final_user_agent = '{}/{} ({}, {})'.format(product, __version__, framework, platform_str) if user_agent: final_user_agent = '{}, {}'.format(final_user_agent, user_agent) @@ -175,8 +178,7 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use raise ValueError("The user-agent string cannot be more than {} in length." "Current user_agent string is: {} with length: {}".format( MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent))) - - properties["user-agent"] = final_user_agent + properties[types.AMQPSymbol("user-agent")] = final_user_agent return properties @classmethod From d3780b30686362e533ea8e4196780bc91351ae1e Mon Sep 17 00:00:00 2001 From: yijxie Date: Fri, 27 Sep 2019 09:26:45 -0700 Subject: [PATCH 09/18] Changed product to azure-eventhub in user agent --- sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index d1e518cfb6b6..a2fd18ae098a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -162,7 +162,7 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use :rtype: dict """ properties = {} - product = "azsdk-python-eventhubs" + product = "azure-eventhub" properties[types.AMQPSymbol("product")] = product properties[types.AMQPSymbol("version")] = __version__ framework = "Python {}.{}.{}, {}".format(*sys.version_info[0:3], platform.python_implementation()) From 6de2f7237ede7fed14275f7a45692cf138481c2e Mon Sep 17 00:00:00 2001 From: yijxie Date: Fri, 27 Sep 2019 16:46:55 -0700 Subject: [PATCH 10/18] Fix a type hint --- sdk/eventhub/azure-eventhubs/azure/eventhub/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 3871292df64f..e42c01597570 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -248,7 +248,7 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs return handler def create_producer(self, partition_id=None, send_timeout=None): - # type: (str, str) -> EventHubProducer + # type: (str, float) -> EventHubProducer """ Create an producer to send EventData object to an EventHub. From fb756f4eb17b5957eb1c2b455f5631b63831fb22 Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 30 Sep 2019 18:22:13 -0700 Subject: [PATCH 11/18] Improve stress script --- .../stress/azure_eventhub_stress.py | 263 ++++++++++++++++++ .../test_long_running_eventprocessor.py | 156 ----------- .../test_longrunning_receive_async.py | 131 --------- ...test_longrunning_receive_iterator_async.py | 126 --------- .../asynctests/test_longrunning_send_async.py | 125 --------- .../tests/test_longrunning_receive.py | 131 --------- .../test_longrunning_receive_iterator.py | 126 --------- .../tests/test_longrunning_send.py | 118 -------- 8 files changed, 263 insertions(+), 913 deletions(-) create mode 100644 sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py delete mode 100644 sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py new file mode 100644 index 000000000000..0fc76e938efa --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -0,0 +1,263 @@ +import sys +import os +import logging +import threading +import time +import asyncio +from logging.handlers import RotatingFileHandler +from argparse import ArgumentParser +from azure.eventhub import EventHubClient, EventPosition, EventData, \ + EventHubConsumer, EventHubProducer, EventHubSharedKeyCredential, EventDataBatch +from azure.eventhub.aio import EventHubClient as EventHubClientAsync +from azure.identity import ClientSecretCredential + + +def stress_receive_sync(receiver, args, logger): + batch = receiver.receive(timeout=5) + return len(batch) + + +async def stress_receive_async(receiver, args, logger): + batch = await receiver.receive(timeout=5) + return len(batch) + + +def stress_receive_iterator_sync(receiver, args, logger): + duration = args.duration + deadline = time.time() + duration + total_count = 0 + logging_count = 0 + try: + for _ in receiver: + total_count += 1 + logging_count += 1 + if logging_count >= args.output_interval: + logger.info("Partition:%r, received:%r", receiver._partition, total_count) + logging_count -= args.output_interval + if time.time() > deadline: + break + finally: + return total_count + + +async def stress_receive_iterator_async(receiver, args, logger): + duration = args.duration + deadline = time.time() + duration + total_count = 0 + logging_count = 0 + try: + async for _ in receiver: + total_count += 1 + logging_count += 1 + if logging_count >= args.output_interval: + logger.info("Partition:%r, received:%r", receiver._partition, total_count) + logging_count -= args.output_interval + if time.time() > deadline: + break + finally: + return total_count + + +def stress_send_sync(producer: EventHubProducer, args, logger): + batch = producer.create_batch() + try: + while True: + event_data = EventData(body=b"D" * args.payload) + batch.try_add(event_data) + except ValueError: + producer.send(batch) + return len(batch) + + +async def stress_send_async(producer: EventHubProducer, args, logger): + batch = await producer.create_batch() + try: + while True: + event_data = EventData(body=b"D" * args.payload) + batch.try_add(event_data) + except ValueError: + await producer.send(batch) + return len(batch) + + +def get_logger(filename, method_name, level=logging.INFO): + stress_logger = logging.getLogger(method_name) + stress_logger.setLevel(level) + azure_logger = logging.getLogger("azure.eventhub") + azure_logger.setLevel(level) + uamqp_logger = logging.getLogger("uamqp") + uamqp_logger.setLevel(level) + + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not azure_logger.handlers: + azure_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + if not stress_logger.handlers: + stress_logger.addHandler(console_handler) + + if filename: + file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) + file_handler.setFormatter(formatter) + azure_logger.addHandler(file_handler) + uamqp_logger.addHandler(file_handler) + stress_logger.addHandler(file_handler) + + return stress_logger + + +class StressTestRunner(object): + def __init__(self, argument_parser): + self.argument_parser = argument_parser + self.argument_parser.add_argument("-m", "--method", required=True) + self.argument_parser.add_argument("--output_interval", type=float, default=1000) + self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + self.argument_parser.add_argument("--consumer", help="Consumer group name", default="$default") + self.argument_parser.add_argument("--offset", help="Starting offset", default="-1") + self.argument_parser.add_argument("-p", "--partitions", help="Comma seperated partition IDs", default="0") + self.argument_parser.add_argument("--conn-str", help="EventHub connection string", + default=os.environ.get('EVENT_HUB_PERF_CONN_STR')) + self.argument_parser.add_argument("--eventhub", help="Name of EventHub") + self.argument_parser.add_argument("--address", help="Address URI to the EventHub entity") + self.argument_parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + self.argument_parser.add_argument("--sas-key", help="Shared access key") + self.argument_parser.add_argument("--aad_client_id", help="AAD client id") + self.argument_parser.add_argument("--aad_secret", help="AAD secret") + self.argument_parser.add_argument("--aad_tenant_id", help="AAD tenant id") + self.argument_parser.add_argument("--payload", help="payload size", type=int, default=1024) + self.args, _ = parser.parse_known_args() + + self.running = False + + def create_client(self, client_class): + if self.args.conn_str: + client = client_class.from_connection_string( + self.args.conn_str, + event_hub_path=self.args.eventhub, network_tracing=False) + elif self.args.address: + client = client_class(host=self.args.address, + event_hub_path=self.args.eventhub, + credential=EventHubSharedKeyCredential(self.args.sas_policy, self.args.sas_key), + auth_timeout=240, + network_tracing=False) + elif self.args.aad_client_id: + client = client_class(host=self.args.address, + event_hub_path=self.args.eventhub, + credential=ClientSecretCredential( + self.args.aad_client_id, self.args.aad_secret, self.args.tenant_id + ), + network_tracing=False) + else: + raise ValueError("Argument error. Must have one of connection string, sas and aad credentials") + + return client + + def run(self): + method_name = self.args.method + if "async" in method_name: + loop = asyncio.get_event_loop() + loop.run_until_complete(self.run_async()) + else: + self.run_sync() + + def run_sync(self): + method_name = self.args.method + logger = get_logger("{}.log".format(method_name), method_name, level=logging.INFO) + method_name = self.args.method + test_method = globals()[method_name] + client = self.create_client(EventHubClient) + self.running = True + if self.args.partitions.lower() != "all": + partitions = self.args.partitions.split(",") + else: + partitions = client.get_partition_ids() + threads = [] + for pid in partitions: + if "receive" in method_name: + worker = client.create_consumer(consumer_group="$default", + partition_id=pid, + event_position=EventPosition(self.args.offset), + prefetch=300) + else: # "send" in method_name + worker = client.create_producer(partition_id=pid) + thread = threading.Thread(target=self.run_test_method, args=(test_method, worker, logger)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + + def stop(self): + self.running = False + + def run_test_method(self, test_method, worker, logger): + deadline = time.time() + self.args.duration + with worker: + total_processed = 0 + iter_processed = 0 + while self.running and time.time() < deadline: + try: + processed = test_method(worker, self.args, logger) + total_processed += processed + iter_processed += processed + if iter_processed >= self.args.output_interval: + logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed) + iter_processed -= self.args.output_interval + except KeyboardInterrupt: + logger.info("Partition:%r, keyboard interrupted", worker._partition) + self.stop() + except Exception as e: + logger.exception("Partition:%r, %r failed:", worker._partition, type(worker)) + self.stop() + logger.info("Partition:%r, %r has finished testing", worker._partition, test_method) + + async def run_async(self): + method_name = self.args.method + logger = get_logger("{}.log".format(method_name), method_name, level=logging.INFO) + test_method = globals()[method_name] + client = self.create_client(EventHubClientAsync) + self.running = True + if self.args.partitions.lower() != "all": + partitions = self.args.partitions.split(",") + else: + partitions = await client.get_partition_ids() + tasks = [] + for pid in partitions: + if "receive" in method_name: + worker = client.create_consumer(consumer_group="$default", + partition_id=pid, + event_position=EventPosition(self.args.offset), + prefetch=300) + else: # "send" in method_name + worker = client.create_producer(partition_id=pid) + task = self.run_test_method_async(test_method, worker, logger) + tasks.append(task) + await asyncio.gather(*tasks) + + async def run_test_method_async(self, test_method, worker, logger): + deadline = time.time() + self.args.duration + async with worker: + total_processed = 0 + iter_processed = 0 + while self.running and time.time() < deadline: + try: + processed = await test_method(worker, self.args, logger) + total_processed += processed + iter_processed += processed + if iter_processed >= self.args.output_interval: + logger.info("Partition:%r, Total processed: %r", worker._partition, total_processed) + iter_processed -= self.args.output_interval + except KeyboardInterrupt: + logger.info("Partition:%r, keyboard interrupted", worker._partition) + self.stop() + except Exception as e: + logger.exception("Partition:%r, %r failed:", worker._partition, type(worker)) + self.stop() + logger.info("Partition:%r, %r has finished testing", worker._partition, test_method) + + +if __name__ == '__main__': + parser = ArgumentParser() + runner = StressTestRunner(parser) + runner.run() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py deleted file mode 100644 index 1e3cae9eefa7..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py +++ /dev/null @@ -1,156 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import logging -import asyncio -import sys -import os -import argparse -import time -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub.aio import EventHubClient -from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager -from azure.eventhub import EventData - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub.eventprocessor") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("eph_test_async.log", logging.INFO) - - -class MyEventProcessor(PartitionProcessor): - async def close(self, reason, partition_context): - logger.info("PartitionProcessor closed (reason {}, id {})".format( - reason, - partition_context.partition_id - )) - - async def process_events(self, events, partition_context): - if events: - event = events[-1] - print("Processing id {}, offset {}, sq_number {})".format( - partition_context.partition_id, - event.offset, - event.sequence_number)) - await partition_context.update_checkpoint(event.offset, event.sequence_number) - - async def process_error(self, error, partition_context): - logger.info("Event Processor Error for partition {}, {!r}".format(partition_context.partition_id, error)) - - -async def wait_and_close(host, duration): - """ - Run EventProcessorHost for 30 seconds then shutdown. - """ - await asyncio.sleep(duration) - await host.stop() - - -async def pump(pid, sender, duration): - deadline = time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * 512) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_eph(live_eventhub): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--container", help="Lease container name", default="nocontextleases") - parser.add_argument("--eventhub", help="Name of EventHub", default=live_eventhub['event_hub']) - parser.add_argument("--namespace", help="Namespace of EventHub", default=live_eventhub['namespace']) - parser.add_argument("--suffix", help="Namespace of EventHub", default="servicebus.windows.net") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with", default=live_eventhub['key_name']) - parser.add_argument("--sas-key", help="Shared access key", default=live_eventhub['access_key']) - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if not args.namespace or not args.eventhub: - try: - import pytest - pytest.skip("Must specify '--namespace' and '--eventhub'") - except ImportError: - raise ValueError("Must specify '--namespace' and '--eventhub'") - - # Queue up some events in the Eventhub - conn_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( - live_eventhub['hostname'], - live_eventhub['key_name'], - live_eventhub['access_key'], - live_eventhub['event_hub']) - client = EventHubClient.from_connection_string(conn_str) - pumps = [] - for pid in ["0", "1"]: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, 15)) - results = await asyncio.gather(*pumps, return_exceptions=True) - - assert not any(results) - - # Event loop and host - host = EventProcessor( - client, - live_eventhub['consumer_group'], - MyEventProcessor, - SamplePartitionManager() - ) - - tasks = asyncio.gather( - host.start(), - wait_and_close(host, args.duration), return_exceptions=True) - results = await tasks - assert not any(results) - - -if __name__ == '__main__': - config = {} - config['hostname'] = os.environ['EVENT_HUB_HOSTNAME'] - config['event_hub'] = os.environ['EVENT_HUB_NAME'] - config['key_name'] = os.environ['EVENT_HUB_SAS_POLICY'] - config['access_key'] = os.environ['EVENT_HUB_SAS_KEY'] - config['namespace'] = os.environ['EVENT_HUB_NAMESPACE'] - config['consumer_group'] = "$Default" - config['partition'] = "0" - loop = asyncio.get_event_loop() - loop.run_until_complete(test_long_running_eph(config)) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py deleted file mode 100644 index 50ababacf738..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import asyncio -import argparse -import time -import os -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("recv_test_async.log", logging.INFO) - - -async def pump(_pid, receiver, _args, _dl): - total = 0 - iteration = 0 - deadline = time.time() + _dl - - try: - async with receiver: - while time.time() < deadline: - batch = await receiver.receive(timeout=3) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - _pid, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last offset={}".format( - _pid, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("Partition {} receiver failed: {}".format(_pid, e)) - raise - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_receive_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, auth_timeout=240, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if not args.partitions: - partitions = await client.get_partition_ids() - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - receiver = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300, - loop=loop) - pumps.append(pump(pid, receiver, args, args.duration)) - await asyncio.gather(*pumps) - - -if __name__ == '__main__': - asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py deleted file mode 100644 index ed9d07166212..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_iterator_async.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import asyncio -import argparse -import time -import os -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("recv_iterator_test_async.log", logging.INFO) - - -async def pump(_pid, receiver, _args, _dl): - total = 0 - iteration = 0 - deadline = time.time() + _dl - - try: - async with receiver: - async for e in receiver: - total += 1 - iteration += 1 - if iteration == 1000: - print("{}: total received {}, last sn={}, last offset={}".format( - receiver._partition, - total, - e.sequence_number, - e.offset)) - iteration = 0 - if time.time() >= deadline: - break - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("Partition {} receiver failed: {}".format(_pid, e)) - raise - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_receive_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, auth_timeout=240, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if not args.partitions: - partitions = await client.get_partition_ids() - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - receiver = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300, - loop=loop) - pumps.append(pump(pid, receiver, args, args.duration)) - await asyncio.gather(*pumps) - - -if __name__ == '__main__': - asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py deleted file mode 100644 index 00279d168d70..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python - -""" -send test -""" - -import logging -import argparse -import time -import os -import asyncio -import sys -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventData, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test_async.log", logging.INFO) - - -async def get_partitions(args): - eh_data = await args.get_properties() - return eh_data["partition_ids"] - - -async def pump(pid, sender, args, duration): - deadline = time.time() + duration - total = 0 - - try: - async with sender: - event_list = [] - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - event_list.append(data) - total += 1 - if total % 100 == 0: - await sender.send(event_list) - event_list = [] - logger.info("{}: Send total {}".format(pid, total)) - except Exception as err: - logger.error("{}: Send failed {}".format(pid, err)) - raise - print("{}: Final Sent total {}".format(pid, total)) - - -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_long_running_partition_send_async(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=1024) - parser.add_argument("--partitions", help="Comma separated partition IDs") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - parser.add_argument("--logger-name", help="Unique log file ID") - - loop = asyncio.get_event_loop() - args, _ = parser.parse_known_args() - - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - if not args.partitions: - partitions = await client.get_partition_ids() - else: - pid_range = args.partitions.split("-") - if len(pid_range) > 1: - partitions = [str(i) for i in range(int(pid_range[0]), int(pid_range[1]) + 1)] - else: - partitions = args.partitions.split(",") - pumps = [] - for pid in partitions: - sender = client.create_producer(partition_id=pid, send_timeout=0) - pumps.append(pump(pid, sender, args, args.duration)) - results = await asyncio.gather(*pumps, return_exceptions=True) - assert not results - except Exception as e: - logger.error("EventHubProducer failed: {}".format(e)) - - -if __name__ == '__main__': - asyncio.run(test_long_running_partition_send_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py deleted file mode 100644 index 5a6e42a827e3..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import argparse -import time -import os -import sys -import threading -import pytest - -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition -from azure.eventhub import EventHubClient -from azure.eventhub import EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - -logger = get_logger("recv_test.log", logging.INFO) - - -def pump(receiver, duration): - total = 0 - iteration = 0 - deadline = time.time() + duration - with receiver: - try: - while time.time() < deadline: - batch = receiver.receive(timeout=5) - size = len(batch) - total += size - iteration += 1 - if size == 0: - print("{}: No events received, queue size {}, delivered {}".format( - receiver._partition, - receiver.queue_size, - total)) - elif iteration >= 5: - iteration = 0 - print("{}: total received {}, last sn={}, last offset={}".format( - receiver._partition, - total, - batch[-1].sequence_number, - batch[-1].offset)) - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("EventHubConsumer failed: {}".format(e)) - raise - - -@pytest.mark.liveTest -def test_long_running_receive(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if args.partitions: - partitions = args.partitions.split(",") - else: - partitions = client.get_partition_ids() - - threads = [] - for pid in partitions: - consumer = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300) - thread = threading.Thread(target=pump, args=(consumer, args.duration)) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - - -if __name__ == '__main__': - test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py deleted file mode 100644 index d997c0bfdab2..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive_iterator.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -receive test. -""" - -import logging -import argparse -import time -import os -import sys -import threading -import pytest - -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventPosition -from azure.eventhub import EventHubClient -from azure.eventhub import EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - -logger = get_logger("recv_iterator_test.log", logging.INFO) - - -def pump(receiver, duration): - total = 0 - iteration = 0 - deadline = time.time() + duration - with receiver: - try: - for e in receiver: - total += 1 - iteration += 1 - if iteration == 1000: - print("{}: total received {}, last sn={}, last offset={}".format( - receiver._partition, - total, - e.sequence_number, - e.offset)) - iteration = 0 - if time.time() >= deadline: - break - print("{}: Total received {}".format(receiver._partition, total)) - except Exception as e: - print("EventHubConsumer failed: {}".format(e)) - raise - - -@pytest.mark.liveTest -def test_long_running_receive(connection_str): - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--consumer", help="Consumer group name", default="$default") - parser.add_argument("--partitions", help="Comma seperated partition IDs") - parser.add_argument("--offset", help="Starting offset", default="-1") - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub, network_tracing=False) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - if args.partitions: - partitions = args.partitions.split(",") - else: - partitions = client.get_partition_ids() - - threads = [] - for pid in partitions: - consumer = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=300) - thread = threading.Thread(target=pump, args=(consumer, args.duration)) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - - -if __name__ == '__main__': - test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py deleted file mode 100644 index 93e7e85b4287..000000000000 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -send test -""" - -import argparse -import time -import os -import sys -import threading -import logging -import pytest -from logging.handlers import RotatingFileHandler - -from azure.eventhub import EventHubClient, EventDataBatch, EventData, EventHubSharedKeyCredential - - -def get_logger(filename, level=logging.INFO): - azure_logger = logging.getLogger("azure.eventhub") - azure_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - - if filename: - file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) - file_handler.setFormatter(formatter) - azure_logger.addHandler(file_handler) - uamqp_logger.addHandler(file_handler) - - return azure_logger - - -logger = get_logger("send_test.log", logging.INFO) - - -def send(sender, args): - # sender = client.create_producer() - deadline = time.time() + args.duration - total = 0 - try: - with sender: - batch = sender.create_batch() - while time.time() < deadline: - data = EventData(body=b"D" * args.payload) - try: - batch.try_add(data) - total += 1 - except ValueError: - sender.send(batch, timeout=0) - print("Sent total {} of partition {}".format(total, sender._partition)) - batch = sender.create_batch() - except Exception as err: - print("Partition {} send failed {}".format(sender._partition, err)) - raise - print("Sent total {} of partition {}".format(total, sender._partition)) - - -@pytest.mark.liveTest -def test_long_running_send(connection_str): - if sys.platform.startswith('darwin'): - import pytest - pytest.skip("Skipping on OSX") - parser = argparse.ArgumentParser() - parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) - parser.add_argument("--payload", help="payload size", type=int, default=512) - parser.add_argument("--conn-str", help="EventHub connection string", default=connection_str) - parser.add_argument("--eventhub", help="Name of EventHub") - parser.add_argument("--address", help="Address URI to the EventHub entity") - parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") - parser.add_argument("--sas-key", help="Shared access key") - - args, _ = parser.parse_known_args() - if args.conn_str: - client = EventHubClient.from_connection_string( - args.conn_str, - event_hub_path=args.eventhub) - elif args.address: - client = EventHubClient(host=args.address, - event_hub_path=args.eventhub, - credential=EventHubSharedKeyCredential(args.sas_policy, args.sas_key), - auth_timeout=240, - network_tracing=False) - else: - try: - import pytest - pytest.skip("Must specify either '--conn-str' or '--address'") - except ImportError: - raise ValueError("Must specify either '--conn-str' or '--address'") - - try: - partition_ids = client.get_partition_ids() - threads = [] - for pid in partition_ids: - sender = client.create_producer(partition_id=pid) - thread = threading.Thread(target=send, args=(sender, args)) - thread.start() - threads.append(thread) - thread.join() - except KeyboardInterrupt: - pass - - -if __name__ == '__main__': - test_long_running_send(os.environ.get('EVENT_HUB_PERF_CONN_STR')) From af3d3192a659f5b8fddf991579a77feaa93def1c Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 30 Sep 2019 18:37:21 -0700 Subject: [PATCH 12/18] Print to console configurable --- .../stress/azure_eventhub_stress.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py index 0fc76e938efa..14683c1411e0 100644 --- a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -80,7 +80,7 @@ async def stress_send_async(producer: EventHubProducer, args, logger): return len(batch) -def get_logger(filename, method_name, level=logging.INFO): +def get_logger(filename, method_name, level=logging.INFO, print_console=False): stress_logger = logging.getLogger(method_name) stress_logger.setLevel(level) azure_logger = logging.getLogger("azure.eventhub") @@ -89,14 +89,15 @@ def get_logger(filename, method_name, level=logging.INFO): uamqp_logger.setLevel(level) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not azure_logger.handlers: - azure_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - if not stress_logger.handlers: - stress_logger.addHandler(console_handler) + if print_console: + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not azure_logger.handlers: + azure_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + if not stress_logger.handlers: + stress_logger.addHandler(console_handler) if filename: file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3) @@ -127,6 +128,7 @@ def __init__(self, argument_parser): self.argument_parser.add_argument("--aad_secret", help="AAD secret") self.argument_parser.add_argument("--aad_tenant_id", help="AAD tenant id") self.argument_parser.add_argument("--payload", help="payload size", type=int, default=1024) + self.argument_parser.add_argument("--print_console", help="print to console", type=bool, default=False) self.args, _ = parser.parse_known_args() self.running = False @@ -164,8 +166,8 @@ def run(self): def run_sync(self): method_name = self.args.method - logger = get_logger("{}.log".format(method_name), method_name, level=logging.INFO) - method_name = self.args.method + logger = get_logger("{}.log".format(method_name), method_name, + level=logging.INFO, print_console=self.args.print_console) test_method = globals()[method_name] client = self.create_client(EventHubClient) self.running = True @@ -214,7 +216,8 @@ def run_test_method(self, test_method, worker, logger): async def run_async(self): method_name = self.args.method - logger = get_logger("{}.log".format(method_name), method_name, level=logging.INFO) + logger = get_logger("{}.log".format(method_name), method_name, + level=logging.INFO, print_console=self.args.print_console) test_method = globals()[method_name] client = self.create_client(EventHubClientAsync) self.running = True From a8c915b624bef73ed0fa1774274dec50edbe8f9a Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 30 Sep 2019 22:26:39 -0700 Subject: [PATCH 13/18] small changes --- .../azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py | 3 ++- .../azure/eventhub/aio/_consumer_producer_mixin_async.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index e0d3202f6feb..4735d1a6d8e9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -54,7 +54,8 @@ def _open(self): self._running = True def _close_handler(self): - self._handler.close() # close the link (sharing connection) or connection (not sharing) + if self._handler: + self._handler.close() # close the link (sharing connection) or connection (not sharing) self._running = False def _close_connection(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 628a8f4755d3..123c021f7893 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -56,7 +56,8 @@ async def _open(self): self._running = True async def _close_handler(self): - await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) + if self._handler: + await self._handler.close_async() # close the link (sharing connection) or connection (not sharing) self._running = False async def _close_connection(self): From 41b88a9341d9259fad49497e0552210ea193717d Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 1 Oct 2019 10:51:25 -0700 Subject: [PATCH 14/18] Disable tracking last enqueued event properties for uamqp 1.2.2 --- .../azure/eventhub/aio/consumer_async.py | 14 +++++++++----- .../azure-eventhubs/azure/eventhub/consumer.py | 14 +++++++++----- .../tests/asynctests/test_receive_async.py | 1 + sdk/eventhub/azure-eventhubs/tests/test_receive.py | 1 + 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index c0b31fa95ad0..9d576ac378df 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -8,6 +8,7 @@ from typing import List, Any import time +import uamqp # type: ignore from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore @@ -130,10 +131,13 @@ def _create_handler(self): if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access - desired_capabilities = None - if self._track_last_enqueued_event_properties: - symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] - desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released + desired_capabilities = {} + elif self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))} + else: + desired_capabilities = {"desired_capabilities": None} self._handler = ReceiveClientAsync( source, @@ -147,7 +151,7 @@ def _create_handler(self): client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access - desired_capabilities=desired_capabilities, # pylint:disable=protected-access + **desired_capabilities, # pylint:disable=protected-access loop=self._loop) self._messages_iter = None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index af576dd3962a..3f4e11d982c9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -9,6 +9,7 @@ import time from typing import List +import uamqp # type: ignore from uamqp import types, errors, utils # type: ignore from uamqp import ReceiveClient, Source # type: ignore @@ -127,10 +128,13 @@ def _create_handler(self): if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access - desired_capabilities = None - if self._track_last_enqueued_event_properties: - symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] - desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released + desired_capabilities = {} + elif self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = {"desired_capabilities": utils.data_factory(types.AMQPArray(symbol_array))} + else: + desired_capabilities = {"desired_capabilities": None} self._handler = ReceiveClient( source, @@ -144,7 +148,7 @@ def _create_handler(self): client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access - desired_capabilities=desired_capabilities) # pylint:disable=protected-access + **desired_capabilities) # pylint:disable=protected-access self._messages_iter = None def _open_with_retry(self): diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index a9744f259c8c..03987fbf3b77 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -319,6 +319,7 @@ async def test_receive_over_websocket_async(connstr_senders): @pytest.mark.asyncio @pytest.mark.liveTest async def test_receive_run_time_metric_async(connstr_senders): + pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index c9da841f71b2..eed03ae62bfe 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -276,6 +276,7 @@ def test_receive_over_websocket_sync(connstr_senders): @pytest.mark.liveTest def test_receive_run_time_metric(connstr_senders): + pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) From 72be3952d95dd268e08b00cef8660e2b80814e83 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 1 Oct 2019 11:32:58 -0700 Subject: [PATCH 15/18] use different consumer group --- sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py index 14683c1411e0..573480da6b59 100644 --- a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -178,7 +178,7 @@ def run_sync(self): threads = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group="$default", + worker = client.create_consumer(consumer_group=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) @@ -228,7 +228,7 @@ async def run_async(self): tasks = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group="$default", + worker = client.create_consumer(consumer_group=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) From a6eb5248a2eafe19e9475876b27a33dc454601f7 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 1 Oct 2019 12:01:45 -0700 Subject: [PATCH 16/18] fix an issue about consumer group --- .../azure-eventhubs/stress/azure_eventhub_stress.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py index 573480da6b59..2c27e2829fd6 100644 --- a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -69,7 +69,7 @@ def stress_send_sync(producer: EventHubProducer, args, logger): return len(batch) -async def stress_send_async(producer: EventHubProducer, args, logger): +async def stress_send_async(producer, args, logger): batch = await producer.create_batch() try: while True: @@ -178,7 +178,7 @@ def run_sync(self): threads = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group=self.args.consumer, + worker = client.create_consumer(consumer_group_name=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) @@ -228,7 +228,7 @@ async def run_async(self): tasks = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group=self.args.consumer, + worker = client.create_consumer(consumer_group_name=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) From 3249a4be3349f129617e6cf1ccee24925f4ae856 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 1 Oct 2019 12:07:52 -0700 Subject: [PATCH 17/18] fix an issue about consumer group --- sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py index 2c27e2829fd6..59b8fe76fec8 100644 --- a/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py +++ b/sdk/eventhub/azure-eventhubs/stress/azure_eventhub_stress.py @@ -178,7 +178,7 @@ def run_sync(self): threads = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group_name=self.args.consumer, + worker = client.create_consumer(consumer_group=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) @@ -228,7 +228,7 @@ async def run_async(self): tasks = [] for pid in partitions: if "receive" in method_name: - worker = client.create_consumer(consumer_group_name=self.args.consumer, + worker = client.create_consumer(consumer_group=self.args.consumer, partition_id=pid, event_position=EventPosition(self.args.offset), prefetch=300) From 0284f3f398eda8b53e6e4e3228587ce57d324fff Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 1 Oct 2019 17:39:11 -0700 Subject: [PATCH 18/18] Fix a get_properties bug --- .../azure-eventhubs/azure/eventhub/aio/client_async.py | 3 +++ sdk/eventhub/azure-eventhubs/azure/eventhub/client.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 784136189c21..7656eff50442 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -114,6 +114,7 @@ async def _try_delay(self, retried_times, last_exception, timeout_time=None, ent async def _management_request(self, mgmt_msg, op_type): retried_times = 0 + last_exception = None while retried_times <= self._config.max_retries: mgmt_auth = self._create_auth() mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) @@ -133,6 +134,8 @@ async def _management_request(self, mgmt_msg, op_type): retried_times += 1 finally: await mgmt_client.close_async() + log.info("%r returns an exception %r", self._container_id, last_exception) + raise last_exception async def get_properties(self): # type:() -> Dict[str, Any] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 6e76b5ec6a8e..0b8a95cb22f3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -120,6 +120,7 @@ def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_na def _management_request(self, mgmt_msg, op_type): retried_times = 0 + last_exception = None while retried_times <= self._config.max_retries: mgmt_auth = self._create_auth() mgmt_client = uamqp.AMQPClient(self._mgmt_target) @@ -139,6 +140,8 @@ def _management_request(self, mgmt_msg, op_type): retried_times += 1 finally: mgmt_client.close() + log.info("%r returns an exception %r", self._container_id, last_exception) + raise last_exception def get_properties(self):