Skip to content

Commit ad30c7e

Browse files
yunhaolingbenbp
authored andcommitted
[EventHubs] Stress test refactor (#20389)
* init commit for stress * add dotenv dep * docker, helm, k8s * apply changes * run forever * add chaos * update chaos jos * Apply suggestions from code review Co-authored-by: Ben Broderick Phillips <[email protected]> * minor fixes * revert external targets Co-authored-by: Ben Broderick Phillips <[email protected]>
1 parent 4c2d086 commit ad30c7e

19 files changed

+415
-38
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
dependencies:
2+
- name: stress-test-addons
3+
repository: https://stresstestcharts.blob.core.windows.net/helm/
4+
version: 0.1.6
5+
digest: sha256:b97697ef5f303eec43e9a94fca8e312d20b8aed71318250499344aeca9880d31
6+
generated: "2021-08-24T11:24:15.375395-07:00"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
apiVersion: v2
2+
name: python-eventhubs-stress-test
3+
description: python event hubs stress test.
4+
version: 0.1.2
5+
appVersion: v0.2
6+
annotations:
7+
stressTest: 'true'
8+
namespace: python-eventhubs-stress-test-ns
9+
10+
dependencies:
11+
- name: stress-test-addons
12+
version: 0.1.6
13+
repository: https://stresstestcharts.blob.core.windows.net/helm/
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# syntax=docker/dockerfile:1
2+
3+
FROM python:3.8-slim-buster
4+
5+
WORKDIR /app
6+
7+
COPY ./scripts /app/stress/scripts
8+
9+
WORKDIR /app/stress/scripts
10+
RUN pip3 install -r dev_requirement.txt

sdk/eventhub/azure-eventhub/stress/dev_requirement.txt

Lines changed: 0 additions & 8 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
3+
"contentVersion": "1.0.0.0",
4+
"parameters": { }
5+
}

sdk/eventhub/azure-eventhub/stress/app_insights_metric.py renamed to sdk/eventhub/azure-eventhub/stress/scripts/app_insights_metric.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5-
6-
import os
7-
85
from opencensus.ext.azure import metrics_exporter
96
from opencensus.stats import aggregation as aggregation_module
107
from opencensus.stats import measure as measure_module
@@ -26,7 +23,7 @@ def __init__(self, test_name, test_description=None):
2623
self.desc = test_description
2724

2825
events_measure_name = "The number of events handled by " + self.name
29-
events_measure_desc = "The number of events handled by " + self.desc if self.desc else None
26+
events_measure_desc = "The number of events handled by" + self.desc if self.desc else None
3027
memory_measure_name = "memory usage percentage for " + self.name
3128
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
3229
cpu_measure_name = "cpu usage percentage for " + self.name

sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py renamed to sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_async.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import logging
1212
from collections import defaultdict
1313
from functools import partial
14+
from dotenv import load_dotenv
1415

1516
from azure.identity.aio import ClientSecretCredential
1617
from azure.eventhub.aio import EventHubConsumerClient
@@ -22,6 +23,9 @@
2223
from process_monitor import ProcessMonitor
2324
from app_insights_metric import AzureMonitorMetric
2425

26+
ENV_FILE = os.environ.get('ENV_FILE')
27+
load_dotenv(dotenv_path=ENV_FILE, override=True)
28+
2529

2630
def parse_starting_position(args):
2731
starting_position = None
@@ -39,25 +43,26 @@ def parse_starting_position(args):
3943

4044

4145
parser = argparse.ArgumentParser()
42-
parser.add_argument("--link_credit", default=3000, type=int)
43-
parser.add_argument("--output_interval", type=float, default=1000)
44-
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
45-
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
46+
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
47+
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
48+
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
49+
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
4650
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
47-
parser.add_argument("--starting_offset", help="Starting offset", type=str)
51+
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
4852
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
49-
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str)
53+
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
5054
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
5155
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
52-
parser.add_argument("--max_batch_size", type=int, default=0,
56+
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
5357
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
5458
parser.add_argument("--max_wait_time", type=float, default=0,
5559
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
60+
5661
parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
5762
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
5863
parser.add_argument("--conn_str", help="EventHub connection string",
59-
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
60-
parser.add_argument("--eventhub", help="Name of EventHub")
64+
default=os.environ.get('EVENT_HUB_CONN_STR'))
65+
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
6166
parser.add_argument("--address", help="Address URI to the EventHub entity")
6267
parser.add_argument("--sas_policy", help="Name of the shared access policy to authenticate with")
6368
parser.add_argument("--sas_key", help="Shared access key")

sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py renamed to sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_sync.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import logging
1212
from collections import defaultdict
1313
from functools import partial
14+
from dotenv import load_dotenv
1415

1516
from azure.identity import ClientSecretCredential
1617
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
@@ -20,6 +21,9 @@
2021
from process_monitor import ProcessMonitor
2122
from app_insights_metric import AzureMonitorMetric
2223

24+
ENV_FILE = os.environ.get('ENV_FILE')
25+
load_dotenv(dotenv_path=ENV_FILE, override=True)
26+
2327

2428
def parse_starting_position(args):
2529
starting_position = None
@@ -37,26 +41,26 @@ def parse_starting_position(args):
3741

3842

3943
parser = argparse.ArgumentParser()
40-
parser.add_argument("--link_credit", default=3000, type=int)
41-
parser.add_argument("--output_interval", type=float, default=1000)
42-
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
43-
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
44+
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
45+
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
46+
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
47+
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
4448
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
45-
parser.add_argument("--starting_offset", help="Starting offset", type=str)
49+
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
4650
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
4751
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
4852
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
4953
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
50-
parser.add_argument("--max_batch_size", type=int, default=0,
54+
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
5155
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
5256
parser.add_argument("--max_wait_time", type=float, default=0,
5357
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")
5458

5559
parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
5660
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
5761
parser.add_argument("--conn_str", help="EventHub connection string",
58-
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
59-
parser.add_argument("--eventhub", help="Name of EventHub")
62+
default=os.environ.get('EVENT_HUB_CONN_STR'))
63+
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
6064
parser.add_argument("--address", help="Address URI to the EventHub entity")
6165
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
6266
parser.add_argument("--sas-key", help="Shared access key")
@@ -82,7 +86,14 @@ def parse_starting_position(args):
8286

8387
args = parser.parse_args()
8488
starting_position = parse_starting_position(args)
85-
LOGGER = get_logger(args.log_filename, "stress_receive_sync", level=logging.INFO, print_console=args.print_console)
89+
print_console = args.print_console or (os.environ.get("PRINT_CONSOLE") == "1")
90+
91+
LOGGER = get_logger(
92+
args.log_filename,
93+
"stress_receive_sync",
94+
level=logging.INFO,
95+
print_console=print_console
96+
)
8697
LOG_PER_COUNT = args.output_interval
8798

8899
start_time = time.perf_counter()
@@ -105,7 +116,6 @@ def on_event_received(process_monitor, partition_context, event):
105116
recv_cnt_map[partition_context.partition_id] += 1 if event else 0
106117
if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
107118
total_time_elapsed = time.perf_counter() - start_time
108-
109119
partition_previous_time = recv_time_map.get(partition_context.partition_id)
110120
partition_current_time = time.perf_counter()
111121
recv_time_map[partition_context.partition_id] = partition_current_time
@@ -213,7 +223,11 @@ def create_client(args):
213223

214224
def run(args):
215225

216-
with ProcessMonitor("monitor_{}".format(args.log_filename), "consumer_stress_sync", print_console=args.print_console) as process_monitor:
226+
with ProcessMonitor(
227+
"monitor_{}".format(args.log_filename),
228+
"consumer_stress_sync",
229+
print_console=print_console
230+
) as process_monitor:
217231
kwargs_dict = {
218232
"prefetch": args.link_credit,
219233
"partition_id": str(args.recv_partition_id) if args.recv_partition_id else None,

sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py renamed to sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_producer_stress.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import time
1010
import asyncio
1111
from argparse import ArgumentParser
12+
from dotenv import load_dotenv
1213

1314
from azure.eventhub import EventHubProducerClient, EventData, EventHubSharedKeyCredential, TransportType
1415
from azure.eventhub.exceptions import EventHubError
@@ -20,6 +21,8 @@
2021
from process_monitor import ProcessMonitor
2122
from app_insights_metric import AzureMonitorMetric
2223

24+
ENV_FILE = os.environ.get('ENV_FILE')
25+
2326

2427
def handle_exception(error, ignore_send_failure, stress_logger, azure_monitor_metric):
2528
err_msg = "Sync send failed due to error: {}".format(repr(error))
@@ -89,9 +92,9 @@ async def stress_send_list_async(producer: EventHubProducerClientAsync, args, st
8992
class StressTestRunner(object):
9093
def __init__(self, argument_parser):
9194
self.argument_parser = argument_parser
92-
self.argument_parser.add_argument("-m", "--method", required=True)
93-
self.argument_parser.add_argument("--output_interval", type=float, default=1000)
94-
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
95+
self.argument_parser.add_argument("-m", "--method", default="stress_send_list_sync")
96+
self.argument_parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
97+
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
9598
self.argument_parser.add_argument(
9699
"--partitions",
97100
help="Number of partitions. 0 means to get partitions from eventhubs",
@@ -109,9 +112,9 @@ def __init__(self, argument_parser):
109112
type=str
110113
)
111114
self.argument_parser.add_argument("--conn_str", help="EventHub connection string",
112-
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
115+
default=os.environ.get('EVENT_HUB_CONN_STR'))
113116
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
114-
self.argument_parser.add_argument("--eventhub", help="Name of EventHub")
117+
self.argument_parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
115118
self.argument_parser.add_argument(
116119
"--transport_type",
117120
help="Transport type, 0 means AMQP, 1 means AMQP over WebSocket",
@@ -412,6 +415,7 @@ async def run_test_method_parallel_async(self, test_method, worker, logger, proc
412415

413416

414417
if __name__ == '__main__':
418+
load_dotenv(dotenv_path=ENV_FILE, override=True)
415419
parser = ArgumentParser()
416420
runner = StressTestRunner(parser)
417421
runner.run()

0 commit comments

Comments
 (0)