diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py index c6220e7c04c4..276f783ffc43 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py @@ -96,6 +96,39 @@ def _stop_eventprocessor(cls, event_processor): elif (consumer_group, '-1') in eventhub_client._event_processors: del eventhub_client._event_processors[(consumer_group, "-1")] + @classmethod + def from_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubConsumerClient from a connection string. + + :param str conn_str: The connection string of an eventhub. + :keyword str event_hub_path: The path of the specific Event Hub to connect the client to. + :keyword credential: The credential object used for authentication which implements particular interface + of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, + ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and + objects that implement `get_token(self, *scopes)` method. + :keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`. + :keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys - 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present - 'username', 'password'. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. + Default value is 3. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.eventhub.TransportType + :keyword partition_manager: + stores the load balancing data and checkpoint data when receiving events + if partition_manager is specified. If it's None, this EventHubConsumerClient instance will receive + events without load balancing and checkpoint. + :paramtype partition_manager: Implementation classes of ~azure.eventhub.aio.PartitionManager + :keyword float load_balancing_interval: + When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10. + """ + return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs) + def receive(self, on_events, consumer_group, **kwargs): # type: (Callable[[PartitionContext, List[EventData]], None], str, Any) -> None """Receive events from partition(s) optionally with load balancing and checkpointing. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py index fc4b13c45226..f4c5850eacd5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py @@ -71,6 +71,32 @@ def _init_locks_for_producers(self): for _ in range(num_of_producers): self._producers_locks.append(threading.Lock()) + @classmethod + def from_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubProducerClient from a connection string. + + :param str conn_str: The connection string of an eventhub. + :keyword str event_hub_path: The path of the specific Event Hub to connect the client to. + :keyword credential: The credential object used for authentication which implements particular interface + of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, + ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and + objects that implement `get_token(self, *scopes)` method. + :keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`. + :keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys - 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present - 'username', 'password'. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. + Default value is 3. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.eventhub.TransportType + """ + return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs) + def send(self, event_data, **kwargs): # type: (Union[EventData, EventDataBatch, Iterable[EventData]], Any) -> None """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py index a9919c5228bc..fa1752aafe67 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py @@ -76,6 +76,39 @@ def __init__(self, host, event_hub_path, credential, **kwargs) -> None: self._event_processors = dict() # type: Dict[Tuple[str, str], EventProcessor] self._closed = False + @classmethod + def from_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubConsumerClient from a connection string. + + :param str conn_str: The connection string of an eventhub. + :keyword str event_hub_path: The path of the specific Event Hub to connect the client to. + :keyword credential: The credential object used for authentication which implements particular interface + of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, + ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and + objects that implement `get_token(self, *scopes)` method. + :keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`. + :keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys - 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present - 'username', 'password'. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. + Default value is 3. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.eventhub.TransportType + :keyword partition_manager: + stores the load balancing data and checkpoint data when receiving events + if partition_manager is specified. If it's None, this EventHubConsumerClient instance will receive + events without load balancing and checkpoint. + :paramtype partition_manager: Implementation classes of ~azure.eventhub.aio.PartitionManager + :keyword float load_balancing_interval: + When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10. + """ + return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs) + async def receive( self, on_events, consumer_group: str, *, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py index 0bbe5de92b4a..46c2cb2f5db5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py @@ -71,6 +71,32 @@ async def _init_locks_for_producers(self): self._producers_locks.append(asyncio.Lock()) # self._producers_locks = [asyncio.Lock()] * num_of_producers + @classmethod + def from_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubProducerClient from a connection string. + + :param str conn_str: The connection string of an eventhub. + :keyword str event_hub_path: The path of the specific Event Hub to connect the client to. + :keyword credential: The credential object used for authentication which implements particular interface + of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, + ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and + objects that implement `get_token(self, *scopes)` method. + :keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`. + :keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys - 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present - 'username', 'password'. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. + Default value is 3. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.eventhub.TransportType + """ + return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs) + async def send(self, event_data, *, partition_key: Union[str, bytes] = None, partition_id: str = None, timeout: float = None) -> None: """Sends event data and blocks until acknowledgement is received or operation times out. diff --git a/sdk/eventhub/tests.yml b/sdk/eventhub/tests.yml index 7eebea067f9a..f383a2a5f512 100644 --- a/sdk/eventhub/tests.yml +++ b/sdk/eventhub/tests.yml @@ -28,6 +28,7 @@ jobs: AZURE_STORAGE_ACCOUNT: $(python-eh-livetest-event-hub-storage-account) AZURE_STORAGE_ACCESS_KEY: $(python-eh-livetest-event-hub-storage-access-key) AZURE_STORAGE_CONN_STR: $(python-eh-livetest-event-hub-storage-conn-str) + EVENT_HUB_CONN_STR: $(python-eh-livetest-event-hub-conn-str) EVENT_HUB_HOSTNAME: $(python-eh-livetest-event-hub-hostname) EVENT_HUB_NAME: $(python-eh-livetest-event-hub-name) EVENT_HUB_SAS_POLICY: $(python-eh-livetest-event-hub-sas-policy) @@ -35,6 +36,6 @@ jobs: EVENT_HUB_NAMESPACE: $(python-eh-livetest-event-hub-namespace) IOTHUB_CONNECTION_STR: $(python-eh-livetest-event-hub-iothub-connection-str) IOTHUB_DEVICE: $(python-eh-livetest-event-hub-iothub-device) - AAD_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id) - AAD_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id) - AAD_SECRET: $(python-eh-livetest-event-hub-aad-secret) + AZURE_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id) + AZURE_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id) + AZURE_CLIENT_SECRET: $(python-eh-livetest-event-hub-aad-secret)