From febffe44b96622552bde2b365de3744009a870cc Mon Sep 17 00:00:00 2001 From: swathipil Date: Wed, 6 Mar 2024 16:11:24 -0800 Subject: [PATCH 1/4] add async iterator random stream for async httpx --- .../_async_iterator_random_stream.py | 71 +++++++++++++++++++ .../tests/perf_tests/upload_binary.py | 6 +- 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py diff --git a/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py b/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py new file mode 100644 index 000000000000..a37c8934298c --- /dev/null +++ b/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py @@ -0,0 +1,71 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + + +from devtools_testutils.perfstress_tests._random_stream import get_random_bytes, _DEFAULT_LENGTH + + +# The AsyncIteratorRandomStream is used specifically for the AsyncHttpXTransport upload stream scenario, since the +# async httpx transport requires the request body stream to be type AsyncIterator (i.e. have an __aiter__ and __anext__ method rather than __iter__). +# Specific check in httpx here: https://github.com/encode/httpx/blob/7df47ce4d93a06f2c3310cd692b4c2336d7663ba/httpx/_content.py#L116. + +class AsyncIteratorRandomStream(): + def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH): + super().__init__() + self._base_data = get_random_bytes(initial_buffer_length) + self._data_length = length + self._base_buffer_length = initial_buffer_length + self._position = 0 + self._remaining = length + self._closed = False + + def __len__(self): + return self._remaining + + def __aiter__(self): + return self + + async def __anext__(self): + if self._remaining == 0: + raise StopAsyncIteration + return self.read() + + def reset(self): + self._position = 0 + self._remaining = self._data_length + self._closed = False + + def read(self, size=None): + if self._remaining == 0: + return b"" + + if size is None: + e = self._base_buffer_length + else: + e = size + e = min(e, self._remaining) + if e > self._base_buffer_length: + self._base_data = get_random_bytes(e) + self._base_buffer_length = e + self._remaining = self._remaining - e + self._position += e + return self._base_data[:e] + + def seek(self, index, whence=0): + if whence == 0: + self._position = index + elif whence == 1: + self._position = self._position + index + elif whence == 2: + self._position = self._data_length - 1 + index + + def tell(self): + return self._position + + def remaining(self): + return self._remaining + + def close(self): + self._closed = True diff --git a/sdk/core/corehttp/tests/perf_tests/upload_binary.py b/sdk/core/corehttp/tests/perf_tests/upload_binary.py index 2603af67b310..a7156a996e0c 100644 --- a/sdk/core/corehttp/tests/perf_tests/upload_binary.py +++ b/sdk/core/corehttp/tests/perf_tests/upload_binary.py @@ -6,6 +6,7 @@ from time import time from wsgiref.handlers import format_date_time from devtools_testutils.perfstress_tests import RandomStream, AsyncRandomStream +from ._async_iterator_random_stream import AsyncIteratorRandomStream from corehttp.rest import HttpRequest from corehttp.exceptions import ( @@ -29,7 +30,10 @@ def __init__(self, arguments): blob_name = "uploadtest" self.blob_endpoint = f"{self.account_endpoint}{self.container_name}/{blob_name}" self.upload_stream = RandomStream(self.args.size) - self.upload_stream_async = AsyncRandomStream(self.args.size) + if self.args.transport == "httpx": + self.upload_stream_async = AsyncIteratorRandomStream(self.args.size) + else: + self.upload_stream_async = AsyncRandomStream(self.args.size) def run_sync(self): self.upload_stream.reset() From f3d797b0aadc4bea221ba4324b95aea395421ce8 Mon Sep 17 00:00:00 2001 From: swathipil Date: Wed, 6 Mar 2024 16:11:46 -0800 Subject: [PATCH 2/4] update perf tests with same args for easier comparison of perf --- sdk/core/azure-core/perf-tests.yml | 6 +++--- sdk/core/corehttp/perf-tests.yml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/core/azure-core/perf-tests.yml b/sdk/core/azure-core/perf-tests.yml index a20e66ab2ac5..6a50b1d710d0 100644 --- a/sdk/core/azure-core/perf-tests.yml +++ b/sdk/core/azure-core/perf-tests.yml @@ -14,8 +14,8 @@ Tests: Arguments: - --size 1024 --parallel 64 --duration 60 --policies all - --size 1024 --parallel 64 --duration 60 --policies all --use-entra-id - - --size 10240 --parallel 32 --duration 60 - - --size 10240 --parallel 32 --duration 60 --transport requests + - --size 10240 --parallel 64 --duration 60 + - --size 10240 --parallel 64 --duration 60 --transport requests - Test: download-binary Class: DownloadBinaryDataTest @@ -23,7 +23,7 @@ Tests: - --size 1024 --parallel 64 --duration 60 - --size 1024 --parallel 64 --duration 60 --transport requests - --size 1024 --parallel 64 --duration 60 --use-entra-id - - --size 10240 --parallel 32 --duration 60 --policies all + - --size 10240 --parallel 64 --duration 60 --policies all - Test: update-entity Class: UpdateEntityJSONTest diff --git a/sdk/core/corehttp/perf-tests.yml b/sdk/core/corehttp/perf-tests.yml index 0a4b09aa61b9..e3a6ba376b4c 100644 --- a/sdk/core/corehttp/perf-tests.yml +++ b/sdk/core/corehttp/perf-tests.yml @@ -14,8 +14,8 @@ Tests: Arguments: - --size 1024 --parallel 64 --duration 60 --policies all - --size 1024 --parallel 64 --duration 60 --policies all --use-entra-id - - --size 10240 --parallel 32 --duration 60 - - --size 10240 --parallel 32 --duration 60 --transport httpx + - --size 10240 --parallel 64 --duration 60 + - --size 10240 --parallel 64 --duration 60 --transport httpx - Test: download-binary Class: DownloadBinaryDataTest @@ -23,7 +23,7 @@ Tests: - --size 1024 --parallel 64 --duration 60 - --size 1024 --parallel 64 --duration 60 --transport httpx - --size 1024 --parallel 64 --duration 60 --use-entra-id - - --size 10240 --parallel 32 --duration 60 --policies all + - --size 10240 --parallel 64 --duration 60 --policies all - Test: update-entity Class: UpdateEntityJSONTest From 4e3bc5932abcc0a38568df551d672a56fae624e5 Mon Sep 17 00:00:00 2001 From: swathipil Date: Thu, 7 Mar 2024 09:31:02 -0800 Subject: [PATCH 3/4] pauls comments --- .../_async_iterator_random_stream.py | 71 ------------------- .../tests/perf_tests/upload_binary.py | 7 +- .../perfstress_tests/__init__.py | 3 +- .../perfstress_tests/_async_random_stream.py | 45 ++++++++++++ 4 files changed, 52 insertions(+), 74 deletions(-) delete mode 100644 sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py diff --git a/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py b/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py deleted file mode 100644 index a37c8934298c..000000000000 --- a/sdk/core/corehttp/tests/perf_tests/_async_iterator_random_stream.py +++ /dev/null @@ -1,71 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - - -from devtools_testutils.perfstress_tests._random_stream import get_random_bytes, _DEFAULT_LENGTH - - -# The AsyncIteratorRandomStream is used specifically for the AsyncHttpXTransport upload stream scenario, since the -# async httpx transport requires the request body stream to be type AsyncIterator (i.e. have an __aiter__ and __anext__ method rather than __iter__). -# Specific check in httpx here: https://github.com/encode/httpx/blob/7df47ce4d93a06f2c3310cd692b4c2336d7663ba/httpx/_content.py#L116. - -class AsyncIteratorRandomStream(): - def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH): - super().__init__() - self._base_data = get_random_bytes(initial_buffer_length) - self._data_length = length - self._base_buffer_length = initial_buffer_length - self._position = 0 - self._remaining = length - self._closed = False - - def __len__(self): - return self._remaining - - def __aiter__(self): - return self - - async def __anext__(self): - if self._remaining == 0: - raise StopAsyncIteration - return self.read() - - def reset(self): - self._position = 0 - self._remaining = self._data_length - self._closed = False - - def read(self, size=None): - if self._remaining == 0: - return b"" - - if size is None: - e = self._base_buffer_length - else: - e = size - e = min(e, self._remaining) - if e > self._base_buffer_length: - self._base_data = get_random_bytes(e) - self._base_buffer_length = e - self._remaining = self._remaining - e - self._position += e - return self._base_data[:e] - - def seek(self, index, whence=0): - if whence == 0: - self._position = index - elif whence == 1: - self._position = self._position + index - elif whence == 2: - self._position = self._data_length - 1 + index - - def tell(self): - return self._position - - def remaining(self): - return self._remaining - - def close(self): - self._closed = True diff --git a/sdk/core/corehttp/tests/perf_tests/upload_binary.py b/sdk/core/corehttp/tests/perf_tests/upload_binary.py index a7156a996e0c..d83a0b3127b8 100644 --- a/sdk/core/corehttp/tests/perf_tests/upload_binary.py +++ b/sdk/core/corehttp/tests/perf_tests/upload_binary.py @@ -5,8 +5,7 @@ from time import time from wsgiref.handlers import format_date_time -from devtools_testutils.perfstress_tests import RandomStream, AsyncRandomStream -from ._async_iterator_random_stream import AsyncIteratorRandomStream +from devtools_testutils.perfstress_tests import RandomStream, AsyncRandomStream, AsyncIteratorRandomStream from corehttp.rest import HttpRequest from corehttp.exceptions import ( @@ -30,6 +29,10 @@ def __init__(self, arguments): blob_name = "uploadtest" self.blob_endpoint = f"{self.account_endpoint}{self.container_name}/{blob_name}" self.upload_stream = RandomStream(self.args.size) + + # The AsyncIteratorRandomStream is used for upload stream scenario, since the + # async httpx transport requires the request body stream to be type AsyncIterator (i.e. have an __aiter__ and __anext__ method rather than __iter__). + # Specific check in httpx here: https://github.com/encode/httpx/blob/7df47ce4d93a06f2c3310cd692b4c2336d7663ba/httpx/_content.py#L116. if self.args.transport == "httpx": self.upload_stream_async = AsyncIteratorRandomStream(self.args.size) else: diff --git a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/__init__.py b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/__init__.py index fc62d8e08df8..2556fca820a2 100644 --- a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/__init__.py +++ b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/__init__.py @@ -9,7 +9,7 @@ from ._perf_stress_runner import _PerfStressRunner from ._perf_stress_test import PerfStressTest from ._random_stream import RandomStream, WriteStream, get_random_bytes -from ._async_random_stream import AsyncRandomStream +from ._async_random_stream import AsyncRandomStream, AsyncIteratorRandomStream from ._batch_perf_test import BatchPerfTest from ._event_perf_test import EventPerfTest @@ -19,6 +19,7 @@ "EventPerfTest", "RandomStream", "WriteStream", + "AsyncIteratorRandomStream", "AsyncRandomStream", "get_random_bytes" ] diff --git a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py index ce8be60731df..2595d00fc3c3 100644 --- a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py +++ b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +from typing import AsyncIterator from io import BytesIO from ._random_stream import get_random_bytes, _DEFAULT_LENGTH @@ -58,3 +59,47 @@ def remaining(self): def close(self): self._closed = True + + +class AsyncIteratorRandomStream(AsyncIterator[bytes]): + """ + Async random stream of bytes for methods that accept AsyncIterator as input. + """ + def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH): + super().__init__() + self._base_data = get_random_bytes(initial_buffer_length) + self._data_length = length + self._base_buffer_length = initial_buffer_length + self._position = 0 + self._remaining = length + + def __len__(self): + return self._remaining + + def __aiter__(self): + return self + + async def __anext__(self): + if self._remaining == 0: + raise StopAsyncIteration + return self.read() + + def reset(self): + self._position = 0 + self._remaining = self._data_length + + def read(self, size=None): + if self._remaining == 0: + return b"" + + if size is None: + e = self._base_buffer_length + else: + e = size + e = min(e, self._remaining) + if e > self._base_buffer_length: + self._base_data = get_random_bytes(e) + self._base_buffer_length = e + self._remaining = self._remaining - e + self._position += e + return self._base_data[:e] From d6b0e202cf2ef2fb7aa10ffe4df298ccdf9957d8 Mon Sep 17 00:00:00 2001 From: swathipil Date: Thu, 7 Mar 2024 12:46:40 -0800 Subject: [PATCH 4/4] update asynciterator comment to iterable --- sdk/core/corehttp/tests/perf_tests/upload_binary.py | 2 +- .../devtools_testutils/perfstress_tests/_async_random_stream.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/core/corehttp/tests/perf_tests/upload_binary.py b/sdk/core/corehttp/tests/perf_tests/upload_binary.py index d83a0b3127b8..2bd341ff15a1 100644 --- a/sdk/core/corehttp/tests/perf_tests/upload_binary.py +++ b/sdk/core/corehttp/tests/perf_tests/upload_binary.py @@ -31,7 +31,7 @@ def __init__(self, arguments): self.upload_stream = RandomStream(self.args.size) # The AsyncIteratorRandomStream is used for upload stream scenario, since the - # async httpx transport requires the request body stream to be type AsyncIterator (i.e. have an __aiter__ and __anext__ method rather than __iter__). + # async httpx transport requires the request body stream to be type AsyncIterable (i.e. have an __aiter__ method rather than __iter__). # Specific check in httpx here: https://github.com/encode/httpx/blob/7df47ce4d93a06f2c3310cd692b4c2336d7663ba/httpx/_content.py#L116. if self.args.transport == "httpx": self.upload_stream_async = AsyncIteratorRandomStream(self.args.size) diff --git a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py index 2595d00fc3c3..ee9e2ab8e0ac 100644 --- a/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py +++ b/tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_async_random_stream.py @@ -66,7 +66,6 @@ class AsyncIteratorRandomStream(AsyncIterator[bytes]): Async random stream of bytes for methods that accept AsyncIterator as input. """ def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH): - super().__init__() self._base_data = get_random_bytes(initial_buffer_length) self._data_length = length self._base_buffer_length = initial_buffer_length