Merged

39 commits
cb13bd8  Increment version (Feb 14, 2020)
38e9f65  Update Development Status (Feb 14, 2020)
aa30bc0  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 15, 2020)
c4710ea  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 20, 2020)
053f073  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 24, 2020)
f0697f7  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 6, 2020)
c83241a  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 6, 2020)
18c6fba  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 6, 2020)
607f134  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 9, 2020)
88fd7d1  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 9, 2020)
fcbaf65  Remove typing.Deque for Py3.5.3 (Mar 9, 2020)
f52e2d9  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 10, 2020)
8aaaf1e  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 11, 2020)
a12351c  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 25, 2020)
cc9dbb9  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 27, 2020)
932336a  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 30, 2020)
2742ca0  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Mar 31, 2020)
72ab463  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 2, 2020)
213bb9d  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 3, 2020)
b312d19  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 6, 2020)
0638b9a  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 6, 2020)
c7bd430  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 7, 2020)
de47f42  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 7, 2020)
2fcc005  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 15, 2020)
cfa9d81  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 20, 2020)
ac224c1  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 23, 2020)
1305495  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 27, 2020)
2183d32  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 29, 2020)
e233d8c  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 29, 2020)
fa41059  Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Apr 30, 2020)
16e7e8b  Add batch support in stress test script (Apr 30, 2020)
ba4e788  Allow receive() to have max_wait_time (Apr 30, 2020)
227d6b1  Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_str… (YijunXieMS, Apr 30, 2020)
490b398  Fix bug (Apr 30, 2020)
aa114c9  Fix bug (Apr 30, 2020)
0d8dda5  Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_str… (YijunXieMS, Apr 30, 2020)
fc4b477  Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_str… (YijunXieMS, Apr 30, 2020)
cad6a3c  Log all received event count per iteration (Apr 30, 2020)
c3e475d  bug fix (May 1, 2020)
File 1 of 3: asynchronous consumer stress test script
@@ -47,6 +47,10 @@ def parse_starting_position(args):
 parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str)
 parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
 parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
+parser.add_argument("--max_batch_size", type=int, default=0,
+                    help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
+parser.add_argument("--max_wait_time", type=float, default=0,
+                    help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
 parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
 parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
 parser.add_argument("--conn_str", help="EventHub connection string",
@@ -82,6 +86,7 @@ def parse_starting_position(args):

 start_time = time.perf_counter()
 recv_cnt_map = defaultdict(int)
+recv_cnt_iteration_map = defaultdict(int)
 recv_time_map = dict()


@@ -94,7 +99,7 @@ async def get_partition_ids(self):


 async def on_event_received(partition_context, event):
-    recv_cnt_map[partition_context.partition_id] += 1
+    recv_cnt_map[partition_context.partition_id] += 1 if event else 0
     if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
         total_time_elapsed = time.perf_counter() - start_time

@@ -111,6 +116,26 @@ async def on_event_received(partition_context, event):
         await partition_context.update_checkpoint(event)


+async def on_event_batch_received(partition_context, event_batch):
+    recv_cnt_map[partition_context.partition_id] += len(event_batch)
+    recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch)
+    if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT:
+        total_time_elapsed = time.perf_counter() - start_time
+
+        partition_previous_time = recv_time_map.get(partition_context.partition_id)
+        partition_current_time = time.perf_counter()
+        recv_time_map[partition_context.partition_id] = partition_current_time
+        LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s",
+                    partition_context.partition_id,
+                    recv_cnt_map[partition_context.partition_id],
+                    total_time_elapsed,
+                    recv_cnt_map[partition_context.partition_id] / total_time_elapsed,
+                    recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None
+                    )
+        recv_cnt_iteration_map[partition_context.partition_id] = 0
+        await partition_context.update_checkpoint()
+
+
 def create_client(args):

     if args.storage_conn_str:
@@ -180,11 +205,18 @@ async def run(args):
         "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties,
         "starting_position": starting_position
     }
+    if args.max_batch_size:
+        kwargs_dict["max_batch_size"] = args.max_batch_size
+    if args.max_wait_time:
+        kwargs_dict["max_wait_time"] = args.max_wait_time
     if args.parallel_recv_cnt and args.parallel_recv_cnt > 1:
         clients = [create_client(args) for _ in range(args.parallel_recv_cnt)]
         tasks = [
             asyncio.ensure_future(
-                clients[i].receive(
+                clients[i].receive_batch(
+                    on_event_batch_received,
+                    **kwargs_dict
+                ) if args.max_batch_size else clients[i].receive(
                     on_event_received,
                     **kwargs_dict
                 )
@@ -193,9 +225,12 @@ async def run(args):
     else:
         clients = [create_client(args)]
         tasks = [asyncio.ensure_future(
-            clients[0].receive(
+            clients[0].receive_batch(
+                on_event_batch_received,
+                **kwargs_dict
+            ) if args.max_batch_size else clients[0].receive(
                 on_event_received,
                 prefetch=args.link_credit,
                 **kwargs_dict
             )
         )]

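Taken together, the async changes route the two new flags into either receive() or receive_batch(). The following is a minimal, self-contained sketch (not part of the PR) of how --max_batch_size and --max_wait_time map onto the azure-eventhub async API; the connection string and event hub name are placeholders, and a package version that includes receive_batch() (5.1.0 or later) is assumed.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient

CONN_STR = "<eventhub-connection-string>"  # placeholder
EVENTHUB_NAME = "<eventhub-name>"          # placeholder


async def on_event_batch(partition_context, event_batch):
    # event_batch is a list of EventData; with max_wait_time set it may be
    # empty when no events arrived within the wait window.
    print("partition %s received %d events" % (partition_context.partition_id, len(event_batch)))
    await partition_context.update_checkpoint()


async def main():
    client = EventHubConsumerClient.from_connection_string(
        CONN_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME
    )
    async with client:
        await client.receive_batch(
            on_event_batch,
            max_batch_size=100,  # at most 100 events per callback invocation
            max_wait_time=5,     # deliver whatever has arrived after 5 seconds
        )


asyncio.run(main())

This mirrors the script's dispatch: when --max_batch_size is 0 it falls back to receive() with on_event_received, whose event argument can be None once max_wait_time is set, hence the new "+= 1 if event else 0" guard above.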
File 2 of 3: synchronous consumer stress test script
@@ -45,6 +45,11 @@ def parse_starting_position(args):
 parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
 parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
 parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
+parser.add_argument("--max_batch_size", type=int, default=0,
+                    help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
+parser.add_argument("--max_wait_time", type=float, default=0,
+                    help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
+
 parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
 parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
 parser.add_argument("--conn_str", help="EventHub connection string",
@@ -80,6 +85,7 @@ def parse_starting_position(args):

 start_time = time.perf_counter()
 recv_cnt_map = defaultdict(int)
+recv_cnt_iteration_map = defaultdict(int)
 recv_time_map = dict()


@@ -92,7 +98,7 @@ def get_partition_ids(self):


 def on_event_received(partition_context, event):
-    recv_cnt_map[partition_context.partition_id] += 1
+    recv_cnt_map[partition_context.partition_id] += 1 if event else 0
     if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
         total_time_elapsed = time.perf_counter() - start_time

@@ -109,6 +115,25 @@ def on_event_received(partition_context, event):
         partition_context.update_checkpoint(event)


+def on_event_batch_received(partition_context, event_batch):
+    recv_cnt_map[partition_context.partition_id] += len(event_batch)
+    recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch)
+    if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT:
+        total_time_elapsed = time.perf_counter() - start_time
+        partition_previous_time = recv_time_map.get(partition_context.partition_id)
+        partition_current_time = time.perf_counter()
+        recv_time_map[partition_context.partition_id] = partition_current_time
+        LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s",
+                    partition_context.partition_id,
+                    recv_cnt_map[partition_context.partition_id],
+                    total_time_elapsed,
+                    recv_cnt_map[partition_context.partition_id] / total_time_elapsed,
+                    recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None
+                    )
+        recv_cnt_iteration_map[partition_context.partition_id] = 0
+        partition_context.update_checkpoint()
+
+
 def create_client(args):
     if args.storage_conn_str:
         checkpoint_store = BlobCheckpointStore.from_connection_string(args.storage_conn_str, args.storage_container_name)
@@ -176,21 +201,25 @@ def run(args):
         "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties,
         "starting_position": starting_position
     }
+    if args.max_batch_size:
+        kwargs_dict["max_batch_size"] = args.max_batch_size
+    if args.max_wait_time:
+        kwargs_dict["max_wait_time"] = args.max_wait_time
     if args.parallel_recv_cnt and args.parallel_recv_cnt > 1:
         clients = [create_client(args) for _ in range(args.parallel_recv_cnt)]
         threads = [
             threading.Thread(
-                target=clients[i].receive,
-                args=(on_event_received,),
+                target=clients[i].receive_batch if args.max_batch_size else clients[i].receive,
+                args=(on_event_batch_received if args.max_batch_size else on_event_received,),
                 kwargs=kwargs_dict,
                 daemon=True
             ) for i in range(args.parallel_recv_cnt)
         ]
     else:
         clients = [create_client(args)]
         threads = [threading.Thread(
-            target=clients[0].receive,
-            args=(on_event_received,),
+            target=clients[0].receive_batch if args.max_batch_size else clients[0].receive,
+            args=(on_event_batch_received if args.max_batch_size else on_event_received,),
             kwargs=kwargs_dict,
             daemon=True
         )]
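The synchronous script makes the same choice at thread-creation time, since the sync receive() and receive_batch() calls block until the client closes. A minimal sketch of that pattern (again not part of the PR; connection details are placeholders):

import threading

from azure.eventhub import EventHubConsumerClient

CONN_STR = "<eventhub-connection-string>"  # placeholder
EVENTHUB_NAME = "<eventhub-name>"          # placeholder


def on_event_batch(partition_context, event_batch):
    partition_context.update_checkpoint()


client = EventHubConsumerClient.from_connection_string(
    CONN_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME
)
# receive_batch() blocks until the client is closed, so run it on a daemon
# thread, just as the stress script does.
worker = threading.Thread(
    target=client.receive_batch,
    args=(on_event_batch,),
    kwargs={"max_batch_size": 100, "max_wait_time": 5},
    daemon=True,
)
worker.start()
worker.join(timeout=60)  # let the stress run for a while
client.close()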
File 3 of 3: producer stress test helpers
@@ -28,6 +28,15 @@ def stress_send_sync(producer: EventHubProducerClient, args, logger):
     return len(batch)


+def stress_send_list_sync(producer: EventHubProducerClient, args, logger):
+    quantity = int(256*1023 / args.payload)
+    send_list = []
+    for _ in range(quantity):
+        send_list.append(EventData(body=b"D" * args.payload))
+    producer.send_batch(send_list)
+    return len(send_list)
+
+
 async def stress_send_async(producer: EventHubProducerClientAsync, args, logger):
     batch = await producer.create_batch()
     try:
@@ -39,6 +48,15 @@ async def stress_send_async(producer: EventHubProducerClientAsync, args, logger):
     return len(batch)


+async def stress_send_list_async(producer: EventHubProducerClientAsync, args, logger):
+    quantity = int(256*1023 / args.payload)
+    send_list = []
+    for _ in range(quantity):
+        send_list.append(EventData(body=b"D" * args.payload))
+    await producer.send_batch(send_list)
+    return len(send_list)
+
+
 class StressTestRunner(object):
     def __init__(self, argument_parser):
         self.argument_parser = argument_parser
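The new stress_send_list_* helpers exercise the list form of send_batch(): instead of filling an EventDataBatch until it overflows, they hand the client a plain list sized to stay under the batch cap (256*1023 bytes here, just below the 256 KB limit the script assumes). A minimal standalone sketch, with a placeholder connection string and payload size:

from azure.eventhub import EventData, EventHubProducerClient

CONN_STR = "<eventhub-connection-string>"  # placeholder
PAYLOAD = 1024                             # bytes per event; stands in for args.payload

producer = EventHubProducerClient.from_connection_string(
    CONN_STR, eventhub_name="<eventhub-name>"
)
# Size the list so quantity * PAYLOAD stays just under 256 KB; send_batch()
# raises ValueError if the events cannot all fit in a single batch.
quantity = int(256 * 1023 / PAYLOAD)
events = [EventData(body=b"D" * PAYLOAD) for _ in range(quantity)]
with producer:
    producer.send_batch(events)  # the SDK builds one batch from the list
print("sent %d events" % len(events))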