diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py index a14ce27029b2..d4117cad1ebd 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py @@ -98,7 +98,7 @@ async def get_properties(self): """ alt_creds = { "username": self._auth_config.get("iot_username"), - "password":self._auth_config.get("iot_password")} + "password": self._auth_config.get("iot_password")} try: mgmt_auth = self._create_auth(**alt_creds) mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug) @@ -114,12 +114,12 @@ async def get_properties(self): output = {} if eh_info: output['path'] = eh_info[b'name'].decode('utf-8') - output['created_at'] = datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at'])/1000) + output['created_at'] = datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at']) / 1000) output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] return output finally: await mgmt_client.close_async() - + async def get_partition_ids(self): """ Get partition ids of the specified EventHub async. @@ -179,7 +179,7 @@ def create_consumer( self, consumer_group, partition_id, event_position, owner_level=None, operation=None, prefetch=None, loop=None): """ - Create an async receiver to the client for a particular consumer group and partition. + Create an async consumer to the client for a particular consumer group and partition. :param consumer_group: The name of the consumer group. Default value is `$Default`. :type consumer_group: str @@ -187,13 +187,13 @@ def create_consumer( :type partition_id: str :param event_position: The position from which to start receiving. :type event_position: ~azure.eventhub.common.EventPosition - :param owner_level: The priority of the exclusive receiver. The client will create an exclusive - receiver if owner_level is set. + :param owner_level: The priority of the exclusive consumer. The client will create an exclusive + consumer if owner_level is set. :type owner_level: int :param operation: An optional operation to be appended to the hostname in the source URL. The value must start with `/` character. :type operation: str - :param prefetch: The message prefetch count of the receiver. Default is 300. + :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param loop: An event loop. If not specified the default event loop will be used. :rtype: ~azure.eventhub.aio.receiver_async.EventHubConsumer @@ -204,7 +204,7 @@ def create_consumer( :end-before: [END create_eventhub_client_async_receiver] :language: python :dedent: 4 - :caption: Add an async receiver to the client for a particular consumer group and partition. + :caption: Add an async consumer to the client for a particular consumer group and partition. """ prefetch = self.config.prefetch if prefetch is None else prefetch @@ -220,7 +220,7 @@ def create_consumer( def create_producer( self, partition_id=None, operation=None, send_timeout=None, loop=None): """ - Create an async sender to the client to send ~azure.eventhub.common.EventData object + Create an async producer to the client to send ~azure.eventhub.common.EventData object to an EventHub. :param partition_id: Optionally specify a particular partition to send to. @@ -243,7 +243,7 @@ def create_producer( :end-before: [END create_eventhub_client_async_sender] :language: python :dedent: 4 - :caption: Add an async sender to the client to + :caption: Add an async producer to the client to send ~azure.eventhub.common.EventData object to an EventHub. """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py index 00818195a479..4fa465b75924 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py @@ -28,7 +28,7 @@ def __init__( # pylint: disable=super-init-not-called self, client, source, event_position=None, prefetch=300, owner_level=None, keep_alive=None, auto_reconnect=True, loop=None): """ - Instantiate an async receiver. + Instantiate an async consumer. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -39,8 +39,8 @@ def __init__( # pylint: disable=super-init-not-called :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int - :param owner_level: The priority of the exclusive receiver. It will an exclusive - receiver if owner_level is set. + :param owner_level: The priority of the exclusive consumer. It will an exclusive + consumer if owner_level is set. :type owner_level: int :param loop: An event loop. """ @@ -155,7 +155,7 @@ async def __anext__(self): def _check_closed(self): if self.error: - raise EventHubError("This receiver has been closed. Please create a new receiver to receive event data.", + raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.", self.error) async def _open(self): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py index c526d17faf4a..23347bee9069 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py @@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True, loop=None): """ - Instantiate an EventHub event SenderAsync handler. + Instantiate an async EventHubProducer. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -44,7 +44,7 @@ def __init__( # pylint: disable=super-init-not-called :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is `None`, i.e. no keep alive pings. :type keep_alive: float - :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + :param auto_reconnect: Whether to automatically reconnect the producer if a retryable error occurs. Default value is `True`. :type auto_reconnect: bool :param loop: An event loop. If not specified the default event loop will be used. @@ -59,7 +59,7 @@ def __init__( # pylint: disable=super-init-not-called self.timeout = send_timeout self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) self.reconnect_backoff = 1 - self.name = "EHSender-{}".format(uuid.uuid4()) + self.name = "EHProducer-{}".format(uuid.uuid4()) self.unsent_events = None self.redirected = None self.error = None @@ -305,7 +305,7 @@ async def _send_event_data(self): def _check_closed(self): if self.error: - raise EventHubError("This sender has been closed. Please create a new sender to send event data.", + raise EventHubError("This producer has been closed. Please create a new producer to send event data.", self.error) @staticmethod diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 66ef6a307a69..ab77bb0e189d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -193,7 +193,7 @@ def create_consumer( ): # type: (str, str, EventPosition, int, str, int) -> EventHubConsumer """ - Create a receiver to the client for a particular consumer group and partition. + Create a consumer to the client for a particular consumer group and partition. :param consumer_group: The name of the consumer group. Default value is `$Default`. :type consumer_group: str @@ -201,13 +201,13 @@ def create_consumer( :type partition_id: str :param event_position: The position from which to start receiving. :type event_position: ~azure.eventhub.common.EventPosition - :param owner_level: The priority of the exclusive receiver. The client will create an exclusive - receiver if owner_level is set. + :param owner_level: The priority of the exclusive consumer. The client will create an exclusive + consumer if owner_level is set. :type owner_level: int :param operation: An optional operation to be appended to the hostname in the source URL. The value must start with `/` character. :type operation: str - :param prefetch: The message prefetch count of the receiver. Default is 300. + :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :rtype: ~azure.eventhub.receiver.EventHubConsumer @@ -217,7 +217,7 @@ def create_consumer( :end-before: [END create_eventhub_client_receiver] :language: python :dedent: 4 - :caption: Add a receiver to the client for a particular consumer group and partition. + :caption: Add a consumer to the client for a particular consumer group and partition. """ prefetch = self.config.prefetch if prefetch is None else prefetch @@ -253,7 +253,7 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None): :end-before: [END create_eventhub_client_sender] :language: python :dedent: 4 - :caption: Add a sender to the client to send EventData object to an EventHub. + :caption: Add a producer to the client to send EventData object to an EventHub. """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index f0817e0189c9..1dba09ac09ae 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -117,7 +117,7 @@ def __init__(self, host, event_hub_path, credential, **kwargs): ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the transport type is explicitly requested. :type transport_type: ~azure.eventhub.TransportType - :param prefetch: The message prefetch count of the receiver. Default is 300. + :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. @@ -181,7 +181,7 @@ def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs): ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the transport type is explicitly requested. :type transport_type: ~azure.eventhub.TransportType - :param prefetch: The message prefetch count of the receiver. Default is 300. + :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. @@ -237,7 +237,7 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the transport type is explicitly requested. :type transport_type: ~azure.eventhub.TransportType - :param prefetch: The message prefetch count of the receiver. Default is 300. + :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index df526d83da93..2f1cebefc3c7 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -273,7 +273,7 @@ def _set_partition_key(self, value): class EventPosition(object): """ - The position(offset, sequence or timestamp) where a receiver starts. Examples: + The position(offset, sequence or timestamp) where a consumer starts. Examples: Beginning of the event stream: >>> event_pos = EventPosition("-1") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py index d7816d3aeeba..5fb8eaccc80c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py @@ -30,7 +30,7 @@ class EventHubConsumer(object): def __init__(self, client, source, event_position=None, prefetch=300, owner_level=None, keep_alive=None, auto_reconnect=True): """ - Instantiate a receiver. + Instantiate a consumer. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient @@ -39,8 +39,8 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int - :param owner_level: The priority of the exclusive receiver. It will an exclusive - receiver if owner_level is set. + :param owner_level: The priority of the exclusive consumer. It will an exclusive + consumer if owner_level is set. :type owner_level: int """ self.running = False @@ -157,7 +157,7 @@ def __next__(self): def _check_closed(self): if self.error: - raise EventHubError("This receiver has been closed. Please create a new receiver to receive event data.", + raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.", self.error) def _redirect(self, redirect): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py index 736bcd7a397c..2380ffb7932c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py @@ -28,7 +28,7 @@ class EventHubProducer(object): def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ - Instantiate an EventHub event EventHubProducer handler. + Instantiate an EventHubProducer. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. @@ -43,7 +43,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings. :type keep_alive: float - :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + :param auto_reconnect: Whether to automatically reconnect the producer if a retryable error occurs. Default value is `True`. :type auto_reconnect: bool """ @@ -58,7 +58,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) self.reconnect_backoff = 1 - self.name = "EHSender-{}".format(uuid.uuid4()) + self.name = "EHProducer-{}".format(uuid.uuid4()) self.unsent_events = None if partition: self.target += "/Partitions/" + partition @@ -300,7 +300,7 @@ def _send_event_data(self): def _check_closed(self): if self.error: - raise EventHubError("This sender has been closed. Please create a new sender to send event data.", self.error) + raise EventHubError("This producer has been closed. Please create a new producer to send event data.", self.error) @staticmethod def _set_partition_key(event_datas, partition_key): diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py index 7bf5227f8054..7e66ad71e28e 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py @@ -6,7 +6,7 @@ # -------------------------------------------------------------------------------------------- """ -An example to show iterator receiver. +An example to show iterator consumer. """ import os @@ -30,10 +30,10 @@ EVENT_POSITION = EventPosition.first_available_event() -async def iter_receiver(receiver): - async with receiver: - async for item in receiver: - print(item.body_as_str(), item.offset.value, receiver.name) +async def iter_consumer(consumer): + async with consumer: + async for item in consumer: + print(item.body_as_str(), item.offset.value, consumer.name) async def main(): @@ -41,8 +41,8 @@ async def main(): raise ValueError("No EventHubs URL supplied.") client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) - await iter_receiver(receiver) + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) + await iter_consumer(consumer) if __name__ == '__main__': asyncio.run(main()) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py index f6bb9de8d32f..cc0b0f317c70 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py @@ -6,7 +6,7 @@ # -------------------------------------------------------------------------------------------- """ -An example to show running concurrent receivers. +An example to show running concurrent consumers. """ import os @@ -31,11 +31,11 @@ async def pump(client, partition): - receiver = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, prefetch=5) - async with receiver: + consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, prefetch=5) + async with consumer: total = 0 start_time = time.time() - for event_data in await receiver.receive(timeout=10): + for event_data in await consumer.receive(timeout=10): last_offset = event_data.offset last_sn = event_data.sequence_number print("Received: {}, {}".format(last_offset.value, last_sn)) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py index 76a56b8519b2..ffae6787628d 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py @@ -30,16 +30,16 @@ async def run(client): - sender = client.create_producer() - await send(sender, 4) + producer = client.create_producer() + await send(producer, 4) -async def send(sender, count): - async with sender: +async def send(producer, count): + async with producer: for i in range(count): logger.info("Sending message: {}".format(i)) data = EventData(str(i)) - await sender.send(data) + await producer.send(data) try: if not HOSTNAME: diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py index 9790259c5129..c22e602781df 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py @@ -31,36 +31,36 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): # [START create_eventhub_client_async_sender] client = EventHubClient.from_connection_string(connection_str) - # Create an async sender. - sender = client.create_producer(partition_id="0") + # Create an async producer. + producer = client.create_producer(partition_id="0") # [END create_eventhub_client_async_sender] # [START create_eventhub_client_async_receiver] client = EventHubClient.from_connection_string(connection_str) - # Create an async receiver. + # Create an async consumer. receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) - # Create an exclusive async receiver. + # Create an exclusive async consumer. receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), owner_level=1) # [END create_eventhub_client_async_receiver] client = EventHubClient.from_connection_string(connection_str) - sender = client.create_producer(partition_id="0") - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + producer = client.create_producer(partition_id="0") + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) - await receiver.receive(timeout=1) + await consumer.receive(timeout=1) # [START eventhub_client_async_send] - async with sender: + async with producer: event_data = EventData(b"A single event") - await sender.send(event_data) + await producer.send(event_data) # [END eventhub_client_async_send] await asyncio.sleep(1) # [START eventhub_client_async_receive] logger = logging.getLogger("azure.eventhub") - async with receiver: - received = await receiver.receive(timeout=5) + async with consumer: + received = await consumer.receive(timeout=5) for event_data in received: logger.info("Message received:{}".format(event_data.body_as_str())) # [END eventhub_client_async_receive] @@ -70,35 +70,35 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): @pytest.mark.asyncio -async def test_example_eventhub_async_sender_ops(live_eventhub_config, connection_str): +async def test_example_eventhub_async_producer_ops(live_eventhub_config, connection_str): from azure.eventhub.aio import EventHubClient from azure.eventhub import EventData # [START eventhub_client_async_sender_close] client = EventHubClient.from_connection_string(connection_str) - sender = client.create_producer(partition_id="0") + producer = client.create_producer(partition_id="0") try: - await sender.send(EventData(b"A single event")) + await producer.send(EventData(b"A single event")) finally: # Close down the send handler. - await sender.close() + await producer.close() # [END eventhub_client_async_sender_close] @pytest.mark.asyncio -async def test_example_eventhub_async_receiver_ops(live_eventhub_config, connection_str): +async def test_example_eventhub_async_consumer_ops(live_eventhub_config, connection_str): from azure.eventhub.aio import EventHubClient from azure.eventhub import EventPosition # [START eventhub_client_async_receiver_close] client = EventHubClient.from_connection_string(connection_str) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) try: # Open and receive - await receiver.receive(timeout=1) + await consumer.receive(timeout=1) except: raise finally: # Close down the receive handler. - await receiver.close() + await consumer.close() # [END eventhub_client_async_receiver_close] diff --git a/sdk/eventhub/azure-eventhubs/examples/batch_send.py b/sdk/eventhub/azure-eventhubs/examples/batch_send.py index e9a2e31a0092..a539775c062a 100644 --- a/sdk/eventhub/azure-eventhubs/examples/batch_send.py +++ b/sdk/eventhub/azure-eventhubs/examples/batch_send.py @@ -33,16 +33,16 @@ raise ValueError("No EventHubs URL supplied.") client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) - sender = client.create_producer(partition_id="1") + producer = client.create_producer(partition_id="1") event_list = [] for i in range(1500): event_list.append('Hello World') - with sender: + with producer: start_time = time.time() data = EventData(body=event_list) - sender.send(data) + producer.send(data) end_time = time.time() run_time = end_time - start_time logger.info("Runtime: {} seconds".format(run_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py b/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py index 1a0840e02cfe..54d439281f2b 100644 --- a/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py +++ b/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py @@ -36,11 +36,11 @@ event_hub_path=EVENT_HUB, credential=credential) try: - sender = client.create_producer(partition_id='0') + producer = client.create_producer(partition_id='0') - with sender: + with producer: event = EventData(body='A single message') - sender.send(event) + producer.send(event) except KeyboardInterrupt: pass diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py b/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py index b70b03284f65..d0c11c970dd8 100644 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py +++ b/sdk/eventhub/azure-eventhubs/examples/iothub_recv.py @@ -19,9 +19,9 @@ iot_connection_str = 'HostName=iothubfortrack2py.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=glF9a2n0D9fgmWpfTqjjmvkYt0WaTNqZx9GV/UKwDkQ=' # os.environ['IOTHUB_CONNECTION_STR'] client = EventHubClient.from_iothub_connection_string(iot_connection_str, network_tracing=False) -receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') -with receiver: - received = receiver.receive(timeout=5) +consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events') +with consumer: + received = consumer.receive(timeout=5) print(received) eh_info = client.get_properties() diff --git a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py b/sdk/eventhub/azure-eventhubs/examples/iothub_send.py index 152afd81355e..cc664a73ad45 100644 --- a/sdk/eventhub/azure-eventhubs/examples/iothub_send.py +++ b/sdk/eventhub/azure-eventhubs/examples/iothub_send.py @@ -21,9 +21,9 @@ client = EventHubClient.from_iothub_connection_string(iot_connection_str, network_tracing=False) try: - sender = client.create_producer(operation='/messages/devicebound') - with sender: - sender.send(EventData(b"A single event", to_device=iot_device_id)) + producer = client.create_producer(operation='/messages/devicebound') + with producer: + producer.send(EventData(b"A single event", to_device=iot_device_id)) except KeyboardInterrupt: pass diff --git a/sdk/eventhub/azure-eventhubs/examples/iterator_receiver.py b/sdk/eventhub/azure-eventhubs/examples/iterator_receiver.py index 22bd4b4e14ea..fe20fc8503dd 100644 --- a/sdk/eventhub/azure-eventhubs/examples/iterator_receiver.py +++ b/sdk/eventhub/azure-eventhubs/examples/iterator_receiver.py @@ -25,20 +25,20 @@ EVENT_POSITION = EventPosition.first_available_event() -class PartitionReceiverThread(Thread): - def __init__(self, receiver): +class PartitionConsumerThread(Thread): + def __init__(self, consumer): Thread.__init__(self) - self.receiver = receiver + self.consumer = consumer def run(self): - for item in self.receiver: + for item in self.consumer: print(item) client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) -receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) -with receiver: - thread = PartitionReceiverThread(receiver) +consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) +with consumer: + thread = PartitionConsumerThread(consumer) thread.start() thread.join(2) # stop after 2 seconds diff --git a/sdk/eventhub/azure-eventhubs/examples/proxy.py b/sdk/eventhub/azure-eventhubs/examples/proxy.py index 9eb74d12c418..e8d7e3fff9ad 100644 --- a/sdk/eventhub/azure-eventhubs/examples/proxy.py +++ b/sdk/eventhub/azure-eventhubs/examples/proxy.py @@ -39,10 +39,10 @@ client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False, http_proxy=HTTP_PROXY) try: - sender = client.create_producer(partition_id=PARTITION) - receiver = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION) + producer = client.create_producer(partition_id=PARTITION) + consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION) - receiver.receive(timeout=1) + consumer.receive(timeout=1) event_list = [] for i in range(20): @@ -50,12 +50,12 @@ print('Start sending events behind a proxy.') - sender.send(event_list) + producer.send(event_list) print('Start receiving events behind a proxy.') - received = receiver.receive(max_batch_size=50, timeout=5) + received = consumer.receive(max_batch_size=50, timeout=5) finally: - sender.close() - receiver.close() + producer.close() + consumer.close() diff --git a/sdk/eventhub/azure-eventhubs/examples/recv.py b/sdk/eventhub/azure-eventhubs/examples/recv.py index e8609d91db97..7f78a8a5ad51 100644 --- a/sdk/eventhub/azure-eventhubs/examples/recv.py +++ b/sdk/eventhub/azure-eventhubs/examples/recv.py @@ -33,10 +33,10 @@ network_tracing=False) try: - receiver = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION, prefetch=5000) - with receiver: + consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION, prefetch=5000) + with consumer: start_time = time.time() - batch = receiver.receive(timeout=5000) + batch = consumer.receive(timeout=5000) while batch: for event_data in batch: last_offset = event_data.offset @@ -44,7 +44,7 @@ print("Received: {}, {}".format(last_offset.value, last_sn)) print(event_data.body_as_str()) total += 1 - batch = receiver.receive(timeout=5000) + batch = consumer.receive(timeout=5000) end_time = time.time() run_time = end_time - start_time diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_batch.py b/sdk/eventhub/azure-eventhubs/examples/recv_batch.py index 2b36f089274f..0b37769d0242 100644 --- a/sdk/eventhub/azure-eventhubs/examples/recv_batch.py +++ b/sdk/eventhub/azure-eventhubs/examples/recv_batch.py @@ -34,9 +34,9 @@ client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) try: - receiver = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION, prefetch=100) - with receiver: - batched_events = receiver.receive(max_batch_size=10) + consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION, prefetch=100) + with consumer: + batched_events = consumer.receive(max_batch_size=10) for event_data in batched_events: last_offset = event_data.offset.value last_sn = event_data.sequence_number diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_epoch.py b/sdk/eventhub/azure-eventhubs/examples/recv_epoch.py index 878b4d7de263..4217874771ad 100644 --- a/sdk/eventhub/azure-eventhubs/examples/recv_epoch.py +++ b/sdk/eventhub/azure-eventhubs/examples/recv_epoch.py @@ -6,7 +6,7 @@ # -------------------------------------------------------------------------------------------- """ -An example to show receiving events from an Event Hub partition as an epoch receiver. +An example to show receiving events from an Event Hub partition as an epoch consumer. """ import os @@ -26,16 +26,15 @@ USER = os.environ.get('EVENT_HUB_SAS_POLICY') KEY = os.environ.get('EVENT_HUB_SAS_KEY') -EXCLUSIVE_RECEIVER_PRIORITY = 42 PARTITION = "0" async def pump(client, owner_level): - receiver = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EventPosition("-1"), owner_level=owner_level) - async with receiver: + consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EventPosition("-1"), owner_level=owner_level) + async with consumer: total = 0 start_time = time.time() - for event_data in await receiver.receive(timeout=5): + for event_data in await consumer.receive(timeout=5): last_offset = event_data.offset last_sn = event_data.sequence_number total += 1 diff --git a/sdk/eventhub/azure-eventhubs/examples/send.py b/sdk/eventhub/azure-eventhubs/examples/send.py index 7f319bcfb1dc..4954b1146b23 100644 --- a/sdk/eventhub/azure-eventhubs/examples/send.py +++ b/sdk/eventhub/azure-eventhubs/examples/send.py @@ -33,16 +33,16 @@ client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) - sender = client.create_producer(partition_id="0") + producer = client.create_producer(partition_id="0") ed = EventData("msg") try: start_time = time.time() - with sender: + with producer: for i in range(100): logger.info("Sending message: {}".format(i)) - sender.send(ed) + producer.send(ed) except: raise finally: diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index d46f5a0b63e8..6b1552047fab 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -57,25 +57,25 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): from azure.eventhub import EventData, EventPosition - # [START create_eventhub_client_sender] + # [START create_eventhub_client_producer] client = EventHubClient.from_connection_string(connection_str) - # Create a sender. - sender = client.create_producer(partition_id="0") + # Create a producer. + producer = client.create_producer(partition_id="0") # [END create_eventhub_client_sender] # [START create_eventhub_client_receiver] client = EventHubClient.from_connection_string(connection_str) - # Create a receiver. - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) - # Create an exclusive receiver object. - exclusive_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=1) + # Create a consumer. + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + # Create an exclusive consumer object. + exclusive_consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=1) # [END create_eventhub_client_receiver] client = EventHubClient.from_connection_string(connection_str) - sender = client.create_producer(partition_id="0") - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + producer = client.create_producer(partition_id="0") + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) try: - receiver.receive(timeout=1) + consumer.receive(timeout=1) # [START create_event_data] event_data = EventData("String data") @@ -87,16 +87,16 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): # [END create_event_data] # [START eventhub_client_sync_send] - with sender: + with producer: event_data = EventData(b"A single event") - sender.send(event_data) + producer.send(event_data) # [END eventhub_client_sync_send] time.sleep(1) # [START eventhub_client_sync_receive] - with receiver: + with consumer: logger = logging.getLogger("azure.eventhub") - received = receiver.receive(timeout=5, max_batch_size=1) + received = consumer.receive(timeout=5, max_batch_size=1) for event_data in received: logger.info("Message received:{}".format(event_data.body_as_str())) # [END eventhub_client_sync_receive] @@ -107,30 +107,30 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): pass -def test_example_eventhub_sender_ops(live_eventhub_config, connection_str): +def test_example_eventhub_producer_ops(live_eventhub_config, connection_str): from azure.eventhub import EventHubClient, EventData # [START eventhub_client_sender_close] client = EventHubClient.from_connection_string(connection_str) - sender = client.create_producer(partition_id="0") + producer = client.create_producer(partition_id="0") try: - sender.send(EventData(b"A single event")) + producer.send(EventData(b"A single event")) finally: # Close down the send handler. - sender.close() + producer.close() # [END eventhub_client_sender_close] -def test_example_eventhub_receiver_ops(live_eventhub_config, connection_str): +def test_example_eventhub_consumer_ops(live_eventhub_config, connection_str): from azure.eventhub import EventHubClient from azure.eventhub import EventPosition # [START eventhub_client_receiver_close] client = EventHubClient.from_connection_string(connection_str) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) try: - receiver.receive(timeout=1) + consumer.receive(timeout=1) finally: # Close down the receive handler. - receiver.close() + consumer.close() # [END eventhub_client_receiver_close]