Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,39 @@ def _stop_eventprocessor(cls, event_processor):
elif (consumer_group, '-1') in eventhub_client._event_processors:
del eventhub_client._event_processors[(consumer_group, "-1")]

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
"""
Create an EventHubConsumerClient from a connection string.

:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword credential: The credential object used for authentication which implements particular interface
of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential,
~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and
objects that implement `get_token(self, *scopes)` method.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: The user agent that needs to be appended to the built in user agent string.
:keyword int retry_total: The total number of attempts to redo the failed operation when an error happened.
Default value is 3.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
:keyword partition_manager:
stores the load balancing data and checkpoint data when receiving events
if partition_manager is specified. If it's None, this EventHubConsumerClient instance will receive
events without load balancing and checkpoint.
:paramtype partition_manager: Implementation classes of ~azure.eventhub.aio.PartitionManager
:keyword float load_balancing_interval:
When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10.
"""
return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs)

def receive(self, on_events, consumer_group, **kwargs):
# type: (Callable[[PartitionContext, List[EventData]], None], str, Any) -> None
"""Receive events from partition(s) optionally with load balancing and checkpointing.
Expand Down
26 changes: 26 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ def _init_locks_for_producers(self):
for _ in range(num_of_producers):
self._producers_locks.append(threading.Lock())

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
"""
Create an EventHubProducerClient from a connection string.

:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword credential: The credential object used for authentication which implements particular interface
of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential,
~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and
objects that implement `get_token(self, *scopes)` method.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: The user agent that needs to be appended to the built in user agent string.
:keyword int retry_total: The total number of attempts to redo the failed operation when an error happened.
Default value is 3.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
"""
return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs)

def send(self, event_data, **kwargs):
# type: (Union[EventData, EventDataBatch, Iterable[EventData]], Any) -> None
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,39 @@ def __init__(self, host, event_hub_path, credential, **kwargs) -> None:
self._event_processors = dict() # type: Dict[Tuple[str, str], EventProcessor]
self._closed = False

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
"""
Create an EventHubConsumerClient from a connection string.

:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword credential: The credential object used for authentication which implements particular interface
of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential,
~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and
objects that implement `get_token(self, *scopes)` method.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: The user agent that needs to be appended to the built in user agent string.
:keyword int retry_total: The total number of attempts to redo the failed operation when an error happened.
Default value is 3.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
:keyword partition_manager:
stores the load balancing data and checkpoint data when receiving events
if partition_manager is specified. If it's None, this EventHubConsumerClient instance will receive
events without load balancing and checkpoint.
:paramtype partition_manager: Implementation classes of ~azure.eventhub.aio.PartitionManager
:keyword float load_balancing_interval:
When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10.
"""
return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs)

async def receive(
self, on_events, consumer_group: str,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ async def _init_locks_for_producers(self):
self._producers_locks.append(asyncio.Lock())
# self._producers_locks = [asyncio.Lock()] * num_of_producers

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
"""
Create an EventHubProducerClient from a connection string.

:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword credential: The credential object used for authentication which implements particular interface
of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential,
~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and
objects that implement `get_token(self, *scopes)` method.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: The user agent that needs to be appended to the built in user agent string.
:keyword int retry_total: The total number of attempts to redo the failed operation when an error happened.
Default value is 3.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
"""
return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs)

async def send(self, event_data,
*, partition_key: Union[str, bytes] = None, partition_id: str = None, timeout: float = None) -> None:
"""Sends event data and blocks until acknowledgement is received or operation times out.
Expand Down
7 changes: 4 additions & 3 deletions sdk/eventhub/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ jobs:
AZURE_STORAGE_ACCOUNT: $(python-eh-livetest-event-hub-storage-account)
AZURE_STORAGE_ACCESS_KEY: $(python-eh-livetest-event-hub-storage-access-key)
AZURE_STORAGE_CONN_STR: $(python-eh-livetest-event-hub-storage-conn-str)
EVENT_HUB_CONN_STR: $(python-eh-livetest-event-hub-conn-str)
EVENT_HUB_HOSTNAME: $(python-eh-livetest-event-hub-hostname)
EVENT_HUB_NAME: $(python-eh-livetest-event-hub-name)
EVENT_HUB_SAS_POLICY: $(python-eh-livetest-event-hub-sas-policy)
EVENT_HUB_SAS_KEY: $(python-eh-livetest-event-hub-sas-key)
EVENT_HUB_NAMESPACE: $(python-eh-livetest-event-hub-namespace)
IOTHUB_CONNECTION_STR: $(python-eh-livetest-event-hub-iothub-connection-str)
IOTHUB_DEVICE: $(python-eh-livetest-event-hub-iothub-device)
AAD_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id)
AAD_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id)
AAD_SECRET: $(python-eh-livetest-event-hub-aad-secret)
AZURE_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id)
AZURE_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id)
AZURE_CLIENT_SECRET: $(python-eh-livetest-event-hub-aad-secret)