diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py index ec717e7ec06e..781e24248792 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py @@ -55,7 +55,9 @@ def __init__(self, message_size = 10, idle_timeout = 10, send_delay = .01, - receive_delay = 0): + receive_delay = 0, + should_complete_messages = True, + max_batch_size = 1): self.senders = senders self.receivers = receivers self.duration=duration @@ -65,6 +67,8 @@ def __init__(self, self.idle_timeout = idle_timeout self.send_delay = send_delay self.receive_delay = receive_delay + self.should_complete_messages = should_complete_messages + self.max_batch_size = max_batch_size # Because of pickle we need to create a state object and not just pass around ourselves. # If we ever require multiple runs of this one after another, just make Run() reset this. @@ -144,14 +148,15 @@ def _Receive(self, receiver, end_time): with receiver: while end_time > datetime.utcnow(): if self.receive_type == ReceiveType.pull: - batch = receiver.receive_messages() + batch = receiver.receive_messages(max_batch_size=self.max_batch_size) elif self.receive_type == ReceiveType.push: batch = receiver for message in batch: self.OnReceive(self._state, message) try: - message.complete() + if self.should_complete_messages: + message.complete() except MessageAlreadySettled: # It may have been settled in the plugin callback. pass self._state.total_received += 1 diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py index befd1e7d89d8..54200c4773ae 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py @@ -111,6 +111,46 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s receivers = [sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete)], duration=timedelta(seconds=60)) + result = stress_test.Run() + assert(result.total_sent > 0) + assert(result.total_received > 0) + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @ServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest') + def test_stress_queue_unsettled_messages(self, servicebus_namespace_connection_string, servicebus_queue): + sb_client = ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, debug=False) + + stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)], + receivers = [sb_client.get_queue_receiver(servicebus_queue.name)], + duration = timedelta(seconds=350), + should_complete_messages = False) + + result = stress_test.Run() + # This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache + # and prevent further receipt. + assert(result.total_sent > 2500) + assert(result.total_received > 2500) + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @ServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest') + def test_stress_queue_receive_large_batch_size(self, servicebus_namespace_connection_string, servicebus_queue): + sb_client = ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, debug=False) + + stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)], + receivers = [sb_client.get_queue_receiver(servicebus_queue.name, prefetch=50)], + duration = timedelta(seconds=60), + max_batch_size = 50) + result = stress_test.Run() assert(result.total_sent > 0) assert(result.total_received > 0) \ No newline at end of file