Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions tools/base/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ envoy_py_library(
],
)

envoy_py_library(
"tools.base.stream",
deps = [
requirement("aiofiles"),
requirement("aiohttp"),
],
)

envoy_py_library(
"tools.base.utils",
deps = [
Expand Down
157 changes: 157 additions & 0 deletions tools/base/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,67 @@
#
# pip-compile --allow-unsafe --generate-hashes tools/base/requirements.txt
#
aiofiles==0.7.0 \
--hash=sha256:a1c4fc9b2ff81568c83e21392a82f344ea9d23da906e4f6a52662764545e19d4 \
--hash=sha256:c67a6823b5f23fcab0a2595a289cec7d8c863ffcb4322fb8cd6b90400aedfdbc
# via -r tools/base/requirements.txt
aiohttp==3.7.4.post0 \
--hash=sha256:02f46fc0e3c5ac58b80d4d56eb0a7c7d97fcef69ace9326289fb9f1955e65cfe \
--hash=sha256:0563c1b3826945eecd62186f3f5c7d31abb7391fedc893b7e2b26303b5a9f3fe \
--hash=sha256:114b281e4d68302a324dd33abb04778e8557d88947875cbf4e842c2c01a030c5 \
--hash=sha256:14762875b22d0055f05d12abc7f7d61d5fd4fe4642ce1a249abdf8c700bf1fd8 \
--hash=sha256:15492a6368d985b76a2a5fdd2166cddfea5d24e69eefed4630cbaae5c81d89bd \
--hash=sha256:17c073de315745a1510393a96e680d20af8e67e324f70b42accbd4cb3315c9fb \
--hash=sha256:209b4a8ee987eccc91e2bd3ac36adee0e53a5970b8ac52c273f7f8fd4872c94c \
--hash=sha256:230a8f7e24298dea47659251abc0fd8b3c4e38a664c59d4b89cca7f6c09c9e87 \
--hash=sha256:2e19413bf84934d651344783c9f5e22dee452e251cfd220ebadbed2d9931dbf0 \
--hash=sha256:393f389841e8f2dfc86f774ad22f00923fdee66d238af89b70ea314c4aefd290 \
--hash=sha256:3cf75f7cdc2397ed4442594b935a11ed5569961333d49b7539ea741be2cc79d5 \
--hash=sha256:3d78619672183be860b96ed96f533046ec97ca067fd46ac1f6a09cd9b7484287 \
--hash=sha256:40eced07f07a9e60e825554a31f923e8d3997cfc7fb31dbc1328c70826e04cde \
--hash=sha256:493d3299ebe5f5a7c66b9819eacdcfbbaaf1a8e84911ddffcdc48888497afecf \
--hash=sha256:4b302b45040890cea949ad092479e01ba25911a15e648429c7c5aae9650c67a8 \
--hash=sha256:515dfef7f869a0feb2afee66b957cc7bbe9ad0cdee45aec7fdc623f4ecd4fb16 \
--hash=sha256:547da6cacac20666422d4882cfcd51298d45f7ccb60a04ec27424d2f36ba3eaf \
--hash=sha256:5df68496d19f849921f05f14f31bd6ef53ad4b00245da3195048c69934521809 \
--hash=sha256:64322071e046020e8797117b3658b9c2f80e3267daec409b350b6a7a05041213 \
--hash=sha256:7615dab56bb07bff74bc865307aeb89a8bfd9941d2ef9d817b9436da3a0ea54f \
--hash=sha256:79ebfc238612123a713a457d92afb4096e2148be17df6c50fb9bf7a81c2f8013 \
--hash=sha256:7b18b97cf8ee5452fa5f4e3af95d01d84d86d32c5e2bfa260cf041749d66360b \
--hash=sha256:932bb1ea39a54e9ea27fc9232163059a0b8855256f4052e776357ad9add6f1c9 \
--hash=sha256:a00bb73540af068ca7390e636c01cbc4f644961896fa9363154ff43fd37af2f5 \
--hash=sha256:a5ca29ee66f8343ed336816c553e82d6cade48a3ad702b9ffa6125d187e2dedb \
--hash=sha256:af9aa9ef5ba1fd5b8c948bb11f44891968ab30356d65fd0cc6707d989cd521df \
--hash=sha256:bb437315738aa441251214dad17428cafda9cdc9729499f1d6001748e1d432f4 \
--hash=sha256:bdb230b4943891321e06fc7def63c7aace16095be7d9cf3b1e01be2f10fba439 \
--hash=sha256:c6e9dcb4cb338d91a73f178d866d051efe7c62a7166653a91e7d9fb18274058f \
--hash=sha256:cffe3ab27871bc3ea47df5d8f7013945712c46a3cc5a95b6bee15887f1675c22 \
--hash=sha256:d012ad7911653a906425d8473a1465caa9f8dea7fcf07b6d870397b774ea7c0f \
--hash=sha256:d9e13b33afd39ddeb377eff2c1c4f00544e191e1d1dee5b6c51ddee8ea6f0cf5 \
--hash=sha256:e4b2b334e68b18ac9817d828ba44d8fcb391f6acb398bcc5062b14b2cbeac970 \
--hash=sha256:e54962802d4b8b18b6207d4a927032826af39395a3bd9196a5af43fc4e60b009 \
--hash=sha256:f705e12750171c0ab4ef2a3c76b9a4024a62c4103e3a55dd6f99265b9bc6fcfc \
--hash=sha256:f881853d2643a29e643609da57b96d5f9c9b93f62429dcc1cbb413c7d07f0e1a \
--hash=sha256:fe60131d21b31fd1a14bd43e6bb88256f69dfc3188b3a89d736d6c71ed43ec95
# via -r tools/base/requirements.txt
async-timeout==3.0.1 \
--hash=sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f \
--hash=sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3
# via
# -r tools/base/requirements.txt
# aiohttp
attrs==21.2.0 \
--hash=sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1 \
--hash=sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb
# via
# -r tools/base/requirements.txt
# aiohttp
chardet==4.0.0 \
--hash=sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa \
--hash=sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5
# via
# -r tools/base/requirements.txt
# aiohttp
colorama==0.4.4 \
--hash=sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b \
--hash=sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2
Expand All @@ -22,6 +83,54 @@ humanfriendly==9.2 \
# via
# -r tools/base/requirements.txt
# coloredlogs
idna==3.2 \
--hash=sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a \
--hash=sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3
# via
# -r tools/base/requirements.txt
# yarl
multidict==5.1.0 \
--hash=sha256:018132dbd8688c7a69ad89c4a3f39ea2f9f33302ebe567a879da8f4ca73f0d0a \
--hash=sha256:051012ccee979b2b06be928a6150d237aec75dd6bf2d1eeeb190baf2b05abc93 \
--hash=sha256:05c20b68e512166fddba59a918773ba002fdd77800cad9f55b59790030bab632 \
--hash=sha256:07b42215124aedecc6083f1ce6b7e5ec5b50047afa701f3442054373a6deb656 \
--hash=sha256:0e3c84e6c67eba89c2dbcee08504ba8644ab4284863452450520dad8f1e89b79 \
--hash=sha256:0e929169f9c090dae0646a011c8b058e5e5fb391466016b39d21745b48817fd7 \
--hash=sha256:1ab820665e67373de5802acae069a6a05567ae234ddb129f31d290fc3d1aa56d \
--hash=sha256:25b4e5f22d3a37ddf3effc0710ba692cfc792c2b9edfb9c05aefe823256e84d5 \
--hash=sha256:2e68965192c4ea61fff1b81c14ff712fc7dc15d2bd120602e4a3494ea6584224 \
--hash=sha256:2f1a132f1c88724674271d636e6b7351477c27722f2ed789f719f9e3545a3d26 \
--hash=sha256:37e5438e1c78931df5d3c0c78ae049092877e5e9c02dd1ff5abb9cf27a5914ea \
--hash=sha256:3a041b76d13706b7fff23b9fc83117c7b8fe8d5fe9e6be45eee72b9baa75f348 \
--hash=sha256:3a4f32116f8f72ecf2a29dabfb27b23ab7cdc0ba807e8459e59a93a9be9506f6 \
--hash=sha256:46c73e09ad374a6d876c599f2328161bcd95e280f84d2060cf57991dec5cfe76 \
--hash=sha256:46dd362c2f045095c920162e9307de5ffd0a1bfbba0a6e990b344366f55a30c1 \
--hash=sha256:4b186eb7d6ae7c06eb4392411189469e6a820da81447f46c0072a41c748ab73f \
--hash=sha256:54fd1e83a184e19c598d5e70ba508196fd0bbdd676ce159feb412a4a6664f952 \
--hash=sha256:585fd452dd7782130d112f7ddf3473ffdd521414674c33876187e101b588738a \
--hash=sha256:5cf3443199b83ed9e955f511b5b241fd3ae004e3cb81c58ec10f4fe47c7dce37 \
--hash=sha256:6a4d5ce640e37b0efcc8441caeea8f43a06addace2335bd11151bc02d2ee31f9 \
--hash=sha256:7df80d07818b385f3129180369079bd6934cf70469f99daaebfac89dca288359 \
--hash=sha256:806068d4f86cb06af37cd65821554f98240a19ce646d3cd24e1c33587f313eb8 \
--hash=sha256:830f57206cc96ed0ccf68304141fec9481a096c4d2e2831f311bde1c404401da \
--hash=sha256:929006d3c2d923788ba153ad0de8ed2e5ed39fdbe8e7be21e2f22ed06c6783d3 \
--hash=sha256:9436dc58c123f07b230383083855593550c4d301d2532045a17ccf6eca505f6d \
--hash=sha256:9dd6e9b1a913d096ac95d0399bd737e00f2af1e1594a787e00f7975778c8b2bf \
--hash=sha256:ace010325c787c378afd7f7c1ac66b26313b3344628652eacd149bdd23c68841 \
--hash=sha256:b47a43177a5e65b771b80db71e7be76c0ba23cc8aa73eeeb089ed5219cdbe27d \
--hash=sha256:b797515be8743b771aa868f83563f789bbd4b236659ba52243b735d80b29ed93 \
--hash=sha256:b7993704f1a4b204e71debe6095150d43b2ee6150fa4f44d6d966ec356a8d61f \
--hash=sha256:d5c65bdf4484872c4af3150aeebe101ba560dcfb34488d9a8ff8dbcd21079647 \
--hash=sha256:d81eddcb12d608cc08081fa88d046c78afb1bf8107e6feab5d43503fea74a635 \
--hash=sha256:dc862056f76443a0db4509116c5cd480fe1b6a2d45512a653f9a855cc0517456 \
--hash=sha256:ecc771ab628ea281517e24fd2c52e8f31c41e66652d07599ad8818abaad38cda \
--hash=sha256:f200755768dc19c6f4e2b672421e0ebb3dd54c38d5a4f262b872d8cfcc9e93b5 \
--hash=sha256:f21756997ad8ef815d8ef3d34edd98804ab5ea337feedcd62fb52d22bf531281 \
--hash=sha256:fc13a9524bc18b6fb6e0dbec3533ba0496bbed167c56d0aabefd965584557d80
# via
# -r tools/base/requirements.txt
# aiohttp
# yarl
pyyaml==5.4.1 \
--hash=sha256:08682f6b72c722394747bddaf0aa62277e02557c0fd1c42cb853016a38f8dedf \
--hash=sha256:0f5f5786c0e09baddcd8b4b45f20a7b5d61a7e7e99846e3c799b05c7c53fa696 \
Expand Down Expand Up @@ -53,10 +162,58 @@ pyyaml==5.4.1 \
--hash=sha256:fdc842473cd33f45ff6bce46aea678a54e3d21f1b61a7750ce3c498eedfe25d6 \
--hash=sha256:fe69978f3f768926cfa37b867e3843918e012cf83f680806599ddce33c2c68b0
# via -r tools/base/requirements.txt
typing-extensions==3.10.0.0 \
--hash=sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497 \
--hash=sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342 \
--hash=sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84
# via
# -r tools/base/requirements.txt
# aiohttp
verboselogs==1.7 \
--hash=sha256:d63f23bf568295b95d3530c6864a0b580cec70e7ff974177dead1e4ffbc6ff49 \
--hash=sha256:e33ddedcdfdafcb3a174701150430b11b46ceb64c2a9a26198c76a156568e427
# via -r tools/base/requirements.txt
yarl==1.6.3 \
--hash=sha256:00d7ad91b6583602eb9c1d085a2cf281ada267e9a197e8b7cae487dadbfa293e \
--hash=sha256:0355a701b3998dcd832d0dc47cc5dedf3874f966ac7f870e0f3a6788d802d434 \
--hash=sha256:15263c3b0b47968c1d90daa89f21fcc889bb4b1aac5555580d74565de6836366 \
--hash=sha256:2ce4c621d21326a4a5500c25031e102af589edb50c09b321049e388b3934eec3 \
--hash=sha256:31ede6e8c4329fb81c86706ba8f6bf661a924b53ba191b27aa5fcee5714d18ec \
--hash=sha256:324ba3d3c6fee56e2e0b0d09bf5c73824b9f08234339d2b788af65e60040c959 \
--hash=sha256:329412812ecfc94a57cd37c9d547579510a9e83c516bc069470db5f75684629e \
--hash=sha256:4736eaee5626db8d9cda9eb5282028cc834e2aeb194e0d8b50217d707e98bb5c \
--hash=sha256:4953fb0b4fdb7e08b2f3b3be80a00d28c5c8a2056bb066169de00e6501b986b6 \
--hash=sha256:4c5bcfc3ed226bf6419f7a33982fb4b8ec2e45785a0561eb99274ebbf09fdd6a \
--hash=sha256:547f7665ad50fa8563150ed079f8e805e63dd85def6674c97efd78eed6c224a6 \
--hash=sha256:5b883e458058f8d6099e4420f0cc2567989032b5f34b271c0827de9f1079a424 \
--hash=sha256:63f90b20ca654b3ecc7a8d62c03ffa46999595f0167d6450fa8383bab252987e \
--hash=sha256:68dc568889b1c13f1e4745c96b931cc94fdd0defe92a72c2b8ce01091b22e35f \
--hash=sha256:69ee97c71fee1f63d04c945f56d5d726483c4762845400a6795a3b75d56b6c50 \
--hash=sha256:6d6283d8e0631b617edf0fd726353cb76630b83a089a40933043894e7f6721e2 \
--hash=sha256:72a660bdd24497e3e84f5519e57a9ee9220b6f3ac4d45056961bf22838ce20cc \
--hash=sha256:73494d5b71099ae8cb8754f1df131c11d433b387efab7b51849e7e1e851f07a4 \
--hash=sha256:7356644cbed76119d0b6bd32ffba704d30d747e0c217109d7979a7bc36c4d970 \
--hash=sha256:8a9066529240171b68893d60dca86a763eae2139dd42f42106b03cf4b426bf10 \
--hash=sha256:8aa3decd5e0e852dc68335abf5478a518b41bf2ab2f330fe44916399efedfae0 \
--hash=sha256:97b5bdc450d63c3ba30a127d018b866ea94e65655efaf889ebeabc20f7d12406 \
--hash=sha256:9ede61b0854e267fd565e7527e2f2eb3ef8858b301319be0604177690e1a3896 \
--hash=sha256:b2e9a456c121e26d13c29251f8267541bd75e6a1ccf9e859179701c36a078643 \
--hash=sha256:b5dfc9a40c198334f4f3f55880ecf910adebdcb2a0b9a9c23c9345faa9185721 \
--hash=sha256:bafb450deef6861815ed579c7a6113a879a6ef58aed4c3a4be54400ae8871478 \
--hash=sha256:c49ff66d479d38ab863c50f7bb27dee97c6627c5fe60697de15529da9c3de724 \
--hash=sha256:ce3beb46a72d9f2190f9e1027886bfc513702d748047b548b05dab7dfb584d2e \
--hash=sha256:d26608cf178efb8faa5ff0f2d2e77c208f471c5a3709e577a7b3fd0445703ac8 \
--hash=sha256:d597767fcd2c3dc49d6eea360c458b65643d1e4dbed91361cf5e36e53c1f8c96 \
--hash=sha256:d5c32c82990e4ac4d8150fd7652b972216b204de4e83a122546dce571c1bdf25 \
--hash=sha256:d8d07d102f17b68966e2de0e07bfd6e139c7c02ef06d3a0f8d2f0f055e13bb76 \
--hash=sha256:e46fba844f4895b36f4c398c5af062a9808d1f26b2999c58909517384d5deda2 \
--hash=sha256:e6b5460dc5ad42ad2b36cca524491dfcaffbfd9c8df50508bddc354e787b8dc2 \
--hash=sha256:f040bcc6725c821a4c0665f3aa96a4d0805a7aaf2caf266d256b8ed71b9f041c \
--hash=sha256:f0b059678fd549c66b89bed03efcabb009075bd131c248ecdf087bdb6faba24a \
--hash=sha256:fcbb48a93e8699eae920f8d92f7160c03567b421bc17362a9ffbbd706a816f71
# via
# -r tools/base/requirements.txt
# aiohttp

# The following packages are considered to be unsafe in a requirements file:
setuptools==57.4.0 \
Expand Down
97 changes: 97 additions & 0 deletions tools/base/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pathlib
from contextlib import asynccontextmanager
from typing import AsyncGenerator, AsyncIterator, Optional, Union

import aiofiles
from aiofiles.threadpool.binary import AsyncBufferedIOBase, AsyncBufferedReader
import aiohttp


class AsyncStream:
# 16k seems to offer a good balance of performance/speed
default_chunk_size: int = 1024 * 16

def __init__(self, buffer: AsyncBufferedIOBase, chunk_size: Optional[int] = None):
self._buffer = buffer
self._chunk_size = chunk_size

@property
def buffer(self) -> AsyncBufferedIOBase:
return self._buffer

@property
def chunk_size(self) -> int:
return self._chunk_size or self.default_chunk_size


class AsyncStreamReader(AsyncStream):
"""This wraps an `AsyncBufferedReader` with a `__len__`

This is useful if you want to a pass an `AsyncBufferedReader`
as the body data to an `HTTP` stream, but the `HTTP` client
wants to send a `Content-Length` header, based on the `len()`
of the data.

As the file's size can be gleaned from the OS `stat_size`,
this can be set ahead of reading chunks from the file.

This allows large file uploads to use little or no additional
memory while uploading with aiohttp.
"""

def __init__(self, *args, size: Optional[int] = None, **kwargs):
self._size = size
super().__init__(*args, **kwargs)

async def __aiter__(self) -> AsyncGenerator[bytes, AsyncBufferedReader]:
while True:
chunk = await self.buffer.read(self.chunk_size)
if not chunk:
break
yield chunk

def __len__(self) -> int:
if self.size is None:
raise TypeError(
f"object of type '{self.__class__.__name__}' with no 'size' cannot get len()")
return self.size

@property
def size(self) -> Optional[int]:
return self._size


class AsyncStreamWriter(AsyncStream):
"""This wraps an async file object and provides a `stream_bytes` method to
stream an `aiohttp.ClientResponse` to the file.

It makes use of aiohttp's stream buffering to download chunks, and then
writes the chunks to disk asynchronously.

This allows large file downloads to use little or no additional
memory while downloading with aiohttp.
"""

async def stream_bytes(self, response: aiohttp.ClientResponse) -> None:
"""Stream chunks from an `aiohttp.ClientResponse` to an async
file object.
"""
# This is kinda aiohttp specific, we can make this more generic
# and then adapt to aiohttp if we find the need
async for chunk in response.content.iter_chunked(self.chunk_size):
await self.buffer.write(chunk)


@asynccontextmanager
async def async_reader(path: Union[str, pathlib.Path],
chunk_size: Optional[int] = None) -> AsyncIterator[AsyncStreamReader]:
path = pathlib.Path(path)
async with aiofiles.open(path, "rb") as f:
yield AsyncStreamReader(f, chunk_size=chunk_size, size=path.stat().st_size)


@asynccontextmanager
async def async_writer(path: Union[str, pathlib.Path],
chunk_size: Optional[int] = None) -> AsyncIterator[AsyncStreamWriter]:
async with aiofiles.open(path, "wb") as f:
yield AsyncStreamWriter(f, chunk_size=chunk_size)
Loading