@@ -98,7 +98,7 @@ async def get_properties(self):
"""
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
"password": self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
@@ -114,12 +114,12 @@ async def get_properties(self):
output = {}
if eh_info:
output['path'] = eh_info[b'name'].decode('utf-8')
output['created_at'] = datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at'])/1000)
output['created_at'] = datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at']) / 1000)
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
await mgmt_client.close_async()

async def get_partition_ids(self):
"""
Get partition ids of the specified EventHub async.
@@ -179,21 +179,21 @@ def create_consumer(
self, consumer_group, partition_id, event_position, owner_level=None,
operation=None, prefetch=None, loop=None):
"""
Create an async receiver to the client for a particular consumer group and partition.
Create an async consumer to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group. Default value is `$Default`.
:type consumer_group: str
:param partition_id: The ID of the partition.
:type partition_id: str
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param owner_level: The priority of the exclusive receiver. The client will create an exclusive
receiver if owner_level is set.
:param owner_level: The priority of the exclusive consumer. The client will create an exclusive
consumer if owner_level is set.
:type owner_level: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:param prefetch: The message prefetch count of the receiver. Default is 300.
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:param loop: An event loop. If not specified the default event loop will be used.
:rtype: ~azure.eventhub.aio.receiver_async.EventHubConsumer
@@ -204,7 +204,7 @@ def create_consumer(
:end-before: [END create_eventhub_client_async_receiver]
:language: python
:dedent: 4
:caption: Add an async receiver to the client for a particular consumer group and partition.
:caption: Add an async consumer to the client for a particular consumer group and partition.

"""
prefetch = self.config.prefetch if prefetch is None else prefetch
@@ -220,7 +220,7 @@ def create_consumer(
def create_producer(
self, partition_id=None, operation=None, send_timeout=None, loop=None):
"""
Create an async sender to the client to send ~azure.eventhub.common.EventData object
Create an async producer to the client to send ~azure.eventhub.common.EventData object
to an EventHub.

:param partition_id: Optionally specify a particular partition to send to.
@@ -243,7 +243,7 @@ def create_producer(
:end-before: [END create_eventhub_client_async_sender]
:language: python
:dedent: 4
:caption: Add an async sender to the client to
:caption: Add an async producer to the client to
send ~azure.eventhub.common.EventData object to an EventHub.

"""
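For context, a minimal sketch of how the renamed async API is meant to be used together, in the style of the samples further down in this diff. The import paths and environment variable names are assumptions, since the samples' import blocks are not part of the diff:

import asyncio
import os

# Assumed import paths; they are not shown in this diff.
from azure.eventhub import EventData, EventPosition, EventHubSharedKeyCredential
from azure.eventhub.aio import EventHubClient

HOSTNAME = os.environ['EVENT_HUB_HOSTNAME']   # assumed variable names
EVENT_HUB = os.environ['EVENT_HUB_NAME']
USER = os.environ['EVENT_HUB_SAS_POLICY']
KEY = os.environ['EVENT_HUB_SAS_KEY']


async def main():
    client = EventHubClient(
        host=HOSTNAME, event_hub_path=EVENT_HUB,
        credential=EventHubSharedKeyCredential(USER, KEY),
        network_tracing=False)

    # Renamed API: create_producer() returns an async EventHubProducer.
    producer = client.create_producer()
    async with producer:
        await producer.send(EventData("hello"))

    # Renamed API: create_consumer() returns an async EventHubConsumer.
    consumer = client.create_consumer(
        consumer_group="$default", partition_id="0",
        event_position=EventPosition("-1"))
    async with consumer:
        for event in await consumer.receive(timeout=5):
            print(event.body_as_str())


if __name__ == '__main__':
    asyncio.run(main())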
@@ -28,7 +28,7 @@ def __init__( # pylint: disable=super-init-not-called
self, client, source, event_position=None, prefetch=300, owner_level=None,
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async receiver.
Instantiate an async consumer.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
@@ -39,8 +39,8 @@ def __init__( # pylint: disable=super-init-not-called
:param prefetch: The number of events to prefetch from the service
for processing. Default is 300.
:type prefetch: int
:param owner_level: The priority of the exclusive receiver. It will an exclusive
receiver if owner_level is set.
:param owner_level: The priority of the exclusive consumer. It will be an exclusive
consumer if owner_level is set.
:type owner_level: int
:param loop: An event loop.
"""
@@ -155,7 +155,7 @@ async def __anext__(self):

def _check_closed(self):
if self.error:
raise EventHubError("This receiver has been closed. Please create a new receiver to receive event data.",
raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.",
self.error)
async def _open(self):
"""
@@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called
self, client, target, partition=None, send_timeout=60,
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an EventHub event SenderAsync handler.
Instantiate an async EventHubProducer.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
@@ -44,7 +44,7 @@ def __init__( # pylint: disable=super-init-not-called
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is `None`, i.e. no keep alive pings.
:type keep_alive: float
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
:param auto_reconnect: Whether to automatically reconnect the producer if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
:param loop: An event loop. If not specified the default event loop will be used.
@@ -59,7 +59,7 @@ def __init__( # pylint: disable=super-init-not-called
self.timeout = send_timeout
self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
self.name = "EHProducer-{}".format(uuid.uuid4())
self.unsent_events = None
self.redirected = None
self.error = None
@@ -305,7 +305,7 @@ async def _send_event_data(self):

def _check_closed(self):
if self.error:
raise EventHubError("This sender has been closed. Please create a new sender to send event data.",
raise EventHubError("This producer has been closed. Please create a new producer to send event data.",
self.error)

@staticmethod
12 changes: 6 additions & 6 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
@@ -193,21 +193,21 @@ def create_consumer(
):
# type: (str, str, EventPosition, int, str, int) -> EventHubConsumer
"""
Create a receiver to the client for a particular consumer group and partition.
Create a consumer to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group. Default value is `$Default`.
:type consumer_group: str
:param partition_id: The ID of the partition.
:type partition_id: str
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param owner_level: The priority of the exclusive receiver. The client will create an exclusive
receiver if owner_level is set.
:param owner_level: The priority of the exclusive consumer. The client will create an exclusive
consumer if owner_level is set.
:type owner_level: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:param prefetch: The message prefetch count of the receiver. Default is 300.
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:rtype: ~azure.eventhub.receiver.EventHubConsumer

@@ -217,7 +217,7 @@ def create_consumer(
:end-before: [END create_eventhub_client_receiver]
:language: python
:dedent: 4
:caption: Add a receiver to the client for a particular consumer group and partition.
:caption: Add a consumer to the client for a particular consumer group and partition.

"""
prefetch = self.config.prefetch if prefetch is None else prefetch
@@ -253,7 +253,7 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None):
:end-before: [END create_eventhub_client_sender]
:language: python
:dedent: 4
:caption: Add a sender to the client to send EventData object to an EventHub.
:caption: Add a producer to the client to send EventData object to an EventHub.

"""
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
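For comparison, a minimal synchronous sketch of the same renamed surface. It assumes the sync handlers mirror their async counterparts (a blocking send()/receive() and a close() method, which the docstrings above only imply), and the import paths are again assumptions:

import os

# Assumed import path.
from azure.eventhub import (
    EventData, EventPosition, EventHubClient, EventHubSharedKeyCredential)

client = EventHubClient(
    host=os.environ['EVENT_HUB_HOSTNAME'],
    event_hub_path=os.environ['EVENT_HUB_NAME'],
    credential=EventHubSharedKeyCredential(
        os.environ['EVENT_HUB_SAS_POLICY'], os.environ['EVENT_HUB_SAS_KEY']),
    network_tracing=False)

producer = client.create_producer()
consumer = client.create_consumer(
    consumer_group="$default", partition_id="0",
    event_position=EventPosition("-1"))
try:
    producer.send(EventData("hello"))            # assumed sync counterpart of send()
    for event in consumer.receive(timeout=10):   # assumed sync counterpart of receive()
        print(event.body_as_str())
finally:
    # close() is assumed; the docstrings above only confirm that a closed
    # consumer/producer raises EventHubError on further use.
    producer.close()
    consumer.close()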
@@ -117,7 +117,7 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
@@ -181,7 +181,7 @@ def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs):
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
@@ -237,7 +237,7 @@ def from_iothub_connection_string(cls, conn_str, **kwargs):
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
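The prefetch keyword documented in these constructors applies per consumer; a short sketch of passing it through the two classmethods shown above, assuming they live on EventHubClient and that the connection strings come from the environment:

import os

from azure.eventhub import EventHubClient  # assumed import path

# Standard Event Hubs connection string, with a larger per-consumer prefetch.
client = EventHubClient.from_connection_string(
    os.environ['EVENT_HUB_CONN_STR'],
    event_hub_path=os.environ['EVENT_HUB_NAME'],
    prefetch=500)

# IoT Hub connection string via the dedicated constructor.
iot_client = EventHubClient.from_iothub_connection_string(
    os.environ['IOTHUB_CONN_STR'], prefetch=500)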
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
@@ -273,7 +273,7 @@ def _set_partition_key(self, value):

class EventPosition(object):
"""
The position(offset, sequence or timestamp) where a receiver starts. Examples:
The position (offset, sequence or timestamp) where a consumer starts. Examples:

Beginning of the event stream:
>>> event_pos = EventPosition("-1")
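EventPosition is the value handed to create_consumer(..., event_position=...); a short sketch of the two spellings that appear in this diff, which are presumably equivalent ways of starting from the beginning of the stream:

from azure.eventhub import EventPosition  # assumed import path

# Beginning of the event stream, as in the docstring example above.
start_of_stream = EventPosition("-1")

# Helper used by the samples below.
first_available = EventPosition.first_available_event()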
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py
@@ -30,7 +30,7 @@ class EventHubConsumer(object):
def __init__(self, client, source, event_position=None, prefetch=300, owner_level=None,
keep_alive=None, auto_reconnect=True):
"""
Instantiate a receiver.
Instantiate a consumer.

:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient
@@ -39,8 +39,8 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve
:param prefetch: The number of events to prefetch from the service
for processing. Default is 300.
:type prefetch: int
:param owner_level: The priority of the exclusive receiver. It will an exclusive
receiver if owner_level is set.
:param owner_level: The priority of the exclusive consumer. It will be an exclusive
consumer if owner_level is set.
:type owner_level: int
"""
self.running = False
@@ -157,7 +157,7 @@ def __next__(self):

def _check_closed(self):
if self.error:
raise EventHubError("This receiver has been closed. Please create a new receiver to receive event data.",
raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.",
self.error)

def _redirect(self, redirect):
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py
@@ -28,7 +28,7 @@ class EventHubProducer(object):

def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
"""
Instantiate an EventHub event EventHubProducer handler.
Instantiate an EventHubProducer.

:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient.
@@ -43,7 +43,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is None, i.e. no keep alive pings.
:type keep_alive: float
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
:param auto_reconnect: Whether to automatically reconnect the producer if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
"""
@@ -58,7 +58,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
self.name = "EHProducer-{}".format(uuid.uuid4())
self.unsent_events = None
if partition:
self.target += "/Partitions/" + partition
@@ -300,7 +300,7 @@ def _send_event_data(self):

def _check_closed(self):
if self.error:
raise EventHubError("This sender has been closed. Please create a new sender to send event data.", self.error)
raise EventHubError("This producer has been closed. Please create a new producer to send event data.", self.error)

@staticmethod
def _set_partition_key(event_datas, partition_key):
@@ -6,7 +6,7 @@
# --------------------------------------------------------------------------------------------

"""
An example to show iterator receiver.
An example to show iterator consumer.
"""

import os
@@ -30,19 +30,19 @@
EVENT_POSITION = EventPosition.first_available_event()


async def iter_receiver(receiver):
async with receiver:
async for item in receiver:
print(item.body_as_str(), item.offset.value, receiver.name)
async def iter_consumer(consumer):
async with consumer:
async for item in consumer:
print(item.body_as_str(), item.offset.value, consumer.name)


async def main():
if not HOSTNAME:
raise ValueError("No EventHubs URL supplied.")
client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY),
network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION)
await iter_receiver(receiver)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION)
await iter_consumer(consumer)

if __name__ == '__main__':
asyncio.run(main())
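The owner_level parameter documented earlier in this diff is not exercised by this sample; a minimal sketch of an exclusive consumer, with the client constructed exactly as in the sample above:

from azure.eventhub import EventPosition  # assumed import path


async def read_exclusively(client, partition_id="0"):
    # A consumer with owner_level set takes exclusive ownership of the
    # partition for its consumer group, per the docstrings above.
    consumer = client.create_consumer(
        consumer_group="$default",
        partition_id=partition_id,
        event_position=EventPosition.first_available_event(),
        owner_level=1)
    async with consumer:
        async for item in consumer:
            print(item.body_as_str(), consumer.name)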
@@ -6,7 +6,7 @@
# --------------------------------------------------------------------------------------------

"""
An example to show running concurrent receivers.
An example to show running concurrent consumers.
"""

import os
@@ -31,11 +31,11 @@


async def pump(client, partition):
receiver = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, prefetch=5)
async with receiver:
consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, prefetch=5)
async with consumer:
total = 0
start_time = time.time()
for event_data in await receiver.receive(timeout=10):
for event_data in await consumer.receive(timeout=10):
last_offset = event_data.offset
last_sn = event_data.sequence_number
print("Received: {}, {}".format(last_offset.value, last_sn))
@@ -30,16 +30,16 @@


async def run(client):
sender = client.create_producer()
await send(sender, 4)
producer = client.create_producer()
await send(producer, 4)


async def send(sender, count):
async with sender:
async def send(producer, count):
async with producer:
for i in range(count):
logger.info("Sending message: {}".format(i))
data = EventData(str(i))
await sender.send(data)
await producer.send(data)

try:
if not HOSTNAME:
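create_producer(partition_id=...) pins a producer to a single partition, per the docstring earlier in this diff; a minimal sketch in the same style as this sample, with the client constructed as in the samples above:

from azure.eventhub import EventData  # assumed import path


async def send_to_partition(client, partition_id="0", count=4):
    # The producer is bound to one partition instead of letting the service
    # distribute events across partitions.
    producer = client.create_producer(partition_id=partition_id)
    async with producer:
        for i in range(count):
            await producer.send(EventData(str(i)))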