diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py
index 394a4e385ec1..087157bf9cc1 100644
--- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py
+++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py
@@ -47,6 +47,10 @@ def parse_starting_position(args):
 parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str)
 parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
 parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
+parser.add_argument("--max_batch_size", type=int, default=0,
+                    help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
+parser.add_argument("--max_wait_time", type=float, default=0,
+                    help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
 parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
 parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
 parser.add_argument("--conn_str", help="EventHub connection string",
@@ -82,6 +86,7 @@ def parse_starting_position(args):
 
 start_time = time.perf_counter()
 recv_cnt_map = defaultdict(int)
+recv_cnt_iteration_map = defaultdict(int)
 recv_time_map = dict()
 
 
@@ -94,7 +99,7 @@ async def get_partition_ids(self):
 
 
 async def on_event_received(partition_context, event):
-    recv_cnt_map[partition_context.partition_id] += 1
+    recv_cnt_map[partition_context.partition_id] += 1 if event else 0
     if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
         total_time_elapsed = time.perf_counter() - start_time
 
@@ -111,6 +116,26 @@ async def on_event_received(partition_context, event):
         await partition_context.update_checkpoint(event)
 
 
+async def on_event_batch_received(partition_context, event_batch):
+    recv_cnt_map[partition_context.partition_id] += len(event_batch)
+    recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch)
+    if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT:
+        total_time_elapsed = time.perf_counter() - start_time
+
+        partition_previous_time = recv_time_map.get(partition_context.partition_id)
+        partition_current_time = time.perf_counter()
+        recv_time_map[partition_context.partition_id] = partition_current_time
+        LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s",
+                    partition_context.partition_id,
+                    recv_cnt_map[partition_context.partition_id],
+                    total_time_elapsed,
+                    recv_cnt_map[partition_context.partition_id] / total_time_elapsed,
+                    recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None
+                    )
+        recv_cnt_iteration_map[partition_context.partition_id] = 0
+        await partition_context.update_checkpoint()
+
+
 def create_client(args):
     if args.storage_conn_str:
@@ -180,11 +205,18 @@ async def run(args):
         "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties,
         "starting_position": starting_position
     }
+    if args.max_batch_size:
+        kwargs_dict["max_batch_size"] = args.max_batch_size
+    if args.max_wait_time:
+        kwargs_dict["max_wait_time"] = args.max_wait_time
     if args.parallel_recv_cnt and args.parallel_recv_cnt > 1:
         clients = [create_client(args) for _ in range(args.parallel_recv_cnt)]
         tasks = [
             asyncio.ensure_future(
-                clients[i].receive(
+                clients[i].receive_batch(
+                    on_event_batch_received,
+                    **kwargs_dict
+                ) if args.max_batch_size else clients[i].receive(
                     on_event_received,
                     **kwargs_dict
                 )
@@ -193,9 +225,12 @@
     else:
         clients = [create_client(args)]
         tasks = [asyncio.ensure_future(
-            clients[0].receive(
+            clients[0].receive_batch(
+                on_event_batch_received,
+                **kwargs_dict
+            ) if args.max_batch_size else clients[0].receive(
                 on_event_received,
-                prefetch=args.link_credit,
+                **kwargs_dict
             )
         )]
diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py
index bf377b16d11a..09a487206e33 100644
--- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py
+++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py
@@ -45,6 +45,11 @@ def parse_starting_position(args):
 parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
 parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
 parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
+parser.add_argument("--max_batch_size", type=int, default=0,
+                    help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
+parser.add_argument("--max_wait_time", type=float, default=0,
+                    help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
+
 parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
 parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
 parser.add_argument("--conn_str", help="EventHub connection string",
@@ -80,6 +85,7 @@ def parse_starting_position(args):
 
 start_time = time.perf_counter()
 recv_cnt_map = defaultdict(int)
+recv_cnt_iteration_map = defaultdict(int)
 recv_time_map = dict()
 
 
@@ -92,7 +98,7 @@ def get_partition_ids(self):
 
 
 def on_event_received(partition_context, event):
-    recv_cnt_map[partition_context.partition_id] += 1
+    recv_cnt_map[partition_context.partition_id] += 1 if event else 0
     if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
         total_time_elapsed = time.perf_counter() - start_time
 
@@ -109,6 +115,25 @@ def on_event_received(partition_context, event):
         partition_context.update_checkpoint(event)
 
 
+def on_event_batch_received(partition_context, event_batch):
+    recv_cnt_map[partition_context.partition_id] += len(event_batch)
+    recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch)
+    if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT:
+        total_time_elapsed = time.perf_counter() - start_time
+        partition_previous_time = recv_time_map.get(partition_context.partition_id)
+        partition_current_time = time.perf_counter()
+        recv_time_map[partition_context.partition_id] = partition_current_time
+        LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s",
+                    partition_context.partition_id,
+                    recv_cnt_map[partition_context.partition_id],
+                    total_time_elapsed,
+                    recv_cnt_map[partition_context.partition_id] / total_time_elapsed,
+                    recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None
+                    )
+        recv_cnt_iteration_map[partition_context.partition_id] = 0
+        partition_context.update_checkpoint()
+
+
 def create_client(args):
     if args.storage_conn_str:
         checkpoint_store = BlobCheckpointStore.from_connection_string(args.storage_conn_str, args.storage_container_name)
@@ -176,12 +201,16 @@ def run(args):
         "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties,
         "starting_position": starting_position
     }
+    if args.max_batch_size:
+        kwargs_dict["max_batch_size"] = args.max_batch_size
+    if args.max_wait_time:
+        kwargs_dict["max_wait_time"] = args.max_wait_time
     if args.parallel_recv_cnt and args.parallel_recv_cnt > 1:
         clients = [create_client(args) for _ in range(args.parallel_recv_cnt)]
         threads = [
             threading.Thread(
-                target=clients[i].receive,
-                args=(on_event_received,),
+                target=clients[i].receive_batch if args.max_batch_size else clients[i].receive,
+                args=(on_event_batch_received if args.max_batch_size else on_event_received,),
                 kwargs=kwargs_dict,
                 daemon=True
             ) for i in range(args.parallel_recv_cnt)
@@ -189,8 +218,8 @@
     else:
         clients = [create_client(args)]
         threads = [threading.Thread(
-            target=clients[0].receive,
-            args=(on_event_received,),
+            target=clients[0].receive_batch if args.max_batch_size else clients[0].receive,
+            args=(on_event_batch_received if args.max_batch_size else on_event_received,),
             kwargs=kwargs_dict,
             daemon=True
         )]
diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py
index 99b3188ca11a..3ff550a31891 100644
--- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py
+++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py
@@ -28,6 +28,15 @@ def stress_send_sync(producer: EventHubProducerClient, args, logger):
     return len(batch)
 
 
+def stress_send_list_sync(producer: EventHubProducerClient, args, logger):
+    quantity = int(256*1023 / args.payload)
+    send_list = []
+    for _ in range(quantity):
+        send_list.append(EventData(body=b"D" * args.payload))
+    producer.send_batch(send_list)
+    return len(send_list)
+
+
 async def stress_send_async(producer: EventHubProducerClientAsync, args, logger):
     batch = await producer.create_batch()
     try:
@@ -39,6 +48,15 @@ async def stress_send_async(producer: EventHubProducerClientAsync, args, logger)
     return len(batch)
 
 
+async def stress_send_list_async(producer: EventHubProducerClientAsync, args, logger):
+    quantity = int(256*1023 / args.payload)
+    send_list = []
+    for _ in range(quantity):
+        send_list.append(EventData(body=b"D" * args.payload))
+    await producer.send_batch(send_list)
+    return len(send_list)
+
+
 class StressTestRunner(object):
     def __init__(self, argument_parser):
         self.argument_parser = argument_parser
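
For reference, the standalone sketch below shows the consumer API surface that the new --max_batch_size / --max_wait_time options exercise: EventHubConsumerClient.receive_batch() with a batch callback instead of receive() with a per-event callback. This is only an illustrative sketch, assuming azure-eventhub >= 5.2.0 (where receive_batch is available); the connection string, event hub name, and numeric values are placeholders and are not taken from the patch above.

# Illustrative sketch only; assumes azure-eventhub >= 5.2.0.
# CONN_STR and EVENTHUB_NAME are placeholders.
from azure.eventhub import EventHubConsumerClient

CONN_STR = "<eventhub-connection-string>"
EVENTHUB_NAME = "<eventhub-name>"


def on_event_batch(partition_context, event_batch):
    # Called with a list of events; the list can be empty when max_wait_time expires.
    print("partition", partition_context.partition_id, "received", len(event_batch))
    # Checkpoint at the last event received on this partition.
    partition_context.update_checkpoint()


client = EventHubConsumerClient.from_connection_string(
    CONN_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME
)

with client:
    # receive_batch() delivers events in lists of up to max_batch_size,
    # whereas receive() (the pre-existing path) delivers them one at a time.
    client.receive_batch(
        on_event_batch,
        max_batch_size=100,      # analogous to --max_batch_size
        max_wait_time=5,         # analogous to --max_wait_time
        starting_position="-1",  # read from the beginning of each partition
    )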