diff --git a/.gitignore b/.gitignore index 105533bf0748..87b0050a20cc 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,9 @@ build/ TestResults/ ENV_DIR/ +# Perf test profiling +cProfile-*.pstats + # tox generated artifacts test-junit-*.xml pylint-*.out.txt diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index 3b68426f62aa..2865d2eac6c4 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -2,6 +2,7 @@ 1. [The perfstress framework](#the-perfstress-framework) - [The PerfStressTest base](#the-perfstresstest-base) - [Default command options](#default-command-options) + - [Running with test proxy](#running-with-test-proxy) 2. [Adding performance tests to an SDK](#adding-performance-tests-to-an-sdk) - [Writing a test](#writing-a-test) - [Adding legacy T1 tests](#adding-legacy-t1-tests) @@ -38,6 +39,16 @@ class PerfStressTest: async def global_cleanup(self): # Can be optionally defined. Only run once, regardless of parallelism. + async def record_and_start_playback(self): + # Set up the recording on the test proxy, and configure the proxy in playback mode. + # This function is only run if a test proxy URL is provided (-x). + # There should be no need to overwrite this function. + + async def stop_playback(self): + # Configure the proxy out of playback mode and discard the recording. + # This function is only run if a test proxy URL is provided (-x). + # There should be no need to overwrite this function. + async def setup(self): # Can be optionally defined. Run once per test instance, after global_setup. @@ -65,12 +76,24 @@ class PerfStressTest: ``` ## Default command options The framework has a series of common command line options built in: -- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. -- `--iterations=1` Number of test iterations to run. Default is 1. -- `--parallel=1` Number of tests to run in parallel. Default is 1. -- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. +- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. +- `-i --iterations=1` Number of test iterations to run. Default is 1. +- `-p --parallel=1` Number of tests to run in parallel. Default is 1. +- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). +- `-x --test-proxy` Whether to run the tests against the test proxy server. Specfiy the URL for the proxy endpoint (e.g. "https://localhost:5001"). +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile---.pstats"`. + + +## Running with the test proxy +Follow the instructions here to install and run the test proxy server: +https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy + +Once running, in a separate process run the perf test in question, combined with the `-x` flag to specify the proxy endpoint. +```cmd +(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" +``` # Adding performance tests to an SDK The performance tests will be in a submodule called `perfstress_tests` within the `tests` directory in an SDK project. @@ -351,5 +374,5 @@ Using the `perfstress` command alone will list the available perf tests found. N Please add a `README.md` to the perfstress_tests directory so that others know how to setup and run the perf tests, along with a description of the available tests and any support command line options. README files in a `tests/perfstress_tests` directory should already be filtered from CI validation for SDK readmes. Some examples can be found here: -- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md) -- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md) \ No newline at end of file +- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md) +- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md) \ No newline at end of file diff --git a/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py b/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py index b40bb6c20841..37230cef3214 100644 --- a/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py +++ b/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py @@ -21,8 +21,8 @@ def __init__(self, arguments): # Create clients vault_url = self.get_from_env("AZURE_KEYVAULT_URL") - self.client = SecretClient(vault_url, self.credential) - self.async_client = AsyncSecretClient(vault_url, self.async_credential) + self.client = SecretClient(vault_url, self.credential, **self._client_kwargs) + self.async_client = AsyncSecretClient(vault_url, self.async_credential, **self._client_kwargs) async def global_setup(self): """The global setup is run only once.""" diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py index a87a409796e1..d13de28f8711 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py @@ -122,7 +122,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def get_account_information(self, **kwargs): # type: ignore diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py index d3d72ba65389..15b31566520d 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py @@ -119,7 +119,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def get_user_delegation_key(self, key_start_time, # type: datetime @@ -620,7 +619,7 @@ def get_container_client(self, container): credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) def get_blob_client( self, container, # type: Union[ContainerProperties, str] @@ -675,4 +674,4 @@ def get_blob_client( credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py index 93cc87748e34..cd0164392ab6 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py @@ -119,7 +119,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def create_container(self, metadata=None, public_access=None, **kwargs): @@ -1207,4 +1206,4 @@ def get_blob_client( credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md index f5bcf6dfad3c..0289e3b68bba 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md @@ -37,13 +37,15 @@ Using the `perfstress` command alone will list the available perf tests found. N ### Common perf command line options These options are available for all perf tests: -- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. -- `--iterations=1` Number of test iterations to run. Default is 1. -- `--parallel=1` Number of tests to run in parallel. Default is 1. +- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. +- `-i --iterations=1` Number of test iterations to run. Default is 1. +- `-p --parallel=1` Number of tests to run in parallel. Default is 1. - `--no-client-share` Whether each parallel test instance should share a single client, or use their own. Default is False (sharing). -- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. +- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). This flag must be used for Storage legacy tests, which do not support async. - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). +- `-x --test-proxy` Whether to run the tests against the test proxy server. Specfiy the URL for the proxy endpoint (e.g. "https://localhost:5001"). WARNING: When using with Legacy tests - only HTTPS is supported. +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile---.pstats"`. ### Common Blob command line options The options are available for all Blob perf tests: @@ -77,3 +79,12 @@ The tests currently written for the T1 SDK: ```cmd (env) ~/azure-storage-blob/tests> perfstress UploadTest --parallel=2 --size=10240 ``` + +## Running with the test proxy +Follow the instructions here to install and run the test proxy server: +https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy + +Once running, in a separate process run the perf test in question, combined with the `-x` flag to specify the proxy endpoint. (Note, only the HTTPS endpoint is supported for the Legacy tests). +```cmd +(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" +``` diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py index 0e224e68bf2d..a5a3d607f972 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py @@ -5,11 +5,30 @@ import os import uuid +import functools + +import requests from azure_devtools.perfstress_tests import PerfStressTest from azure.storage.blob import BlockBlobService + +def test_proxy_callback(proxy_policy, request): + if proxy_policy.recording_id and proxy_policy.mode: + live_endpoint = request.host + request.host = proxy_policy._proxy_url.netloc + request.headers["x-recording-id"] = proxy_policy.recording_id + request.headers["x-recording-mode"] = proxy_policy.mode + request.headers["x-recording-remove"] = "false" + + # Ensure x-recording-upstream-base-uri header is only set once, since the + # same HttpMessage will be reused on retries + if "x-recording-upstream-base-uri" not in request.headers: + original_endpoint = "https://{}".format(live_endpoint) + request.headers["x-recording-upstream-base-uri"] = original_endpoint + + class _LegacyServiceTest(PerfStressTest): service_client = None async_service_client = None @@ -17,14 +36,26 @@ class _LegacyServiceTest(PerfStressTest): def __init__(self, arguments): super().__init__(arguments) connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING") + session = None + if self.args.test_proxy: + session = requests.Session() + session.verify = False if not _LegacyServiceTest.service_client or self.args.no_client_share: - _LegacyServiceTest.service_client = BlockBlobService(connection_string=connection_string) + _LegacyServiceTest.service_client = BlockBlobService( + connection_string=connection_string, + request_session=session) _LegacyServiceTest.service_client.MAX_SINGLE_PUT_SIZE = self.args.max_put_size _LegacyServiceTest.service_client.MAX_BLOCK_SIZE = self.args.max_block_size _LegacyServiceTest.service_client.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD = self.args.buffer_threshold self.async_service_client = None self.service_client = _LegacyServiceTest.service_client + if self.args.test_proxy: + self.service_client.request_callback = functools.partial( + test_proxy_callback, + self._test_proxy_policy + ) + @staticmethod def add_arguments(parser): super(_LegacyServiceTest, _LegacyServiceTest).add_arguments(parser) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py index 678ea3986e27..ca46e67ffccb 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py @@ -20,13 +20,16 @@ class _ServiceTest(PerfStressTest): def __init__(self, arguments): super().__init__(arguments) connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING") - kwargs = {} - kwargs['max_single_put_size'] = self.args.max_put_size - kwargs['max_block_size'] = self.args.max_block_size - kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold + if self.args.test_proxy: + self._client_kwargs['_additional_pipeline_policies'] = self._client_kwargs['per_retry_policies'] + self._client_kwargs['max_single_put_size'] = self.args.max_put_size + self._client_kwargs['max_block_size'] = self.args.max_block_size + self._client_kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold + # self._client_kwargs['api_version'] = '2019-02-02' # Used only for comparison with T1 legacy tests + if not _ServiceTest.service_client or self.args.no_client_share: - _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) - _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) + _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs) + _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs) self.service_client = _ServiceTest.service_client self.async_service_client =_ServiceTest.async_service_client diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py index ce73c910cf2a..f5f35a86fff1 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py @@ -22,7 +22,8 @@ async def global_setup(self): next_upload = next(pending) running.add(next_upload) except StopIteration: - await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED) + if running: + await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED) break def run_sync(self): diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py new file mode 100644 index 000000000000..5be416c1175a --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py @@ -0,0 +1,38 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from urllib.parse import urlparse + +from azure.core.pipeline.policies import SansIOHTTPPolicy + + +class PerfTestProxyPolicy(SansIOHTTPPolicy): + + def __init__(self, url): + self.recording_id = None + self.mode = None + self._proxy_url = urlparse(url) + + def redirect_to_test_proxy(self, request): + if self.recording_id and self.mode: + request.context.options['connection_verify'] = False + live_endpoint = urlparse(request.http_request.url) + redirected = live_endpoint._replace( + scheme=self._proxy_url.scheme, + netloc=self._proxy_url.netloc + ) + request.http_request.url = redirected.geturl() + request.http_request.headers["x-recording-id"] = self.recording_id + request.http_request.headers["x-recording-mode"] = self.mode + request.http_request.headers["x-recording-remove"] = "false" + + # Ensure x-recording-upstream-base-uri header is only set once, since the + # same HttpMessage will be reused on retries + if "x-recording-upstream-base-uri" not in request.http_request.headers: + original_endpoint = "{}://{}".format(live_endpoint.scheme, live_endpoint.netloc) + request.http_request.headers["x-recording-upstream-base-uri"] = original_endpoint + + def on_request(self, request): + self.redirect_to_test_proxy(request) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py index 3d171d96ac74..a48ded31e10c 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py @@ -93,6 +93,12 @@ def _parse_args(self): per_test_arg_parser.add_argument( "--sync", action="store_true", help="Run tests in sync mode. Default is False.", default=False ) + per_test_arg_parser.add_argument( + "--profile", action="store_true", help="Run tests with profiler. Default is False.", default=False + ) + per_test_arg_parser.add_argument( + "-x", "--test-proxy", help="URI of TestProxy Server" + ) # Per-test args self._test_class_to_run.add_arguments(per_test_arg_parser) @@ -134,9 +140,13 @@ async def start(self): await tests[0].global_setup() try: await asyncio.gather(*[test.setup() for test in tests]) - self.logger.info("") + if self.per_test_args.test_proxy: + self.logger.info("=== Record and Start Playback ===") + await asyncio.gather(*[test.record_and_start_playback() for test in tests]) + self.logger.info("") + if self.per_test_args.warmup > 0: await self._run_tests(tests, self.per_test_args.warmup, "Warmup") @@ -144,10 +154,19 @@ async def start(self): title = "Test" if self.per_test_args.iterations > 1: title += " " + (i + 1) - await self._run_tests(tests, self.per_test_args.duration, title) + await self._run_tests( + tests, + self.per_test_args.duration, + title, + with_profiler=self.per_test_args.profile) except Exception as e: print("Exception: " + str(e)) finally: + if self.per_test_args.test_proxy: + self.logger.info("=== Stop Playback ===") + await asyncio.gather(*[test.stop_playback() for test in tests]) + self.logger.info("") + if not self.per_test_args.no_cleanup: self.logger.info("=== Cleanup ===") await asyncio.gather(*[test.cleanup() for test in tests]) @@ -161,7 +180,7 @@ async def start(self): finally: await asyncio.gather(*[test.close() for test in tests]) - async def _run_tests(self, tests, duration, title): + async def _run_tests(self, tests, duration, title, with_profiler=False): self._completed_operations = [0] * len(tests) self._last_completion_times = [0] * len(tests) self._last_total_operations = -1 @@ -171,13 +190,16 @@ async def _run_tests(self, tests, duration, title): if self.per_test_args.sync: threads = [] for id, test in enumerate(tests): - thread = threading.Thread(target=lambda: self._run_sync_loop(test, duration, id)) + thread = threading.Thread( + target=lambda: self._run_sync_loop(test, duration, id, with_profiler) + ) threads.append(thread) thread.start() for thread in threads: thread.join() else: - await asyncio.gather(*[self._run_async_loop(test, duration, id) for id, test in enumerate(tests)]) + tasks = [self._run_async_loop(test, duration, id, with_profiler) for id, test in enumerate(tests)] + await asyncio.gather(*tasks) status_thread.stop() @@ -196,23 +218,63 @@ async def _run_tests(self, tests, duration, title): ) self.logger.info("") - def _run_sync_loop(self, test, duration, id): + def _run_sync_loop(self, test, duration, id, with_profiler): start = time.time() runtime = 0 - while runtime < duration: - test.run_sync() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime + if with_profiler: + import cProfile + profile = None + while runtime < duration: + profile = cProfile.Profile() + profile.enable() + test.run_sync() + profile.disable() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + if profile: + # Store only profile for final iteration + profile_name = "{}/cProfile-{}-{}-sync.pstats".format(os.getcwd(), test.__class__.__name__, id) + print("Dumping profile data to {}".format(profile_name)) + profile.dump_stats(profile_name) + else: + print("No profile generated.") + else: + while runtime < duration: + test.run_sync() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime - async def _run_async_loop(self, test, duration, id): + async def _run_async_loop(self, test, duration, id, with_profiler): start = time.time() runtime = 0 - while runtime < duration: - await test.run_async() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime + if with_profiler: + import cProfile + profile = None + while runtime < duration: + profile = cProfile.Profile() + profile.enable() + await test.run_async() + profile.disable() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + if profile: + # Store only profile for final iteration + profile_name = "{}/cProfile-{}-{}-async.pstats".format(os.getcwd(), test.__class__.__name__, id) + print("Dumping profile data to {}".format(profile_name)) + profile.dump_stats(profile_name) + else: + print("No profile generated.") + else: + while runtime < duration: + await test.run_async() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime def _print_status(self, title): if self._last_total_operations == -1: diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py index 7b6a8aec6919..87531c43b14b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py @@ -4,6 +4,9 @@ # -------------------------------------------------------------------------------------------- import os +import aiohttp + +from ._policies import PerfTestProxyPolicy class PerfStressTest: @@ -20,6 +23,22 @@ class PerfStressTest: def __init__(self, arguments): self.args = arguments + self._session = None + self._test_proxy_policy = None + self._client_kwargs = {} + self._recording_id = None + + if self.args.test_proxy: + self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) + + # SSL will be disabled for the test proxy requests, so suppress warnings + import warnings + from urllib3.exceptions import InsecureRequestWarning + warnings.simplefilter('ignore', InsecureRequestWarning) + + # Add policy to redirect requests to the test proxy + self._test_proxy_policy = PerfTestProxyPolicy(self.args.test_proxy) + self._client_kwargs['per_retry_policies'] = [self._test_proxy_policy] async def global_setup(self): return @@ -27,6 +46,34 @@ async def global_setup(self): async def global_cleanup(self): return + async def record_and_start_playback(self): + await self._start_recording() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "record" + + # Record one call to run() + if self.args.sync: + self.run_sync() + else: + await self.run_async() + + await self._stop_recording() + await self._start_playback() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "playback" + + async def stop_playback(self): + headers = { + "x-recording-id": self._recording_id, + "x-purge-inmemory-recording": "true" + } + url = self.args.test_proxy + "/playback/stop" + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + + self._test_proxy_policy.recording_id = None + self._test_proxy_policy.mode = None + async def setup(self): return @@ -34,13 +81,8 @@ async def cleanup(self): return async def close(self): - return - - def __enter__(self): - return - - def __exit__(self, exc_type, exc_value, traceback): - return + if self._session: + await self._session.close() def run_sync(self): raise Exception("run_sync must be implemented for {}".format(self.__class__.__name__)) @@ -48,6 +90,25 @@ def run_sync(self): async def run_async(self): raise Exception("run_async must be implemented for {}".format(self.__class__.__name__)) + async def _start_recording(self): + url = self.args.test_proxy + "/record/start" + async with self._session.post(url) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + + async def _stop_recording(self): + headers = {"x-recording-id": self._recording_id} + url = self.args.test_proxy + "/record/stop" + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + + async def _start_playback(self): + headers = {"x-recording-id": self._recording_id} + url = self.args.test_proxy + "/playback/start" + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + @staticmethod def add_arguments(parser): """