Skip to content

Commit 15523b7

Browse files
authored
[ServiceBus] Stress test embracing appinsights (#19501)
* draft PR for sb stress test adopting appinsights * add async stress test code and add app insights support * minor fix * feedback * improve on the logger
1 parent 024e007 commit 15523b7

File tree

5 files changed

+492
-24
lines changed

5 files changed

+492
-24
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
from opencensus.ext.azure import metrics_exporter
7+
from opencensus.stats import aggregation as aggregation_module
8+
from opencensus.stats import measure as measure_module
9+
from opencensus.stats import stats as stats_module
10+
from opencensus.stats import view as view_module
11+
12+
from logger import get_azure_logger
13+
14+
15+
class AbstractMonitorMetric:
16+
def __init__(self, test_name, test_description=None):
17+
pass
18+
19+
def record_messages_cpu_memory(self, number_of_events, cpu_usage, memory_usage):
20+
pass
21+
22+
def record_error(self, error, extra=None):
23+
pass
24+
25+
26+
class AzureMonitorMetric(AbstractMonitorMetric):
27+
def __init__(self, test_name, test_description=None):
28+
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
29+
self.exporter = metrics_exporter.new_metrics_exporter()
30+
self.stats = stats_module.stats
31+
self.view_manager = self.stats.view_manager
32+
self.stats_recorder = self.stats.stats_recorder
33+
self.azure_logger = get_azure_logger(test_name)
34+
self.name = test_name
35+
self.desc = test_description
36+
37+
messages_measure_name = "The number of messages handled by " + self.name
38+
messages_measure_desc = "The number of messages handled by " + self.desc if self.desc else None
39+
memory_measure_name = "memory usage percentage for " + self.name
40+
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
41+
cpu_measure_name = "cpu usage percentage for " + self.name
42+
cpu_measure_desc = "cpu usage percentage for " + self.desc if self.desc else None
43+
error_measure_name = "error count for " + self.name
44+
error_measure_desc = "The number of errors happened while running the test for " + self.desc if self.desc else None
45+
46+
self.messages_measure = measure_module.MeasureInt(messages_measure_name, messages_measure_desc, "messages")
47+
self.memory_measure = measure_module.MeasureFloat(memory_measure_name, memory_measure_desc)
48+
self.cpu_measure = measure_module.MeasureFloat(cpu_measure_name, cpu_measure_desc)
49+
self.error_measure = measure_module.MeasureInt(error_measure_name, error_measure_desc)
50+
51+
self.messages_measure_view = view_module.View(
52+
messages_measure_name,
53+
messages_measure_desc,
54+
[],
55+
self.messages_measure,
56+
aggregation_module.SumAggregation()
57+
)
58+
59+
self.memory_measure_view = view_module.View(
60+
memory_measure_name,
61+
memory_measure_desc,
62+
[],
63+
self.memory_measure,
64+
aggregation_module.LastValueAggregation()
65+
)
66+
67+
self.cpu_measure_view = view_module.View(
68+
cpu_measure_name,
69+
cpu_measure_desc,
70+
[],
71+
self.cpu_measure,
72+
aggregation_module.LastValueAggregation()
73+
)
74+
75+
self.error_measure_view = view_module.View(
76+
error_measure_name,
77+
error_measure_desc,
78+
[],
79+
self.error_measure,
80+
aggregation_module.CountAggregation()
81+
)
82+
83+
self.view_manager.register_view(self.messages_measure_view)
84+
self.view_manager.register_view(self.memory_measure_view)
85+
self.view_manager.register_view(self.cpu_measure_view)
86+
self.view_manager.register_view(self.error_measure_view)
87+
88+
self.mmap = self.stats_recorder.new_measurement_map()
89+
90+
def record_messages_cpu_memory(self, number_of_messages, cpu_usage, memory_usage):
91+
self.mmap.measure_int_put(self.messages_measure, number_of_messages)
92+
self.mmap.measure_float_put(self.memory_measure, memory_usage)
93+
self.mmap.measure_float_put(self.cpu_measure, cpu_usage)
94+
self.mmap.record()
95+
96+
def record_error(self, error, extra=None):
97+
self.mmap.measure_int_put(self.error_measure, 1)
98+
self.mmap.record()
99+
self.azure_logger.exception(
100+
"Error happened when running {}: {}. Extra info: {}".format(self.name, repr(error), extra)
101+
)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
aiohttp>=3.0; python_version >= '3.5'
2+
opencensus-ext-azure
3+
psutil
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
import os
7+
import sys
8+
import logging
9+
from logging.handlers import RotatingFileHandler
10+
11+
from opencensus.ext.azure.log_exporter import AzureLogHandler
12+
13+
14+
def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
15+
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
16+
logger = logging.getLogger(logger_name)
17+
logger.setLevel(level)
18+
formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
19+
20+
if print_console:
21+
console_handler = logging.StreamHandler(stream=sys.stdout)
22+
if not logger.handlers:
23+
console_handler.setFormatter(formatter)
24+
logger.addHandler(console_handler)
25+
26+
if log_filename:
27+
logger_file_handler = RotatingFileHandler(
28+
log_filename,
29+
maxBytes=log_file_max_bytes,
30+
backupCount=log_file_backup_count
31+
)
32+
logger_file_handler.setFormatter(formatter)
33+
logger.addHandler(logger_file_handler)
34+
35+
return logger
36+
37+
38+
def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
39+
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
40+
stress_logger = logging.getLogger(logger_name)
41+
stress_logger.setLevel(level)
42+
eventhub_logger = logging.getLogger("azure.eventhub")
43+
eventhub_logger.setLevel(level)
44+
uamqp_logger = logging.getLogger("uamqp")
45+
uamqp_logger.setLevel(level)
46+
47+
formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
48+
if print_console:
49+
console_handler = logging.StreamHandler(stream=sys.stdout)
50+
console_handler.setFormatter(formatter)
51+
if not eventhub_logger.handlers:
52+
eventhub_logger.addHandler(console_handler)
53+
if not uamqp_logger.handlers:
54+
uamqp_logger.addHandler(console_handler)
55+
if not stress_logger.handlers:
56+
stress_logger.addHandler(console_handler)
57+
58+
if log_filename:
59+
eventhub_file_handler = RotatingFileHandler(
60+
"eventhub_" + log_filename,
61+
maxBytes=log_file_max_bytes,
62+
backupCount=log_file_backup_count
63+
)
64+
uamqp_file_handler = RotatingFileHandler(
65+
"uamqp_" + log_filename,
66+
maxBytes=log_file_max_bytes,
67+
backupCount=log_file_backup_count
68+
)
69+
stress_file_handler = RotatingFileHandler(
70+
log_filename,
71+
maxBytes=log_file_max_bytes,
72+
backupCount=log_file_backup_count
73+
)
74+
eventhub_file_handler.setFormatter(formatter)
75+
uamqp_file_handler.setFormatter(formatter)
76+
stress_file_handler.setFormatter(formatter)
77+
eventhub_logger.addHandler(eventhub_file_handler)
78+
uamqp_logger.addHandler(uamqp_file_handler)
79+
stress_logger.addHandler(stress_file_handler)
80+
81+
return stress_logger
82+
83+
84+
def get_azure_logger(logger_name, level=logging.INFO):
85+
logger = logging.getLogger("azure_logger_" + logger_name)
86+
logger.setLevel(level)
87+
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
88+
logger.addHandler(AzureLogHandler())
89+
return logger
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
#-------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
#--------------------------------------------------------------------------
6+
7+
import os
8+
import asyncio
9+
from argparse import ArgumentParser
10+
from datetime import timedelta
11+
12+
from azure.servicebus import ServiceBusClient
13+
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
14+
15+
from stress_test_base import StressTestRunner, StressTestRunnerAsync
16+
from app_insights_metric import AzureMonitorMetric
17+
from process_monitor import ProcessMonitor
18+
19+
CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
20+
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
21+
22+
23+
def sync_send(client, args):
24+
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender")
25+
process_monitor = ProcessMonitor("monitor_sender_stress_sync.log", "sender_stress_sync")
26+
stress_test = StressTestRunner(
27+
senders=[client.get_queue_sender(QUEUE_NAME)],
28+
receivers=[],
29+
message_size=args.message_size,
30+
send_batch_size=args.send_batch_size,
31+
duration=timedelta(seconds=args.duration),
32+
azure_monitor_metric=azure_monitor_metric,
33+
process_monitor=process_monitor,
34+
fail_on_exception=False
35+
)
36+
stress_test.run()
37+
38+
39+
async def async_send(client, args):
40+
azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Sender")
41+
process_monitor = ProcessMonitor("monitor_sender_stress_async.log", "sender_stress_async")
42+
stress_test = StressTestRunnerAsync(
43+
senders=[client.get_queue_sender(QUEUE_NAME)],
44+
receivers=[],
45+
message_size=args.message_size,
46+
send_batch_size=args.send_batch_size,
47+
duration=timedelta(seconds=args.duration),
48+
azure_monitor_metric=azure_monitor_metric,
49+
process_monitor=process_monitor,
50+
fail_on_exception=False
51+
)
52+
await stress_test.run_async()
53+
54+
55+
def sync_receive(client, args):
56+
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver")
57+
process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync")
58+
stress_test = StressTestRunner(
59+
senders=[],
60+
receivers=[client.get_queue_receiver(QUEUE_NAME)],
61+
max_message_count=args.max_message_count,
62+
receive_type=args.receive_type,
63+
max_wait_time=args.max_wait_time,
64+
duration=timedelta(seconds=args.duration),
65+
azure_monitor_metric=azure_monitor_metric,
66+
process_monitor=process_monitor,
67+
fail_on_exception=False
68+
)
69+
stress_test.run()
70+
71+
72+
async def async_receive(client, args):
73+
azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Receiver")
74+
process_monitor = ProcessMonitor("monitor_receiver_stress_async.log", "receiver_stress_async")
75+
stress_test = StressTestRunnerAsync(
76+
senders=[],
77+
receivers=[client.get_queue_receiver(QUEUE_NAME)],
78+
max_message_count=args.max_message_count,
79+
receive_type=args.receive_type,
80+
max_wait_time=args.max_wait_time,
81+
duration=timedelta(seconds=args.duration),
82+
azure_monitor_metric=azure_monitor_metric,
83+
process_monitor=process_monitor,
84+
fail_on_exception=False
85+
)
86+
await stress_test.run_async()
87+
88+
89+
if __name__ == '__main__':
90+
parser = ArgumentParser()
91+
parser.add_argument("--method", type=str)
92+
parser.add_argument("--duration", type=int, default=259200)
93+
parser.add_argument("--logging-enable", action="store_true")
94+
95+
parser.add_argument("--send-batch-size", type=int, default=100)
96+
parser.add_argument("--message-size", type=int, default=100)
97+
98+
parser.add_argument("--receive-type", type=str, default="pull")
99+
parser.add_argument("--max_wait_time", type=int, default=10)
100+
parser.add_argument("--max_message_count", type=int, default=1)
101+
102+
args, _ = parser.parse_known_args()
103+
loop = asyncio.get_event_loop()
104+
105+
if args.method.startswith("sync"):
106+
sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
107+
else:
108+
sb_client = AsyncServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
109+
110+
if args.method == 'sync_send':
111+
sync_send(sb_client, args)
112+
elif args.method == 'async_send':
113+
loop.run_until_complete(async_send(sb_client, args))
114+
elif args.method == 'sync_receive':
115+
sync_receive(sb_client, args)
116+
elif args.method == 'async_receive':
117+
loop.run_until_complete(async_receive(sb_client, args))
118+
else:
119+
raise RuntimeError("Must set a method to run stress test.")

0 commit comments

Comments
 (0)