Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module

from logger import get_azure_logger


class AbstractMonitorMetric:
def __init__(self, test_name, test_description=None):
pass

def record_messages_cpu_memory(self, number_of_events, cpu_usage, memory_usage):
pass

def record_error(self, error, extra=None):
pass


class AzureMonitorMetric(AbstractMonitorMetric):
def __init__(self, test_name, test_description=None):
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
self.exporter = metrics_exporter.new_metrics_exporter()
self.stats = stats_module.stats
self.view_manager = self.stats.view_manager
self.stats_recorder = self.stats.stats_recorder
self.azure_logger = get_azure_logger(test_name)
self.name = test_name
self.desc = test_description

messages_measure_name = "The number of messages handled by " + self.name
messages_measure_desc = "The number of messages handled by " + self.desc if self.desc else None
memory_measure_name = "memory usage percentage for " + self.name
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
cpu_measure_name = "cpu usage percentage for " + self.name
cpu_measure_desc = "cpu usage percentage for " + self.desc if self.desc else None
error_measure_name = "error count for " + self.name
error_measure_desc = "The number of errors happened while running the test for " + self.desc if self.desc else None

self.messages_measure = measure_module.MeasureInt(messages_measure_name, messages_measure_desc, "messages")
self.memory_measure = measure_module.MeasureFloat(memory_measure_name, memory_measure_desc)
self.cpu_measure = measure_module.MeasureFloat(cpu_measure_name, cpu_measure_desc)
self.error_measure = measure_module.MeasureInt(error_measure_name, error_measure_desc)

self.messages_measure_view = view_module.View(
messages_measure_name,
messages_measure_desc,
[],
self.messages_measure,
aggregation_module.SumAggregation()
)

self.memory_measure_view = view_module.View(
memory_measure_name,
memory_measure_desc,
[],
self.memory_measure,
aggregation_module.LastValueAggregation()
)

self.cpu_measure_view = view_module.View(
cpu_measure_name,
cpu_measure_desc,
[],
self.cpu_measure,
aggregation_module.LastValueAggregation()
)

self.error_measure_view = view_module.View(
error_measure_name,
error_measure_desc,
[],
self.error_measure,
aggregation_module.CountAggregation()
)

self.view_manager.register_view(self.messages_measure_view)
self.view_manager.register_view(self.memory_measure_view)
self.view_manager.register_view(self.cpu_measure_view)
self.view_manager.register_view(self.error_measure_view)

self.mmap = self.stats_recorder.new_measurement_map()

def record_messages_cpu_memory(self, number_of_messages, cpu_usage, memory_usage):
self.mmap.measure_int_put(self.messages_measure, number_of_messages)
self.mmap.measure_float_put(self.memory_measure, memory_usage)
self.mmap.measure_float_put(self.cpu_measure, cpu_usage)
self.mmap.record()

def record_error(self, error, extra=None):
self.mmap.measure_int_put(self.error_measure, 1)
self.mmap.record()
self.azure_logger.exception(
"Error happened when running {}: {}. Extra info: {}".format(self.name, repr(error), extra)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
aiohttp>=3.0; python_version >= '3.5'
opencensus-ext-azure
psutil
89 changes: 89 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/stress_tests/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import sys
import logging
from logging.handlers import RotatingFileHandler

from opencensus.ext.azure.log_exporter import AzureLogHandler


def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
logger = logging.getLogger(logger_name)
logger.setLevel(level)
formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
if not logger.handlers:
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

if log_filename:
logger_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
logger_file_handler.setFormatter(formatter)
logger.addHandler(logger_file_handler)

return logger


def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
stress_logger = logging.getLogger(logger_name)
stress_logger.setLevel(level)
eventhub_logger = logging.getLogger("azure.eventhub")
eventhub_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(level)

formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
if not eventhub_logger.handlers:
eventhub_logger.addHandler(console_handler)
if not uamqp_logger.handlers:
uamqp_logger.addHandler(console_handler)
if not stress_logger.handlers:
stress_logger.addHandler(console_handler)

if log_filename:
eventhub_file_handler = RotatingFileHandler(
"eventhub_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
uamqp_file_handler = RotatingFileHandler(
"uamqp_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
stress_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
eventhub_file_handler.setFormatter(formatter)
uamqp_file_handler.setFormatter(formatter)
stress_file_handler.setFormatter(formatter)
eventhub_logger.addHandler(eventhub_file_handler)
uamqp_logger.addHandler(uamqp_file_handler)
stress_logger.addHandler(stress_file_handler)

return stress_logger


def get_azure_logger(logger_name, level=logging.INFO):
logger = logging.getLogger("azure_logger_" + logger_name)
logger.setLevel(level)
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
logger.addHandler(AzureLogHandler())
return logger
119 changes: 119 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/stress_tests/stress_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import asyncio
from argparse import ArgumentParser
from datetime import timedelta

from azure.servicebus import ServiceBusClient
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient

from stress_test_base import StressTestRunner, StressTestRunnerAsync
from app_insights_metric import AzureMonitorMetric
from process_monitor import ProcessMonitor

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


def sync_send(client, args):
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender")
process_monitor = ProcessMonitor("monitor_sender_stress_sync.log", "sender_stress_sync")
stress_test = StressTestRunner(
senders=[client.get_queue_sender(QUEUE_NAME)],
receivers=[],
message_size=args.message_size,
send_batch_size=args.send_batch_size,
duration=timedelta(seconds=args.duration),
azure_monitor_metric=azure_monitor_metric,
process_monitor=process_monitor,
fail_on_exception=False
)
stress_test.run()


async def async_send(client, args):
azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Sender")
process_monitor = ProcessMonitor("monitor_sender_stress_async.log", "sender_stress_async")
stress_test = StressTestRunnerAsync(
senders=[client.get_queue_sender(QUEUE_NAME)],
receivers=[],
message_size=args.message_size,
send_batch_size=args.send_batch_size,
duration=timedelta(seconds=args.duration),
azure_monitor_metric=azure_monitor_metric,
process_monitor=process_monitor,
fail_on_exception=False
)
await stress_test.run_async()


def sync_receive(client, args):
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver")
process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync")
stress_test = StressTestRunner(
senders=[],
receivers=[client.get_queue_receiver(QUEUE_NAME)],
max_message_count=args.max_message_count,
receive_type=args.receive_type,
max_wait_time=args.max_wait_time,
duration=timedelta(seconds=args.duration),
azure_monitor_metric=azure_monitor_metric,
process_monitor=process_monitor,
fail_on_exception=False
)
stress_test.run()


async def async_receive(client, args):
azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Receiver")
process_monitor = ProcessMonitor("monitor_receiver_stress_async.log", "receiver_stress_async")
stress_test = StressTestRunnerAsync(
senders=[],
receivers=[client.get_queue_receiver(QUEUE_NAME)],
max_message_count=args.max_message_count,
receive_type=args.receive_type,
max_wait_time=args.max_wait_time,
duration=timedelta(seconds=args.duration),
azure_monitor_metric=azure_monitor_metric,
process_monitor=process_monitor,
fail_on_exception=False
)
await stress_test.run_async()


if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("--method", type=str)
parser.add_argument("--duration", type=int, default=259200)
parser.add_argument("--logging-enable", action="store_true")

parser.add_argument("--send-batch-size", type=int, default=100)
parser.add_argument("--message-size", type=int, default=100)

parser.add_argument("--receive-type", type=str, default="pull")
parser.add_argument("--max_wait_time", type=int, default=10)
parser.add_argument("--max_message_count", type=int, default=1)

args, _ = parser.parse_known_args()
loop = asyncio.get_event_loop()

if args.method.startswith("sync"):
sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
else:
sb_client = AsyncServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

if args.method == 'sync_send':
sync_send(sb_client, args)
elif args.method == 'async_send':
loop.run_until_complete(async_send(sb_client, args))
elif args.method == 'sync_receive':
sync_receive(sb_client, args)
elif args.method == 'async_receive':
loop.run_until_complete(async_receive(sb_client, args))
else:
raise RuntimeError("Must set a method to run stress test.")
Loading