diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/app_insights_metric.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/app_insights_metric.py new file mode 100644 index 000000000000..edd6174cabbd --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/app_insights_metric.py @@ -0,0 +1,101 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from opencensus.ext.azure import metrics_exporter +from opencensus.stats import aggregation as aggregation_module +from opencensus.stats import measure as measure_module +from opencensus.stats import stats as stats_module +from opencensus.stats import view as view_module + +from logger import get_azure_logger + + +class AbstractMonitorMetric: + def __init__(self, test_name, test_description=None): + pass + + def record_messages_cpu_memory(self, number_of_events, cpu_usage, memory_usage): + pass + + def record_error(self, error, extra=None): + pass + + +class AzureMonitorMetric(AbstractMonitorMetric): + def __init__(self, test_name, test_description=None): + # oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING' + self.exporter = metrics_exporter.new_metrics_exporter() + self.stats = stats_module.stats + self.view_manager = self.stats.view_manager + self.stats_recorder = self.stats.stats_recorder + self.azure_logger = get_azure_logger(test_name) + self.name = test_name + self.desc = test_description + + messages_measure_name = "The number of messages handled by " + self.name + messages_measure_desc = "The number of messages handled by " + self.desc if self.desc else None + memory_measure_name = "memory usage percentage for " + self.name + memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None + cpu_measure_name = "cpu usage percentage for " + self.name + cpu_measure_desc = "cpu usage percentage for " + self.desc if self.desc else None + error_measure_name = "error count for " + self.name + error_measure_desc = "The number of errors happened while running the test for " + self.desc if self.desc else None + + self.messages_measure = measure_module.MeasureInt(messages_measure_name, messages_measure_desc, "messages") + self.memory_measure = measure_module.MeasureFloat(memory_measure_name, memory_measure_desc) + self.cpu_measure = measure_module.MeasureFloat(cpu_measure_name, cpu_measure_desc) + self.error_measure = measure_module.MeasureInt(error_measure_name, error_measure_desc) + + self.messages_measure_view = view_module.View( + messages_measure_name, + messages_measure_desc, + [], + self.messages_measure, + aggregation_module.SumAggregation() + ) + + self.memory_measure_view = view_module.View( + memory_measure_name, + memory_measure_desc, + [], + self.memory_measure, + aggregation_module.LastValueAggregation() + ) + + self.cpu_measure_view = view_module.View( + cpu_measure_name, + cpu_measure_desc, + [], + self.cpu_measure, + aggregation_module.LastValueAggregation() + ) + + self.error_measure_view = view_module.View( + error_measure_name, + error_measure_desc, + [], + self.error_measure, + aggregation_module.CountAggregation() + ) + + self.view_manager.register_view(self.messages_measure_view) + self.view_manager.register_view(self.memory_measure_view) + self.view_manager.register_view(self.cpu_measure_view) + self.view_manager.register_view(self.error_measure_view) + + self.mmap = self.stats_recorder.new_measurement_map() + + def record_messages_cpu_memory(self, number_of_messages, cpu_usage, memory_usage): + self.mmap.measure_int_put(self.messages_measure, number_of_messages) + self.mmap.measure_float_put(self.memory_measure, memory_usage) + self.mmap.measure_float_put(self.cpu_measure, cpu_usage) + self.mmap.record() + + def record_error(self, error, extra=None): + self.mmap.measure_int_put(self.error_measure, 1) + self.mmap.record() + self.azure_logger.exception( + "Error happened when running {}: {}. Extra info: {}".format(self.name, repr(error), extra) + ) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/dev_requirements.txt b/sdk/servicebus/azure-servicebus/tests/stress_tests/dev_requirements.txt new file mode 100644 index 000000000000..c985629b5a49 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/dev_requirements.txt @@ -0,0 +1,3 @@ +aiohttp>=3.0; python_version >= '3.5' +opencensus-ext-azure +psutil diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/logger.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/logger.py new file mode 100644 index 000000000000..0875c2e8a7e0 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/logger.py @@ -0,0 +1,89 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os +import sys +import logging +from logging.handlers import RotatingFileHandler + +from opencensus.ext.azure.log_exporter import AzureLogHandler + + +def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None, + log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3): + logger = logging.getLogger(logger_name) + logger.setLevel(level) + formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + + if print_console: + console_handler = logging.StreamHandler(stream=sys.stdout) + if not logger.handlers: + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + if log_filename: + logger_file_handler = RotatingFileHandler( + log_filename, + maxBytes=log_file_max_bytes, + backupCount=log_file_backup_count + ) + logger_file_handler.setFormatter(formatter) + logger.addHandler(logger_file_handler) + + return logger + + +def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None, + log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3): + stress_logger = logging.getLogger(logger_name) + stress_logger.setLevel(level) + eventhub_logger = logging.getLogger("azure.eventhub") + eventhub_logger.setLevel(level) + uamqp_logger = logging.getLogger("uamqp") + uamqp_logger.setLevel(level) + + formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + if print_console: + console_handler = logging.StreamHandler(stream=sys.stdout) + console_handler.setFormatter(formatter) + if not eventhub_logger.handlers: + eventhub_logger.addHandler(console_handler) + if not uamqp_logger.handlers: + uamqp_logger.addHandler(console_handler) + if not stress_logger.handlers: + stress_logger.addHandler(console_handler) + + if log_filename: + eventhub_file_handler = RotatingFileHandler( + "eventhub_" + log_filename, + maxBytes=log_file_max_bytes, + backupCount=log_file_backup_count + ) + uamqp_file_handler = RotatingFileHandler( + "uamqp_" + log_filename, + maxBytes=log_file_max_bytes, + backupCount=log_file_backup_count + ) + stress_file_handler = RotatingFileHandler( + log_filename, + maxBytes=log_file_max_bytes, + backupCount=log_file_backup_count + ) + eventhub_file_handler.setFormatter(formatter) + uamqp_file_handler.setFormatter(formatter) + stress_file_handler.setFormatter(formatter) + eventhub_logger.addHandler(eventhub_file_handler) + uamqp_logger.addHandler(uamqp_file_handler) + stress_logger.addHandler(stress_file_handler) + + return stress_logger + + +def get_azure_logger(logger_name, level=logging.INFO): + logger = logging.getLogger("azure_logger_" + logger_name) + logger.setLevel(level) + # oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING' + logger.addHandler(AzureLogHandler()) + return logger diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_runner.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_runner.py new file mode 100644 index 000000000000..c1fe97d8bca1 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_runner.py @@ -0,0 +1,119 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import os +import asyncio +from argparse import ArgumentParser +from datetime import timedelta + +from azure.servicebus import ServiceBusClient +from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient + +from stress_test_base import StressTestRunner, StressTestRunnerAsync +from app_insights_metric import AzureMonitorMetric +from process_monitor import ProcessMonitor + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + + +def sync_send(client, args): + azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender") + process_monitor = ProcessMonitor("monitor_sender_stress_sync.log", "sender_stress_sync") + stress_test = StressTestRunner( + senders=[client.get_queue_sender(QUEUE_NAME)], + receivers=[], + message_size=args.message_size, + send_batch_size=args.send_batch_size, + duration=timedelta(seconds=args.duration), + azure_monitor_metric=azure_monitor_metric, + process_monitor=process_monitor, + fail_on_exception=False + ) + stress_test.run() + + +async def async_send(client, args): + azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Sender") + process_monitor = ProcessMonitor("monitor_sender_stress_async.log", "sender_stress_async") + stress_test = StressTestRunnerAsync( + senders=[client.get_queue_sender(QUEUE_NAME)], + receivers=[], + message_size=args.message_size, + send_batch_size=args.send_batch_size, + duration=timedelta(seconds=args.duration), + azure_monitor_metric=azure_monitor_metric, + process_monitor=process_monitor, + fail_on_exception=False + ) + await stress_test.run_async() + + +def sync_receive(client, args): + azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver") + process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync") + stress_test = StressTestRunner( + senders=[], + receivers=[client.get_queue_receiver(QUEUE_NAME)], + max_message_count=args.max_message_count, + receive_type=args.receive_type, + max_wait_time=args.max_wait_time, + duration=timedelta(seconds=args.duration), + azure_monitor_metric=azure_monitor_metric, + process_monitor=process_monitor, + fail_on_exception=False + ) + stress_test.run() + + +async def async_receive(client, args): + azure_monitor_metric = AzureMonitorMetric("Async ServiceBus Receiver") + process_monitor = ProcessMonitor("monitor_receiver_stress_async.log", "receiver_stress_async") + stress_test = StressTestRunnerAsync( + senders=[], + receivers=[client.get_queue_receiver(QUEUE_NAME)], + max_message_count=args.max_message_count, + receive_type=args.receive_type, + max_wait_time=args.max_wait_time, + duration=timedelta(seconds=args.duration), + azure_monitor_metric=azure_monitor_metric, + process_monitor=process_monitor, + fail_on_exception=False + ) + await stress_test.run_async() + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument("--method", type=str) + parser.add_argument("--duration", type=int, default=259200) + parser.add_argument("--logging-enable", action="store_true") + + parser.add_argument("--send-batch-size", type=int, default=100) + parser.add_argument("--message-size", type=int, default=100) + + parser.add_argument("--receive-type", type=str, default="pull") + parser.add_argument("--max_wait_time", type=int, default=10) + parser.add_argument("--max_message_count", type=int, default=1) + + args, _ = parser.parse_known_args() + loop = asyncio.get_event_loop() + + if args.method.startswith("sync"): + sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + else: + sb_client = AsyncServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + if args.method == 'sync_send': + sync_send(sb_client, args) + elif args.method == 'async_send': + loop.run_until_complete(async_send(sb_client, args)) + elif args.method == 'sync_receive': + sync_receive(sb_client, args) + elif args.method == 'async_receive': + loop.run_until_complete(async_receive(sb_client, args)) + else: + raise RuntimeError("Must set a method to run stress test.") diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py index 67700a165d2b..5e02e319633b 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py @@ -9,7 +9,7 @@ from datetime import datetime, timedelta import concurrent import sys -import uuid +import asyncio import logging try: @@ -20,8 +20,12 @@ from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusMessageBatch from azure.servicebus.exceptions import MessageAlreadySettled -from utilities import _build_logger -_logger = _build_logger("stress-test", logging.WARN) + +import logger +from app_insights_metric import AbstractMonitorMetric +from process_monitor import AbstractProcessMonitor +_logger = logger.get_base_logger("stress-test.log", "stress_test", logging.WARN) + class ReceiveType: push="push" @@ -63,6 +67,7 @@ def populate_process_stats(self): except NameError: return # psutil was not installed, fall back to simply not capturing these stats. + class StressTestRunner: '''Framework for running a service bus stress test. Duration can be overriden via the --stress_test_duration flag from the command line''' @@ -80,7 +85,9 @@ def __init__(self, should_complete_messages = True, max_message_count = 1, send_session_id = None, - fail_on_exception = True): + fail_on_exception = True, + azure_monitor_metric=None, + process_monitor=None): self.senders = senders self.receivers = receivers self.duration=duration @@ -94,6 +101,8 @@ def __init__(self, self.max_message_count = max_message_count self.fail_on_exception = fail_on_exception self.send_session_id = send_session_id + self.azure_monitor_metric = azure_monitor_metric or AbstractMonitorMetric("fake_test_name") + self.process_monitor = process_monitor or AbstractProcessMonitor("fake_log_filename", "fake_logger_name") # Because of pickle we need to create a state object and not just pass around ourselves. # If we ever require multiple runs of this one after another, just make Run() reset this. @@ -162,7 +171,7 @@ def _construct_message(self): message = ServiceBusMessage(self.pre_process_message_body("a" * self.message_size)) self.pre_process_message(message) batch.add_message(message) - self.PreProcessMessageBatch(batch) + self.pre_process_message_batch(batch) return batch else: message = ServiceBusMessage(self.pre_process_message_body("a" * self.message_size)) @@ -182,13 +191,19 @@ def _send(self, sender, end_time): if self.send_session_id != None: message.session_id = self.send_session_id sender.send_messages(message) + self.azure_monitor_metric.record_messages_cpu_memory( + self.send_batch_size, + self.process_monitor.cpu_usage_percent, + self.process_monitor.memory_usage_percent + ) + self._state.total_sent += self.send_batch_size self.on_send(self._state, message, sender) except Exception as e: _logger.exception("Exception during send: {}".format(e)) + self.azure_monitor_metric.record_error(e) self._state.exceptions.append(e) if self.fail_on_exception: raise - self._state.total_sent += 1 time.sleep(self.send_delay) self._state.timestamp = datetime.utcnow() return self._state @@ -223,44 +238,185 @@ def _receive(self, receiver, end_time): if end_time <= datetime.utcnow(): break time.sleep(self.receive_delay) + self.azure_monitor_metric.record_messages_cpu_memory( + 1, + self.process_monitor.cpu_usage_percent, + self.process_monitor.memory_usage_percent + ) self.post_receive(self._state, receiver) except Exception as e: _logger.exception("Exception during receive: {}".format(e)) self._state.exceptions.append(e) + self.azure_monitor_metric.record_error(e) if self.fail_on_exception: raise self._state.timestamp = datetime.utcnow() return self._state except Exception as e: + self.azure_monitor_metric.record_error(e) _logger.exception("Exception in receiver {}".format(e)) self._should_stop = True raise - def run(self): start_time = datetime.utcnow() end_time = start_time + (self._duration_override or self.duration) - sent_messages = 0 - received_messages = 0 - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as proc_pool: - _logger.info("STARTING PROC POOL") - senders = [proc_pool.submit(self._send, sender, end_time) for sender in self.senders] - receivers = [proc_pool.submit(self._receive, receiver, end_time) for receiver in self.receivers] + with self.process_monitor: + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as proc_pool: + _logger.info("STARTING PROC POOL") + senders = [proc_pool.submit(self._send, sender, end_time) for sender in self.senders] + receivers = [proc_pool.submit(self._receive, receiver, end_time) for receiver in self.receivers] + + result = StressTestResults() + for each in concurrent.futures.as_completed(senders + receivers): + _logger.info("SOMETHING FINISHED") + if each in senders: + result.state_by_sender[each] = each.result() + if each in receivers: + result.state_by_receiver[each] = each.result() + # TODO: do as_completed in one batch to provide a way to short-circuit on failure. + result.state_by_sender = {s:f.result() for s,f in zip(self.senders, concurrent.futures.as_completed(senders))} + result.state_by_receiver = {r:f.result() for r,f in zip(self.receivers, concurrent.futures.as_completed(receivers))} + _logger.info("got receiver results") + result.total_sent = sum([r.total_sent for r in result.state_by_sender.values()]) + result.total_received = sum([r.total_received for r in result.state_by_receiver.values()]) + result.time_elapsed = end_time - start_time + _logger.critical("Stress test completed. Results:\n{}".format(result)) + return result + + +class StressTestRunnerAsync(StressTestRunner): + def __init__( + self, + senders, + receivers, + duration=timedelta(minutes=15), + receive_type=ReceiveType.push, + send_batch_size=None, + message_size=10, + max_wait_time=10, + send_delay=.01, + receive_delay=0, + should_complete_messages=True, + max_message_count=1, + send_session_id=None, + fail_on_exception=True, + azure_monitor_metric=None, + process_monitor=None + ): + super(StressTestRunnerAsync, self).__init__( + senders, + receivers, + duration=duration, + receive_type=receive_type, + send_batch_size=send_batch_size, + message_size=message_size, + max_wait_time=max_wait_time, + send_delay=send_delay, + receive_delay=receive_delay, + should_complete_messages=should_complete_messages, + max_message_count=max_message_count, + send_session_id=send_session_id, + fail_on_exception=fail_on_exception, + azure_monitor_metric=azure_monitor_metric, + process_monitor=process_monitor + ) + + async def _send_async(self, sender, end_time): + self._schedule_interval_logger(end_time, "Sender " + str(self)) + try: + _logger.info("STARTING SENDER") + async with sender: + while end_time > datetime.utcnow() and not self._should_stop: + _logger.info("SENDING") + try: + message = self._construct_message() + if self.send_session_id != None: + message.session_id = self.send_session_id + await sender.send_messages(message) + self.azure_monitor_metric.record_messages_cpu_memory( + self.send_batch_size, + self.process_monitor.cpu_usage_percent, + self.process_monitor.memory_usage_percent + ) + self._state.total_sent += self.send_batch_size + self.on_send(self._state, message, sender) + except Exception as e: + _logger.exception("Exception during send: {}".format(e)) + self.azure_monitor_metric.record_error(e) + self._state.exceptions.append(e) + if self.fail_on_exception: + raise + await asyncio.sleep(self.send_delay) + self._state.timestamp = datetime.utcnow() + return self._state + except Exception as e: + _logger.exception("Exception in sender: {}".format(e)) + self._should_stop = True + raise + async def _receive_handle_message(self, message, receiver, end_time): + self.on_receive(self._state, message, receiver) + try: + if self.should_complete_messages: + await receiver.complete_message(message) + except MessageAlreadySettled: # It may have been settled in the plugin callback. + pass + self._state.total_received += 1 + # TODO: Get EnqueuedTimeUtc out of broker properties and calculate latency. Should properties/app properties be mostly None? + await asyncio.sleep(self.receive_delay) + self.azure_monitor_metric.record_messages_cpu_memory( + 1, + self.process_monitor.cpu_usage_percent, + self.process_monitor.memory_usage_percent + ) + + async def _receive_async(self, receiver, end_time): + self._schedule_interval_logger(end_time, "Receiver " + str(self)) + try: + async with receiver: + while end_time > datetime.utcnow() and not self._should_stop: + _logger.info("RECEIVE LOOP") + try: + if self.receive_type == ReceiveType.pull: + batch = await receiver.receive_messages(max_message_count=self.max_message_count, max_wait_time=self.max_wait_time) + for message in batch: + await self._receive_handle_message(message, receiver, end_time) + elif self.receive_type == ReceiveType.push: + batch = receiver._get_streaming_message_iter(max_wait_time=self.max_wait_time) + async for message in batch: + if end_time <= datetime.utcnow(): + break + await self._receive_handle_message(message,receiver, end_time) + self.post_receive(self._state, receiver) + except Exception as e: + _logger.exception("Exception during receive: {}".format(e)) + self._state.exceptions.append(e) + self.azure_monitor_metric.record_error(e) + if self.fail_on_exception: + raise + self._state.timestamp = datetime.utcnow() + return self._state + except Exception as e: + self.azure_monitor_metric.record_error(e) + _logger.exception("Exception in receiver {}".format(e)) + self._should_stop = True + raise + + async def run_async(self): + start_time = datetime.utcnow() + end_time = start_time + (self._duration_override or self.duration) + send_tasks = [asyncio.create_task(self._send_async(sender, end_time)) for sender in self.senders] + receive_tasks = [asyncio.create_task(self._receive_async(receiver, end_time)) for receiver in self.receivers] + with self.process_monitor: + await asyncio.gather(*send_tasks, *receive_tasks) result = StressTestResults() - for each in concurrent.futures.as_completed(senders + receivers): - _logger.info("SOMETHING FINISHED") - if each in senders: - result.state_by_sender[each] = each.result() - if each in receivers: - result.state_by_receiver[each] = each.result() - # TODO: do as_completed in one batch to provide a way to short-circuit on failure. - result.state_by_sender = {s:f.result() for s,f in zip(self.senders, concurrent.futures.as_completed(senders))} - result.state_by_receiver = {r:f.result() for r,f in zip(self.receivers, concurrent.futures.as_completed(receivers))} - _logger.info("got receiever results") + result.state_by_sender = {s: f.result() for s, f in zip(self.senders, send_tasks)} + result.state_by_receiver = {r: f.result() for r, f in + zip(self.receivers, receive_tasks)} + _logger.info("got receiver results") result.total_sent = sum([r.total_sent for r in result.state_by_sender.values()]) result.total_received = sum([r.total_received for r in result.state_by_receiver.values()]) result.time_elapsed = end_time - start_time _logger.critical("Stress test completed. Results:\n{}".format(result)) return result -