Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4540b01
Update EventHub to enable live testing in sovereign clouds for multip…
v-xuto Nov 11, 2021
36a4833
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj May 26, 2022
605c896
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jun 27, 2022
60d3b21
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jun 30, 2022
972e07e
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jul 4, 2022
fc9b304
add the sleep time to stable the test
zedy-wj Jul 4, 2022
72600e5
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jul 25, 2022
7ed7936
fix conflict
zedy-wj Aug 29, 2022
5f1c61a
fix issue
zedy-wj Sep 2, 2022
02a52c7
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Sep 22, 2022
e739b21
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Oct 24, 2022
83e0af7
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Oct 28, 2022
698f37b
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Nov 29, 2022
f472bfc
fix conflict
zedy-wj Dec 28, 2022
2000e1f
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jan 18, 2023
b3afb2d
fix conflict
zedy-wj Jan 20, 2023
c05ca45
fix conflict
zedy-wj Jan 20, 2023
158f98e
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Jan 28, 2023
facb63d
Remove 'AZURE_COSMOS_CONN_STR'
v-xuto Jan 28, 2023
ac5cbc3
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Feb 2, 2023
52648ff
Merge remote-tracking branch 'upstream/main' into eventhub-sov-test
zedy-wj Mar 1, 2023
a77e4eb
fix flaky tests
swathipil Mar 6, 2023
fc99bf2
fix more flaky tests
swathipil Mar 7, 2023
897dc33
update client_invalid_cred test
swathipil Mar 7, 2023
88709b3
ignore write timeout samples
swathipil Mar 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scripts/devops_tasks/test_run_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@
"sample_publish_events_to_a_topic_using_sas_credential_async.py"
],
"azure-eventhub": [
"client_identity_authentication.py", # TODO: remove after fixing issue #29177
"connection_to_custom_endpoint_address.py",
"proxy.py",
"connection_to_custom_endpoint_address_async.py",
"iot_hub_connection_string_receive_async.py",
"proxy_async.py",
"send_stream.py", # TODO: remove after fixing issue #29177
],
"azure-eventhub-checkpointstoretable": ["receive_events_using_checkpoint_store.py"],
"azure-servicebus": [
Expand Down
15 changes: 11 additions & 4 deletions sdk/eventhub/azure-eventhub/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def resource_group():
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID undefined')
return
resource_client = ResourceManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = ResourceManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
resource_group_name = RES_GROUP_PREFIX + str(uuid.uuid4())
parameters = {"location": LOCATION}
expiry = datetime.datetime.utcnow() + datetime.timedelta(days=1)
Expand All @@ -122,7 +124,9 @@ def eventhub_namespace(resource_group):
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID defined')
return
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
namespace_name = NAMESPACE_PREFIX + str(uuid.uuid4())
try:
namespace = resource_client.namespaces.begin_create_or_update(
Expand All @@ -147,16 +151,19 @@ def live_eventhub(resource_group, eventhub_namespace): # pylint: disable=redefi
except KeyError:
pytest.skip('AZURE_SUBSCRIPTION_ID defined')
return
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID)
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
credential_scopes = ["{}.default".format(base_url)]
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
eventhub_name = EVENTHUB_PREFIX + str(uuid.uuid4())
eventhub_ns_name, connection_string, key_name, primary_key = eventhub_namespace
eventhub_endpoint_suffix = os.environ.get("EVENT_HUB_ENDPOINT_SUFFIX", ".servicebus.windows.net")
try:
eventhub = resource_client.event_hubs.create_or_update(
resource_group.name, eventhub_ns_name, eventhub_name, {"partition_count": PARTITION_COUNT}
)
live_eventhub_config = {
'resource_group': resource_group.name,
'hostname': "{}.servicebus.windows.net".format(eventhub_ns_name),
'hostname': "{}{}".format(eventhub_ns_name, eventhub_endpoint_suffix),
'key_name': key_name,
'access_key': primary_key,
'namespace': eventhub_ns_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ async def test_client_secret_credential_async(live_eventhub, uamqp_transport):
eventhub_name=live_eventhub['event_hub'],
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport
)
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport
)

Expand All @@ -46,7 +46,7 @@ async def on_event(partition_context, event):
on_event.called = False
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event, partition_id='0', starting_position='-1'))
await asyncio.sleep(13)
await asyncio.sleep(15)
await task
assert on_event.called is True
assert on_event.partition_id == "0"
Expand Down Expand Up @@ -108,7 +108,7 @@ async def test_client_azure_sas_credential_async(live_eventhub, uamqp_transport)
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
auth_timeout=3,
auth_timeout=30,
credential=AzureSasCredential(token), uamqp_transport=uamqp_transport)

async with producer_client:
Expand All @@ -128,7 +128,7 @@ async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_tran
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
auth_timeout=3,
auth_timeout=30,
user_agent='customized information', uamqp_transport=uamqp_transport)

assert (await consumer_client.get_eventhub_properties()) is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ async def on_error(events, pid, err):
await asyncio.sleep(5)
assert not sent_events
await asyncio.sleep(20)

assert sum([len(sent_events[pid]) for pid in partitions]) == 1

assert not on_error.err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def on_event_batch(partition_context, event_batch):


@pytest.mark.parametrize("max_wait_time, sleep_time, expected_result",
[(3, 10, []),
[(3, 15, []),
(3, 2, None),
])
@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,29 +310,31 @@ async def on_error(partition_context, error):
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token[:-1], time.time() + 5),
credential=EventHubSASTokenCredential(token, time.time() + 5),
uamqp_transport=uamqp_transport)
await asyncio.sleep(10)
await asyncio.sleep(15)
# expired credential
async with producer_client:
with pytest.raises(AuthenticationError):
await producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task
# TODO: expired credential AuthenticationError not raised for east-asia/China regions
if 'servicebus.windows.net' in live_eventhub['hostname']:
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task

# expired credential
assert isinstance(on_error.err, AuthenticationError)
# expired credential
assert isinstance(on_error.err, AuthenticationError)

credential = EventHubSharedKeyCredential('fakekey', live_eventhub['access_key'])
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def on_error(partition_context, error):
for i in range(5):
ed = EventData("Event Number {}".format(i))
senders[0].send(ed)
await asyncio.sleep(10)
await asyncio.sleep(20)
await task1
await task2
assert isinstance(on_error.error, EventHubError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def on_event(partition_context, event):
"starting_position": '-1'
})
worker.start()
time.sleep(13)
time.sleep(15)

worker.join()
assert on_event.called is True
Expand Down Expand Up @@ -112,7 +112,7 @@ def test_client_azure_sas_credential(live_eventhub, uamqp_transport):
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=AzureSasCredential(token),
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport)

with producer_client:
Expand All @@ -131,7 +131,7 @@ def test_client_azure_named_key_credential(live_eventhub, uamqp_transport):
consumer_group='$default',
credential=credential,
user_agent='customized information',
auth_timeout=3,
auth_timeout=30,
uamqp_transport=uamqp_transport)

assert consumer_client.get_eventhub_properties() is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def on_event_batch(partition_context, event_batch):


@pytest.mark.parametrize("max_wait_time, sleep_time, expected_result",
[(3, 10, []),
[(3, 15, []),
(3, 2, None)])
def test_receive_batch_empty_with_max_wait_time(uamqp_transport, connection_str, max_wait_time, sleep_time, expected_result):
'''Test whether event handler is called when max_wait_time > 0 and no event is received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ def on_error(partition_context, error):
producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
with consumer_client:
thread = threading.Thread(target=consumer_client.receive, args=(on_event,),
kwargs={"starting_position": "-1", "on_error": on_error})
kwargs={"starting_position": "-1", "on_error": on_error})
thread.daemon = True
thread.start()
time.sleep(15)
Expand Down Expand Up @@ -330,7 +330,11 @@ def on_error(partition_context, error):
uamqp_transport=uamqp_transport)

with producer_client:
with pytest.raises(AuthenticationError):
errors = (AuthenticationError)
# TODO: flaky TimeoutError during connect for China region
if 'servicebus.windows.net' not in live_eventhub['hostname']:
errors = (AuthenticationError, ConnectError)
with pytest.raises(errors):
producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def on_event(partition_context, event):
"track_last_enqueued_event_properties": True})
thread.daemon = True
thread.start()
time.sleep(10)
time.sleep(30)
assert on_event.event_position is not None
thread.join()
senders[0].send(EventData(expected_result))
Expand All @@ -105,7 +105,7 @@ def on_event(partition_context, event):
"track_last_enqueued_event_properties": True})
thread.daemon = True
thread.start()
time.sleep(15)
time.sleep(30)
assert on_event.event.body_as_str() == expected_result

thread.join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ def test_send_and_receive_large_body_size(connstr_receivers, uamqp_transport, ti
if sys.platform.startswith('darwin'):
pytest.skip("Skipping on OSX - open issue regarding message size")
connection_str, receivers = connstr_receivers

# TODO: sending large batch to China cloud results in write timeout for pyamqp
# https://github.com/Azure/azure-sdk-for-python/issues/29177
if not uamqp_transport and 'servicebus.windows.net' not in connection_str:
pytest.skip("Skipping for pyamqp - open issue regarding write timeout")

client = EventHubProducerClient.from_connection_string(connection_str, uamqp_transport=uamqp_transport)
with client:
payload = 250 * 1024
Expand All @@ -104,7 +110,7 @@ def test_send_and_receive_large_body_size(connstr_receivers, uamqp_transport, ti
client.send_event(EventData("A" * payload))

received = []
timeout = 10 * timeout_factor
timeout = 20 * timeout_factor
for r in receivers:
received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=timeout)])

Expand Down
17 changes: 16 additions & 1 deletion sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
"description": "The subscription ID to which the application and resources belong."
}
},
"serviceBusEndpointSuffix": {
"type": "string",
"defaultValue": ".servicebus.windows.net",
"metadata": {
"description": "The url suffix to use when creating eventhubs connection strings."
}
},
"tenantId": {
"type": "string",
"defaultValue": "[subscription().tenantId]",
Expand Down Expand Up @@ -180,7 +187,7 @@
},
"EVENT_HUB_HOSTNAME": {
"type": "string",
"value": "[concat(variables('eventHubsNamespace'), '.servicebus.windows.net')]"
"value": "[concat(variables('eventHubsNamespace'), parameters('serviceBusEndpointSuffix'))]"
},
"EVENT_HUB_CONN_STR": {
"type": "string",
Expand Down Expand Up @@ -213,6 +220,14 @@
"AZURE_TABLES_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
},
"RESOURCE_REGION": {
"type": "string",
"value": "[parameters('location')]"
},
"EVENT_HUB_ENDPOINT_SUFFIX": {
"type": "string",
"value": "[parameters('serviceBusEndpointSuffix')]"
}
}
}
23 changes: 13 additions & 10 deletions sdk/eventhub/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ stages:
ServiceDirectory: eventhub
TestTimeoutInMinutes: 240
BuildTargetingString: azure-eventhub*
SupportedClouds: 'Public,UsGov,China'
CloudConfig:
Public:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
UsGov:
SubscriptionConfiguration: $(sub-config-gov-test-resources)
China:
SubscriptionConfiguration: $(sub-config-cn-test-resources)
Location: 'chinanorth2'
MatrixReplace:
- TestSamples=.*/true
MatrixFilters:
Expand All @@ -14,15 +23,9 @@ stages:
AZURE_STORAGE_DATA_LAKE_ENABLED_CONN_STR: $(python-eh-livetest-event-hub-storage-data-lake-enabled-conn-str)
IOTHUB_CONNECTION_STR: $(python-eh-livetest-event-hub-iothub-connection-str)
IOTHUB_DEVICE: $(python-eh-livetest-event-hub-iothub-device)
AZURE_CLIENT_ID: $(python-eh-livetest-event-hub-aad-client-id)
AZURE_TENANT_ID: $(python-eh-livetest-event-hub-aad-tenant-id)
AZURE_CLIENT_SECRET: $(python-eh-livetest-event-hub-aad-secret)
AZURE_SUBSCRIPTION_ID: $(python-eh-livetest-event-hub-subscription-id)
AZURE_CLIENT_ID: $(EVENTHUB_CLIENT_ID)
AZURE_TENANT_ID: $(EVENTHUB_TENANT_ID)
AZURE_CLIENT_SECRET: $(EVENTHUB_CLIENT_SECRET)
AZURE_SUBSCRIPTION_ID: $(EVENTHUB_SUBSCRIPTION_ID)
AZURE_COSMOS_CONN_STR: $(python-eventhub-livetest-cosmos-conn-str)
Clouds: 'Public,Canary'
CloudConfig:
Public:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
Canary:
SubscriptionConfiguration: $(sub-config-azure-cloud-test-resources)
Location: 'eastus2euap'