From 68760ee0a23db77ae186a5bb6520f48111d65f23 Mon Sep 17 00:00:00 2001 From: Ryan Northey Date: Fri, 20 Aug 2021 06:56:45 +0100 Subject: [PATCH 1/5] tooling: Add async buffered stream reader/writer for aiohttp This makes it easier to stream files up/down with minimal memory footprint. Signed-off-by: Ryan Northey --- tools/base/BUILD | 8 ++ tools/base/requirements.txt | 157 +++++++++++++++++++++++++ tools/base/stream.py | 97 ++++++++++++++++ tools/base/tests/test_stream.py | 197 ++++++++++++++++++++++++++++++++ 4 files changed, 459 insertions(+) create mode 100644 tools/base/stream.py create mode 100644 tools/base/tests/test_stream.py diff --git a/tools/base/BUILD b/tools/base/BUILD index c7c1de9dcbe33..07b0b0e8ad22a 100644 --- a/tools/base/BUILD +++ b/tools/base/BUILD @@ -34,6 +34,14 @@ envoy_py_library( ], ) +envoy_py_library( + "tools.base.stream", + deps = [ + requirement("aiofiles"), + requirement("aiohttp"), + ], +) + envoy_py_library( "tools.base.utils", deps = [ diff --git a/tools/base/requirements.txt b/tools/base/requirements.txt index 20ae6d1dc142d..b85d8248dad5b 100644 --- a/tools/base/requirements.txt +++ b/tools/base/requirements.txt @@ -4,6 +4,67 @@ # # pip-compile --allow-unsafe --generate-hashes tools/base/requirements.txt # +aiofiles==0.7.0 \ + --hash=sha256:a1c4fc9b2ff81568c83e21392a82f344ea9d23da906e4f6a52662764545e19d4 \ + --hash=sha256:c67a6823b5f23fcab0a2595a289cec7d8c863ffcb4322fb8cd6b90400aedfdbc + # via -r tools/base/requirements.txt +aiohttp==3.7.4.post0 \ + --hash=sha256:02f46fc0e3c5ac58b80d4d56eb0a7c7d97fcef69ace9326289fb9f1955e65cfe \ + --hash=sha256:0563c1b3826945eecd62186f3f5c7d31abb7391fedc893b7e2b26303b5a9f3fe \ + --hash=sha256:114b281e4d68302a324dd33abb04778e8557d88947875cbf4e842c2c01a030c5 \ + --hash=sha256:14762875b22d0055f05d12abc7f7d61d5fd4fe4642ce1a249abdf8c700bf1fd8 \ + --hash=sha256:15492a6368d985b76a2a5fdd2166cddfea5d24e69eefed4630cbaae5c81d89bd \ + 
--hash=sha256:17c073de315745a1510393a96e680d20af8e67e324f70b42accbd4cb3315c9fb \ + --hash=sha256:209b4a8ee987eccc91e2bd3ac36adee0e53a5970b8ac52c273f7f8fd4872c94c \ + --hash=sha256:230a8f7e24298dea47659251abc0fd8b3c4e38a664c59d4b89cca7f6c09c9e87 \ + --hash=sha256:2e19413bf84934d651344783c9f5e22dee452e251cfd220ebadbed2d9931dbf0 \ + --hash=sha256:393f389841e8f2dfc86f774ad22f00923fdee66d238af89b70ea314c4aefd290 \ + --hash=sha256:3cf75f7cdc2397ed4442594b935a11ed5569961333d49b7539ea741be2cc79d5 \ + --hash=sha256:3d78619672183be860b96ed96f533046ec97ca067fd46ac1f6a09cd9b7484287 \ + --hash=sha256:40eced07f07a9e60e825554a31f923e8d3997cfc7fb31dbc1328c70826e04cde \ + --hash=sha256:493d3299ebe5f5a7c66b9819eacdcfbbaaf1a8e84911ddffcdc48888497afecf \ + --hash=sha256:4b302b45040890cea949ad092479e01ba25911a15e648429c7c5aae9650c67a8 \ + --hash=sha256:515dfef7f869a0feb2afee66b957cc7bbe9ad0cdee45aec7fdc623f4ecd4fb16 \ + --hash=sha256:547da6cacac20666422d4882cfcd51298d45f7ccb60a04ec27424d2f36ba3eaf \ + --hash=sha256:5df68496d19f849921f05f14f31bd6ef53ad4b00245da3195048c69934521809 \ + --hash=sha256:64322071e046020e8797117b3658b9c2f80e3267daec409b350b6a7a05041213 \ + --hash=sha256:7615dab56bb07bff74bc865307aeb89a8bfd9941d2ef9d817b9436da3a0ea54f \ + --hash=sha256:79ebfc238612123a713a457d92afb4096e2148be17df6c50fb9bf7a81c2f8013 \ + --hash=sha256:7b18b97cf8ee5452fa5f4e3af95d01d84d86d32c5e2bfa260cf041749d66360b \ + --hash=sha256:932bb1ea39a54e9ea27fc9232163059a0b8855256f4052e776357ad9add6f1c9 \ + --hash=sha256:a00bb73540af068ca7390e636c01cbc4f644961896fa9363154ff43fd37af2f5 \ + --hash=sha256:a5ca29ee66f8343ed336816c553e82d6cade48a3ad702b9ffa6125d187e2dedb \ + --hash=sha256:af9aa9ef5ba1fd5b8c948bb11f44891968ab30356d65fd0cc6707d989cd521df \ + --hash=sha256:bb437315738aa441251214dad17428cafda9cdc9729499f1d6001748e1d432f4 \ + --hash=sha256:bdb230b4943891321e06fc7def63c7aace16095be7d9cf3b1e01be2f10fba439 \ + --hash=sha256:c6e9dcb4cb338d91a73f178d866d051efe7c62a7166653a91e7d9fb18274058f \ + 
--hash=sha256:cffe3ab27871bc3ea47df5d8f7013945712c46a3cc5a95b6bee15887f1675c22 \ + --hash=sha256:d012ad7911653a906425d8473a1465caa9f8dea7fcf07b6d870397b774ea7c0f \ + --hash=sha256:d9e13b33afd39ddeb377eff2c1c4f00544e191e1d1dee5b6c51ddee8ea6f0cf5 \ + --hash=sha256:e4b2b334e68b18ac9817d828ba44d8fcb391f6acb398bcc5062b14b2cbeac970 \ + --hash=sha256:e54962802d4b8b18b6207d4a927032826af39395a3bd9196a5af43fc4e60b009 \ + --hash=sha256:f705e12750171c0ab4ef2a3c76b9a4024a62c4103e3a55dd6f99265b9bc6fcfc \ + --hash=sha256:f881853d2643a29e643609da57b96d5f9c9b93f62429dcc1cbb413c7d07f0e1a \ + --hash=sha256:fe60131d21b31fd1a14bd43e6bb88256f69dfc3188b3a89d736d6c71ed43ec95 + # via -r tools/base/requirements.txt +async-timeout==3.0.1 \ + --hash=sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f \ + --hash=sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3 + # via + # -r tools/base/requirements.txt + # aiohttp +attrs==21.2.0 \ + --hash=sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1 \ + --hash=sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb + # via + # -r tools/base/requirements.txt + # aiohttp +chardet==4.0.0 \ + --hash=sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa \ + --hash=sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5 + # via + # -r tools/base/requirements.txt + # aiohttp colorama==0.4.4 \ --hash=sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b \ --hash=sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2 @@ -22,6 +83,54 @@ humanfriendly==9.2 \ # via # -r tools/base/requirements.txt # coloredlogs +idna==3.2 \ + --hash=sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a \ + --hash=sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3 + # via + # -r tools/base/requirements.txt + # yarl +multidict==5.1.0 \ + 
--hash=sha256:018132dbd8688c7a69ad89c4a3f39ea2f9f33302ebe567a879da8f4ca73f0d0a \ + --hash=sha256:051012ccee979b2b06be928a6150d237aec75dd6bf2d1eeeb190baf2b05abc93 \ + --hash=sha256:05c20b68e512166fddba59a918773ba002fdd77800cad9f55b59790030bab632 \ + --hash=sha256:07b42215124aedecc6083f1ce6b7e5ec5b50047afa701f3442054373a6deb656 \ + --hash=sha256:0e3c84e6c67eba89c2dbcee08504ba8644ab4284863452450520dad8f1e89b79 \ + --hash=sha256:0e929169f9c090dae0646a011c8b058e5e5fb391466016b39d21745b48817fd7 \ + --hash=sha256:1ab820665e67373de5802acae069a6a05567ae234ddb129f31d290fc3d1aa56d \ + --hash=sha256:25b4e5f22d3a37ddf3effc0710ba692cfc792c2b9edfb9c05aefe823256e84d5 \ + --hash=sha256:2e68965192c4ea61fff1b81c14ff712fc7dc15d2bd120602e4a3494ea6584224 \ + --hash=sha256:2f1a132f1c88724674271d636e6b7351477c27722f2ed789f719f9e3545a3d26 \ + --hash=sha256:37e5438e1c78931df5d3c0c78ae049092877e5e9c02dd1ff5abb9cf27a5914ea \ + --hash=sha256:3a041b76d13706b7fff23b9fc83117c7b8fe8d5fe9e6be45eee72b9baa75f348 \ + --hash=sha256:3a4f32116f8f72ecf2a29dabfb27b23ab7cdc0ba807e8459e59a93a9be9506f6 \ + --hash=sha256:46c73e09ad374a6d876c599f2328161bcd95e280f84d2060cf57991dec5cfe76 \ + --hash=sha256:46dd362c2f045095c920162e9307de5ffd0a1bfbba0a6e990b344366f55a30c1 \ + --hash=sha256:4b186eb7d6ae7c06eb4392411189469e6a820da81447f46c0072a41c748ab73f \ + --hash=sha256:54fd1e83a184e19c598d5e70ba508196fd0bbdd676ce159feb412a4a6664f952 \ + --hash=sha256:585fd452dd7782130d112f7ddf3473ffdd521414674c33876187e101b588738a \ + --hash=sha256:5cf3443199b83ed9e955f511b5b241fd3ae004e3cb81c58ec10f4fe47c7dce37 \ + --hash=sha256:6a4d5ce640e37b0efcc8441caeea8f43a06addace2335bd11151bc02d2ee31f9 \ + --hash=sha256:7df80d07818b385f3129180369079bd6934cf70469f99daaebfac89dca288359 \ + --hash=sha256:806068d4f86cb06af37cd65821554f98240a19ce646d3cd24e1c33587f313eb8 \ + --hash=sha256:830f57206cc96ed0ccf68304141fec9481a096c4d2e2831f311bde1c404401da \ + --hash=sha256:929006d3c2d923788ba153ad0de8ed2e5ed39fdbe8e7be21e2f22ed06c6783d3 \ + 
--hash=sha256:9436dc58c123f07b230383083855593550c4d301d2532045a17ccf6eca505f6d \ + --hash=sha256:9dd6e9b1a913d096ac95d0399bd737e00f2af1e1594a787e00f7975778c8b2bf \ + --hash=sha256:ace010325c787c378afd7f7c1ac66b26313b3344628652eacd149bdd23c68841 \ + --hash=sha256:b47a43177a5e65b771b80db71e7be76c0ba23cc8aa73eeeb089ed5219cdbe27d \ + --hash=sha256:b797515be8743b771aa868f83563f789bbd4b236659ba52243b735d80b29ed93 \ + --hash=sha256:b7993704f1a4b204e71debe6095150d43b2ee6150fa4f44d6d966ec356a8d61f \ + --hash=sha256:d5c65bdf4484872c4af3150aeebe101ba560dcfb34488d9a8ff8dbcd21079647 \ + --hash=sha256:d81eddcb12d608cc08081fa88d046c78afb1bf8107e6feab5d43503fea74a635 \ + --hash=sha256:dc862056f76443a0db4509116c5cd480fe1b6a2d45512a653f9a855cc0517456 \ + --hash=sha256:ecc771ab628ea281517e24fd2c52e8f31c41e66652d07599ad8818abaad38cda \ + --hash=sha256:f200755768dc19c6f4e2b672421e0ebb3dd54c38d5a4f262b872d8cfcc9e93b5 \ + --hash=sha256:f21756997ad8ef815d8ef3d34edd98804ab5ea337feedcd62fb52d22bf531281 \ + --hash=sha256:fc13a9524bc18b6fb6e0dbec3533ba0496bbed167c56d0aabefd965584557d80 + # via + # -r tools/base/requirements.txt + # aiohttp + # yarl pyyaml==5.4.1 \ --hash=sha256:08682f6b72c722394747bddaf0aa62277e02557c0fd1c42cb853016a38f8dedf \ --hash=sha256:0f5f5786c0e09baddcd8b4b45f20a7b5d61a7e7e99846e3c799b05c7c53fa696 \ @@ -53,10 +162,58 @@ pyyaml==5.4.1 \ --hash=sha256:fdc842473cd33f45ff6bce46aea678a54e3d21f1b61a7750ce3c498eedfe25d6 \ --hash=sha256:fe69978f3f768926cfa37b867e3843918e012cf83f680806599ddce33c2c68b0 # via -r tools/base/requirements.txt +typing-extensions==3.10.0.0 \ + --hash=sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497 \ + --hash=sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342 \ + --hash=sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84 + # via + # -r tools/base/requirements.txt + # aiohttp verboselogs==1.7 \ --hash=sha256:d63f23bf568295b95d3530c6864a0b580cec70e7ff974177dead1e4ffbc6ff49 \ 
--hash=sha256:e33ddedcdfdafcb3a174701150430b11b46ceb64c2a9a26198c76a156568e427 # via -r tools/base/requirements.txt +yarl==1.6.3 \ + --hash=sha256:00d7ad91b6583602eb9c1d085a2cf281ada267e9a197e8b7cae487dadbfa293e \ + --hash=sha256:0355a701b3998dcd832d0dc47cc5dedf3874f966ac7f870e0f3a6788d802d434 \ + --hash=sha256:15263c3b0b47968c1d90daa89f21fcc889bb4b1aac5555580d74565de6836366 \ + --hash=sha256:2ce4c621d21326a4a5500c25031e102af589edb50c09b321049e388b3934eec3 \ + --hash=sha256:31ede6e8c4329fb81c86706ba8f6bf661a924b53ba191b27aa5fcee5714d18ec \ + --hash=sha256:324ba3d3c6fee56e2e0b0d09bf5c73824b9f08234339d2b788af65e60040c959 \ + --hash=sha256:329412812ecfc94a57cd37c9d547579510a9e83c516bc069470db5f75684629e \ + --hash=sha256:4736eaee5626db8d9cda9eb5282028cc834e2aeb194e0d8b50217d707e98bb5c \ + --hash=sha256:4953fb0b4fdb7e08b2f3b3be80a00d28c5c8a2056bb066169de00e6501b986b6 \ + --hash=sha256:4c5bcfc3ed226bf6419f7a33982fb4b8ec2e45785a0561eb99274ebbf09fdd6a \ + --hash=sha256:547f7665ad50fa8563150ed079f8e805e63dd85def6674c97efd78eed6c224a6 \ + --hash=sha256:5b883e458058f8d6099e4420f0cc2567989032b5f34b271c0827de9f1079a424 \ + --hash=sha256:63f90b20ca654b3ecc7a8d62c03ffa46999595f0167d6450fa8383bab252987e \ + --hash=sha256:68dc568889b1c13f1e4745c96b931cc94fdd0defe92a72c2b8ce01091b22e35f \ + --hash=sha256:69ee97c71fee1f63d04c945f56d5d726483c4762845400a6795a3b75d56b6c50 \ + --hash=sha256:6d6283d8e0631b617edf0fd726353cb76630b83a089a40933043894e7f6721e2 \ + --hash=sha256:72a660bdd24497e3e84f5519e57a9ee9220b6f3ac4d45056961bf22838ce20cc \ + --hash=sha256:73494d5b71099ae8cb8754f1df131c11d433b387efab7b51849e7e1e851f07a4 \ + --hash=sha256:7356644cbed76119d0b6bd32ffba704d30d747e0c217109d7979a7bc36c4d970 \ + --hash=sha256:8a9066529240171b68893d60dca86a763eae2139dd42f42106b03cf4b426bf10 \ + --hash=sha256:8aa3decd5e0e852dc68335abf5478a518b41bf2ab2f330fe44916399efedfae0 \ + --hash=sha256:97b5bdc450d63c3ba30a127d018b866ea94e65655efaf889ebeabc20f7d12406 \ + 
--hash=sha256:9ede61b0854e267fd565e7527e2f2eb3ef8858b301319be0604177690e1a3896 \ + --hash=sha256:b2e9a456c121e26d13c29251f8267541bd75e6a1ccf9e859179701c36a078643 \ + --hash=sha256:b5dfc9a40c198334f4f3f55880ecf910adebdcb2a0b9a9c23c9345faa9185721 \ + --hash=sha256:bafb450deef6861815ed579c7a6113a879a6ef58aed4c3a4be54400ae8871478 \ + --hash=sha256:c49ff66d479d38ab863c50f7bb27dee97c6627c5fe60697de15529da9c3de724 \ + --hash=sha256:ce3beb46a72d9f2190f9e1027886bfc513702d748047b548b05dab7dfb584d2e \ + --hash=sha256:d26608cf178efb8faa5ff0f2d2e77c208f471c5a3709e577a7b3fd0445703ac8 \ + --hash=sha256:d597767fcd2c3dc49d6eea360c458b65643d1e4dbed91361cf5e36e53c1f8c96 \ + --hash=sha256:d5c32c82990e4ac4d8150fd7652b972216b204de4e83a122546dce571c1bdf25 \ + --hash=sha256:d8d07d102f17b68966e2de0e07bfd6e139c7c02ef06d3a0f8d2f0f055e13bb76 \ + --hash=sha256:e46fba844f4895b36f4c398c5af062a9808d1f26b2999c58909517384d5deda2 \ + --hash=sha256:e6b5460dc5ad42ad2b36cca524491dfcaffbfd9c8df50508bddc354e787b8dc2 \ + --hash=sha256:f040bcc6725c821a4c0665f3aa96a4d0805a7aaf2caf266d256b8ed71b9f041c \ + --hash=sha256:f0b059678fd549c66b89bed03efcabb009075bd131c248ecdf087bdb6faba24a \ + --hash=sha256:fcbb48a93e8699eae920f8d92f7160c03567b421bc17362a9ffbbd706a816f71 + # via + # -r tools/base/requirements.txt + # aiohttp # The following packages are considered to be unsafe in a requirements file: setuptools==57.4.0 \ diff --git a/tools/base/stream.py b/tools/base/stream.py new file mode 100644 index 0000000000000..7b85451c9c117 --- /dev/null +++ b/tools/base/stream.py @@ -0,0 +1,97 @@ +import pathlib +from contextlib import asynccontextmanager +from typing import AsyncGenerator, AsyncIterator, Optional, Union + +import aiofiles +from aiofiles.threadpool.binary import AsyncBufferedIOBase, AsyncBufferedReader +import aiohttp + + +class AsyncStream: + # 16k seems to offer a good balance of performance/speed + default_chunk_size: int = 1024 * 16 + + def __init__(self, buffer: AsyncBufferedIOBase, chunk_size: 
Optional[int] = None): + self._buffer = buffer + self._chunk_size = chunk_size + + @property + def buffer(self) -> AsyncBufferedIOBase: + return self._buffer + + @property + def chunk_size(self) -> int: + return self._chunk_size or self.default_chunk_size + + +class AsyncStreamReader(AsyncStream): + """This wraps an `AsyncBufferedReader` with a `__len__` + + This is useful if you want to pass an `AsyncBufferedReader` + as the body data to an `HTTP` stream, but the `HTTP` client + wants to send a `Content-Length` header, based on the `len()` + of the data. + + As the file's size can be gleaned from the OS `stat().st_size`, + this can be set ahead of reading chunks from the file. + + This allows large file uploads to use little or no additional + memory while uploading with aiohttp. + """ + + def __init__(self, *args, size: Optional[int] = None, **kwargs): + self._size = size + super().__init__(*args, **kwargs) + + async def __aiter__(self) -> AsyncGenerator[bytes, AsyncBufferedReader]: + while True: + chunk = await self.buffer.read(self.chunk_size) + if not chunk: + break + yield chunk + + def __len__(self) -> int: + if self.size is None: + raise TypeError( + f"object of type '{self.__class__.__name__}' with no 'size' cannot get len()") + return self.size + + @property + def size(self) -> Optional[int]: + return self._size + + +class AsyncStreamWriter(AsyncStream): + """This wraps an async file object and provides a `stream_bytes` method to + stream an `aiohttp.ClientResponse` to the file. + + It makes use of aiohttp's stream buffering to download chunks, and then + writes the chunks to disk asynchronously. + + This allows large file downloads to use little or no additional + memory while downloading with aiohttp. + """ + + async def stream_bytes(self, response: aiohttp.ClientResponse) -> None: + """Stream chunks from an `aiohttp.ClientResponse` to an async + file object. 
+ """ + # This is kinda aiohttp specific, we can make this more generic + # and then adapt to aiohttp if we find the need + async for chunk in response.content.iter_chunked(self.chunk_size): + await self.buffer.write(chunk) + + +@asynccontextmanager +async def async_reader(path: Union[str, pathlib.Path], + chunk_size: Optional[int] = None) -> AsyncIterator[AsyncStreamReader]: + path = pathlib.Path(path) + async with aiofiles.open(path, "rb") as f: + yield AsyncStreamReader(f, chunk_size=chunk_size, size=path.stat().st_size) + + +@asynccontextmanager +async def async_writer(path: Union[str, pathlib.Path], + chunk_size: Optional[int] = None) -> AsyncIterator[AsyncStreamWriter]: + async with aiofiles.open(path, "wb") as f: + yield AsyncStreamWriter(f, chunk_size=chunk_size) diff --git a/tools/base/tests/test_stream.py b/tools/base/tests/test_stream.py new file mode 100644 index 0000000000000..656615e0d8ebf --- /dev/null +++ b/tools/base/tests/test_stream.py @@ -0,0 +1,197 @@ + +from unittest.mock import AsyncMock, MagicMock, PropertyMock + +import pytest + +from tools.base import stream + + +@pytest.mark.parametrize("chunk_size", [None, 0, 23]) +def test_stream_constructor(chunk_size): + kwargs = dict(chunk_size=chunk_size) if chunk_size is not None else {} + base = stream.AsyncStream("BUFFER", **kwargs) + assert base._buffer == "BUFFER" + assert base._chunk_size == chunk_size + assert base.default_chunk_size == 1024 * 16 + assert base.buffer == base._buffer + assert "buffer" not in base.__dict__ + assert base.chunk_size == base._chunk_size or base.default_chunk_size + assert "chunk_size" not in base.__dict__ + + +@pytest.mark.parametrize("size", [None, 0, 23]) +def test_stream_reader_constructor(patches, size): + args = [f"ARG{i}" for i in range(0, 3)] + kwargs = dict(foo="FOO", bar="BAR") + kwargs.update(dict(size=size) if size is not None else {}) + patched = patches( + "AsyncStream.__init__", + prefix="tools.base.stream") + + with patched as (m_super, ): + 
m_super.return_value = None + reader = stream.AsyncStreamReader(*args, **kwargs) + + kwargs.pop("size", None) + assert ( + list(m_super.call_args) + == [tuple(args), kwargs]) + assert reader._size == size + assert reader.size == size + assert "size" not in reader.__dict__ + + +@pytest.mark.asyncio +@pytest.mark.parametrize("size", [None, 0, 23]) +async def test_stream_reader_dunder_aiter(patches, size): + reader = stream.AsyncStreamReader("BUFFER") + patched = patches( + ("AsyncStreamReader.chunk_size", dict(new_callable=PropertyMock)), + ("AsyncStreamReader.buffer", dict(new_callable=PropertyMock)), + prefix="tools.base.stream") + + class DummyChunker: + counter = 0 + _read = MagicMock() + + async def read(self, chunk_size=None): + self._read(chunk_size) + if self.counter < 3: + self.counter += 1 + return f"CHUNK{self.counter - 1}" + + _chunker = DummyChunker() + results = [] + + with patched as (m_size, m_buffer): + m_buffer.return_value.read = _chunker.read + async for result in reader: + results.append(result) + + assert ( + results + == [f"CHUNK{i}" for i in range(0, 3)]) + assert ( + list(list(c) for c in _chunker._read.call_args_list) + == [[(m_size.return_value, ), {}] for i in range(0, 4)]) + + +@pytest.mark.parametrize("size", [None, 0, 23]) +def test_stream_reader_dunder_len(patches, size): + reader = stream.AsyncStreamReader("BUFFER") + patched = patches( + ("AsyncStreamReader.size", dict(new_callable=PropertyMock)), + prefix="tools.base.stream") + + with patched as (m_size, ): + m_size.return_value = size + if size is None: + with pytest.raises(TypeError) as e: + len(reader) + else: + assert len(reader) == size + + if size is None: + assert ( + e.value.args[0] + == "object of type 'AsyncStreamReader' with no 'size' cannot get len()") + + +def test_stream_writer_constructor(patches): + args = [f"ARG{i}" for i in range(0, 3)] + kwargs = dict(foo="FOO", bar="BAR") + patched = patches( + "AsyncStream.__init__", + prefix="tools.base.stream") + + with 
patched as (m_super, ): + m_super.return_value = None + writer = stream.AsyncStreamWriter(*args, **kwargs) + + assert ( + list(m_super.call_args) + == [tuple(args), kwargs]) + + +@pytest.mark.asyncio +async def test_stream_writer_stream_bytes(patches): + writer = stream.AsyncStreamWriter("BUFFER") + response = MagicMock() + patched = patches( + ("AsyncStreamWriter.buffer", dict(new_callable=PropertyMock)), + ("AsyncStreamWriter.chunk_size", dict(new_callable=PropertyMock)), + prefix="tools.base.stream") + + class DummyChunker: + + async def iter_chunked(self, chunk_size): + for i in range(0, 3): + yield f"CHUNK{i}" + + _chunker = DummyChunker() + response.content.iter_chunked.side_effect = _chunker.iter_chunked + + with patched as (m_buffer, m_size): + m_buffer.return_value.write = AsyncMock() + assert not await writer.stream_bytes(response) + + assert ( + list(response.content.iter_chunked.call_args) + == [(m_size.return_value, ), {}]) + assert ( + list(list(c) for c in m_buffer.return_value.write.call_args_list) + == [[(f'CHUNK{i}',), {}] for i in range(0, 3)]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("chunk_size", [None, 0, 23]) +async def test_stream_async_reader(patches, chunk_size): + patched = patches( + "pathlib", + "aiofiles", + "AsyncStreamReader", + prefix="tools.base.stream") + args = ((chunk_size, ) if chunk_size is not None else ()) + + with patched as (m_plib, m_aiofiles, m_reader): + async with stream.async_reader("PATH", *args) as reader: + pass + + assert reader == m_reader.return_value + assert ( + list(m_plib.Path.call_args) + == [("PATH", ), {}]) + assert ( + list(m_aiofiles.open.call_args) + == [(m_plib.Path.return_value, 'rb'), {}]) + assert ( + list(m_reader.call_args) + == [(m_aiofiles.open.return_value.__aenter__.return_value,), + {'chunk_size': chunk_size, + 'size': m_plib.Path.return_value.stat.return_value.st_size}]) + assert ( + list(m_plib.Path.return_value.stat.call_args) + == [(), {}]) + + +@pytest.mark.asyncio 
+@pytest.mark.parametrize("chunk_size", [None, 0, 23]) +async def test_stream_async_writer(patches, chunk_size): + patched = patches( + "aiofiles", + "AsyncStreamWriter", + prefix="tools.base.stream") + args = ((chunk_size, ) if chunk_size is not None else ()) + + with patched as (m_aiofiles, m_writer): + async with stream.async_writer("PATH", *args) as writer: + pass + + assert writer == m_writer.return_value + assert ( + list(m_aiofiles.open.call_args) + == [("PATH", 'wb'), {}]) + assert ( + list(m_writer.call_args) + == [(m_aiofiles.open.return_value.__aenter__.return_value,), + {'chunk_size': chunk_size}]) From e3f886dca8d5626cd78d7b0574d993c69c4b0b58 Mon Sep 17 00:00:00 2001 From: Ryan Northey Date: Fri, 20 Aug 2021 13:57:59 +0100 Subject: [PATCH 2/5] tooling: Add Github assets utils Signed-off-by: Ryan Northey --- tools/github/release/BUILD | 16 ++ tools/github/release/assets.py | 247 ++++++++++++++++++++ tools/github/release/tests/test_assets.py | 270 ++++++++++++++++++++++ 3 files changed, 533 insertions(+) create mode 100644 tools/github/release/assets.py create mode 100644 tools/github/release/tests/test_assets.py diff --git a/tools/github/release/BUILD b/tools/github/release/BUILD index e200f9f2665e0..6e0d7a0407b66 100644 --- a/tools/github/release/BUILD +++ b/tools/github/release/BUILD @@ -1,6 +1,7 @@ load("@github_pip3//:requirements.bzl", "requirement") load("@rules_python//python:defs.bzl", "py_library") load("//bazel:envoy_build_system.bzl", "envoy_package") +load("//tools/base:envoy_python.bzl", "envoy_py_library") licenses(["notice"]) # Apache 2 @@ -10,6 +11,7 @@ py_library( name = "abstract", srcs = ["abstract.py"], deps = [ + "//tools/base:abstract", "//tools/base:functional", "//tools/base:utils", requirement("aiohttp"), @@ -18,6 +20,20 @@ py_library( ], ) +envoy_py_library( + "tools.github.release.assets", + deps = [ + "//tools/base:aio", + "//tools/base:functional", + "//tools/base:stream", + "//tools/base:utils", + ":abstract", + 
":exceptions", + requirement("gidgethub"), + requirement("packaging"), + ], +) + py_library( name = "exceptions", srcs = ["exceptions.py"], diff --git a/tools/github/release/assets.py b/tools/github/release/assets.py new file mode 100644 index 0000000000000..9da9354231fab --- /dev/null +++ b/tools/github/release/assets.py @@ -0,0 +1,247 @@ +import pathlib +import re +import tarfile +import tempfile +from functools import cached_property +from typing import ( + Any, AsyncGenerator, AsyncIterator, Awaitable, Coroutine, Dict, Iterator, + Optional, Pattern, Set, Tuple, Union) + +import aiohttp + +import gidgethub.abc +import gidgethub.aiohttp + +from tools.base import abstract, aio, stream, utils +from tools.base.functional import async_property +from tools.github.release.abstract import ( + AGithubRelease, AGithubReleaseAssets, AGithubReleaseAssetsFetcher, AGithubReleaseAssetsPusher) +from tools.github.release.exceptions import GithubReleaseError + + +@abstract.implementer(AGithubReleaseAssets) +class GithubReleaseAssets: + concurrency = 4 + + def __init__(self, release: AGithubRelease, path: pathlib.Path) -> None: + self.release = release + self._path = path + + async def __aiter__(self) -> AsyncGenerator[Dict[str, Union[str, pathlib.Path]], Awaitable]: + with self: + try: + async for result in self.run(): + yield result + except aio.ConcurrentIteratorError as e: + raise GithubReleaseError(e.args[0]) + + def __enter__(self) -> AGithubReleaseAssets: + return self + + def __exit__(self, *args) -> None: + self.cleanup() + + @async_property + async def assets(self) -> Dict: + return await self.release.assets + + @async_property + async def awaitables(self) -> AsyncGenerator[Coroutine[Any, Any, Dict[str, Union[str, pathlib.Path]]], Dict]: + raise NotImplementedError + + @property + def github(self) -> gidgethub.abc.GitHubAPI: + return self.release.github + + @property + def path(self) -> pathlib.Path: + return self._path + + @property + def session(self) -> 
aiohttp.ClientSession: + return self.release.session + + @property + def tasks(self) -> aio.concurrent: + return aio.concurrent(self.awaitables, limit=self.concurrency) + + @cached_property + def tempdir(self) -> tempfile.TemporaryDirectory: + return tempfile.TemporaryDirectory() + + @property + def version(self) -> str: + return self.release.version + + def cleanup(self) -> None: + if "tempdir" in self.__dict__: + self.tempdir.cleanup() + del self.__dict__["tempdir"] + + def fail(self, message: str) -> str: + return self.release.fail(message) + + async def run(self) -> AsyncGenerator[Dict[str, Union[str, pathlib.Path]], Awaitable]: + try: + async for result in self.tasks: + yield result + except aio.ConcurrentIteratorError as e: + # This should catch any errors running the upload coros + # In this case the exception is unwrapped, and the original + # error is raised. + raise e.args[0] + except aio.ConcurrentError as e: + yield dict(error=self.fail(e.args[0])) + + async def handle_result(self, result: Any) -> Any: + # ? 
+ return result + + +@abstract.implementer(AGithubReleaseAssetsFetcher) +class GithubReleaseAssetsFetcher(GithubReleaseAssets): + + def __init__(self, release, path, asset_types, append=False) -> None: + super().__init__(release, path) + self._asset_types = asset_types + self._append = append + + def __exit__(self, *args) -> None: + # TODO(phlax): make this non-blocking + with tarfile.open(self._path, self.write_mode) as tar: + tar.add(self.path, arcname=self.version) + super().__exit__(*args) + + @property + def append(self) -> bool: + """Append to existing file or otherwise""" + return self._append + + @cached_property + def asset_types(self) -> Dict[str, Pattern[str]]: + return self._asset_types or dict(assets=re.compile(".*")) + + @async_property + async def awaitables(self) -> AsyncGenerator[Coroutine[Any, Any, Dict[str, Union[str, pathlib.Path]]], Dict]: + # assets categorised according to asset_types + for asset in await self.assets: + asset_type = self.asset_type(asset) + if not asset_type: + continue + asset["asset_type"] = asset_type + yield self.download(asset) + + @cached_property + def is_tarlike(self) -> bool: + return utils.is_tarlike(self._path) + + @property + def out_exists(self) -> bool: + return self._path.exists() and not self.append + + @cached_property + def path(self) -> pathlib.Path: + if self.out_exists: + self.fail( + f"Output directory exists: {self._path}" + if not self.is_tarlike + else f"Output tarball exists: {self._path}") + return ( + pathlib.Path(self.tempdir.name) + if self.is_tarlike + else self._path) + + @property + def write_mode(self) -> str: + return "a" if self.append else "w" + + def asset_type(self, asset: Dict) -> Optional[str]: + for k, v in self.asset_types.items(): + if v.search(asset["name"]): + return k + + async def download(self, asset: Dict) -> Dict[str, Union[str, pathlib.Path]]: + return await self.save( + asset["asset_type"], asset["name"], + await self.session.get(asset["browser_download_url"])) + + async 
def save(self, asset_type: str, name: str, download: aiohttp.ClientResponse) -> Dict[str, Union[str, pathlib.Path]]: + outfile = self.path.joinpath(asset_type, name) + outfile.parent.mkdir(exist_ok=True) + async with stream.async_writer(outfile) as f: + await f.stream_bytes(download) + result: Dict[str, Union[str, pathlib.Path]] = dict( + name=name, + outfile=outfile) + if download.status != 200: + result["error"] = self.fail(f"Failed downloading, got response:\n{download}") + return result + + +@abstract.implementer(AGithubReleaseAssetsPusher) +class GithubReleaseAssetsPusher(GithubReleaseAssets): + _artefacts_glob = "**/*{version}*" + file_exts = {"deb", "changes", "rpm"} + + @property + def artefacts(self) -> Iterator[pathlib.Path]: + for match in self.path.glob(self._artefacts_glob.format(version=self.version)): + if match.suffix[1:] in self.file_exts: + yield match + + @async_property + async def asset_names(self) -> Set[str]: + return await self.release.asset_names + + @async_property + async def awaitables(self) -> AsyncGenerator[Coroutine[Any, Any, Dict[str, Union[str, pathlib.Path]]], Dict]: + for artefact in self.artefacts: + yield self.upload(artefact, await self.artefact_url(artefact.name)) + + @cached_property + def is_dir(self) -> bool: + return self._path.is_dir() + + @cached_property + def is_tarball(self) -> bool: + return tarfile.is_tarfile(self._path) + + @cached_property + def path(self) -> pathlib.Path: + if not self.is_tarball and not self.is_dir: + raise GithubReleaseError( + f"Unrecognized target '{self._path}', should either be a directory or a tarball containing packages") + # TODO(phlax): make this non-blocking + return ( + utils.extract(self.tempdir.name, self._path) + if self.is_tarball + else self._path) + + @async_property + async def upload_url(self) -> str: + return await self.release.upload_url + + @property + def version(self) -> str: + return self.release.version + + async def artefact_url(self, name: str) -> str: + """URL to 
upload a provided artefact name as an asset""" + return f"{await self.upload_url}?name={name}" + + async def upload(self, artefact: pathlib.Path, url: str) -> Dict[str, Union[str, pathlib.Path]]: + if artefact.name in await self.asset_names: + return dict( + name=artefact.name, + url=url, + error=self.fail(f"Asset exists already {artefact.name}")) + async with stream.async_reader(artefact) as f: + response = await self.github.post( + url, + data=f, + content_type="application/octet-stream") + errored = (response.get("error") or not response.get("state") == "uploaded") + result = dict(name=artefact.name, url=response["url"] if not errored else url) + if errored: + result["error"] = self.fail(f"Something went wrong uploading {artefact.name} -> {url}, got:\n{response}") + return result diff --git a/tools/github/release/tests/test_assets.py b/tools/github/release/tests/test_assets.py new file mode 100644 index 0000000000000..db8024aa20204 --- /dev/null +++ b/tools/github/release/tests/test_assets.py @@ -0,0 +1,270 @@ + +from unittest.mock import AsyncMock, MagicMock, PropertyMock + +import pytest + +from tools.base import aio +from tools.base.functional import async_property +from tools.github.release import assets as github_assets + + +def test_assets_constructor(): + assets = github_assets.GithubReleaseAssets("RELEASE", "PATH") + assert assets.release == "RELEASE" + assert assets._path == "PATH" + assert assets.concurrency == 4 + + assert assets.path == "PATH" + assert "path" not in assets.__dict__ + + +def _check_assets_property(patches, prop, arg=None): + release = MagicMock() + assets = github_assets.GithubReleaseAssets(release, "VERSION") + assert getattr(assets, prop) == getattr(release, arg or prop) + assert prop not in assets.__dict__ + + +@pytest.mark.parametrize( + "prop", + [("github",), + ("session",)]) +def test_assets_props(patches, prop): + _check_assets_property(patches, *prop) + + +def test_assets_context(patches): + release = 
github_assets.GithubReleaseAssets("RELEASE", "PATH") + patched = patches( + "GithubReleaseAssets.cleanup", + prefix="tools.github.release.assets") + + with patched as (m_cleanup, ): + with release as _release: + pass + + assert _release is release + assert m_cleanup.called + + +@pytest.mark.asyncio +@pytest.mark.parametrize("raises", [None, BaseException, aio.ConcurrentIteratorError]) +async def test_assets_dunder_aiter(patches, raises): + assets = github_assets.GithubReleaseAssets("RELEASE", "PATH") + patched = patches( + "GithubReleaseAssets.__enter__", + "GithubReleaseAssets.__exit__", + ("GithubReleaseAssets.run", dict(new_callable=MagicMock)), + prefix="tools.github.release.assets") + _results = [] + + async def _run(): + for x in range(0, 5): + if x == 3 and raises: + raise raises("AN ERROR OCCURRED") + yield x + + with patched as (m_enter, m_exit, m_run): + m_run.return_value = _run() + m_enter.return_value = None + m_exit.return_value = None + if raises == BaseException: + with pytest.raises(BaseException) as e: + async for result in assets: + _results.append(result) + elif raises: + with pytest.raises(github_assets.GithubReleaseError) as e: + async for result in assets: + _results.append(result) + else: + async for result in assets: + _results.append(result) + + assert ( + list(m_run.call_args) + == [(), {}]) + assert ( + list(m_enter.call_args) + == [(), {}]) + + if raises: + assert ( + m_exit.call_args[0][0] + == (raises + if raises == BaseException + else github_assets.GithubReleaseError)) + assert m_exit.call_args[0][1] == e.value + assert _results == [0, 1, 2] + assert e.value.args[0] == "AN ERROR OCCURRED" + return + + assert ( + list(m_exit.call_args) + == [(None, None, None), {}]) + assert _results == list(range(0, 5)) + + +def test_assets_tasks(patches): + assets = github_assets.GithubReleaseAssets("RELEASE", "PATH") + patched = patches( + "aio", + ("GithubReleaseAssets.awaitables", dict(new_callable=PropertyMock)), + 
prefix="tools.github.release.assets") + assets.concurrency = 23 + + with patched as (m_aio, m_await): + assert assets.tasks == m_aio.concurrent.return_value + + assert ( + list(m_aio.concurrent.call_args) + == [(m_await.return_value,), dict(limit=23)]) + assert "tasks" not in assets.__dict__ + + +def test_assets_tempdir(patches): + assets = github_assets.GithubReleaseAssets("RELEASE", "PATH") + patched = patches( + "tempfile", + prefix="tools.github.release.assets") + assets.concurrency = 23 + + with patched as (m_temp, ): + assert assets.tempdir == m_temp.TemporaryDirectory.return_value + + assert ( + list(m_temp.TemporaryDirectory.call_args) + == [(), {}]) + + +def test_fetcher_constructor(patches): + patched = patches( + "GithubReleaseAssets.__init__", + prefix="tools.github.release.assets") + + with patched as (m_super, ): + fetcher = github_assets.GithubReleaseAssetsFetcher("RELEASE", "PATH", "ASSET_TYPES") + + assert ( + list(m_super.call_args) + == [("RELEASE", "PATH"), {}]) + assert fetcher._asset_types == "ASSET_TYPES" + + +@pytest.mark.asyncio +async def test_fetcher_assets(patches): + release = MagicMock() + type(release).assets = PropertyMock(side_effect=AsyncMock(return_value="ASSETS")) + fetcher = github_assets.GithubReleaseAssetsFetcher("RELEASE", "PATH", "ASSET_TYPES") + fetcher.release = release + assert await fetcher.assets == "ASSETS" + assert not hasattr(fetcher, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_fetcher_download(patches): + fetcher = github_assets.GithubReleaseAssetsFetcher("RELEASE", "PATH", "ASSET_TYPES") + patched = patches( + ("GithubReleaseAssetsFetcher.save", dict(new_callable=AsyncMock)), + ("GithubReleaseAssetsFetcher.session", dict(new_callable=PropertyMock)), + prefix="tools.github.release.assets") + asset = dict( + asset_type="ASSET TYPE", + browser_download_url="ASSET DOWNLOAD URL", + name="ASSET NAME") + + with patched as (m_save, m_session): + m_session.return_value.get = AsyncMock() + assert ( + 
await fetcher.download(asset) + == m_save.return_value) + + assert ( + list(m_save.call_args) + == [("ASSET TYPE", "ASSET NAME", m_session.return_value.get.return_value), {}]) + assert ( + list(m_session.return_value.get.call_args) + == [('ASSET DOWNLOAD URL',), {}]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("status", [None, 200, 201]) +async def test_fetcher_save(patches, status): + fetcher = github_assets.GithubReleaseAssetsFetcher("RELEASE", "PATH", "ASSET_TYPES") + patched = patches( + "stream", + "GithubReleaseAssetsFetcher.fail", + ("GithubReleaseAssetsFetcher.path", dict(new_callable=PropertyMock)), + prefix="tools.github.release.assets") + download = MagicMock() + download.status = status + + with patched as (m_stream, m_fail, m_path): + m_stream.__aenter__ = AsyncMock() + outfile = m_path.return_value.joinpath.return_value + result = await fetcher.save("ASSET TYPE", "NAME", download) + + expected = dict(name="NAME", outfile=outfile) + if status != 200: + assert ( + list(m_fail.call_args) + == [(f"Failed downloading, got response:\n{download}", ), {}]) + expected["error"] = m_fail.return_value + else: + assert not m_fail.called + + assert result == expected + assert ( + list(m_path.return_value.joinpath.call_args) + == [('ASSET TYPE', 'NAME'), {}]) + assert ( + list(outfile.parent.mkdir.call_args) + == [(), dict(exist_ok=True)]) + assert ( + list(m_stream.async_writer.call_args) + == [(outfile, ), {}]) + assert ( + list(m_stream.async_writer.return_value.__aenter__.return_value.stream_bytes.call_args) + == [(download, ), {}]) + + +def test_pusher_constructor(patches): + patched = patches( + "GithubReleaseAssets.__init__", + prefix="tools.github.release.assets") + + with patched as (m_super, ): + m_super.return_value = None + github_assets.GithubReleaseAssetsPusher("RELEASE", "PATH") + + assert ( + list(m_super.call_args) + == [("RELEASE", "PATH"), {}]) + + +@pytest.mark.asyncio +async def test_fetcher_asset_names(patches): + release = MagicMock() 
+ type(release).asset_names = PropertyMock(side_effect=AsyncMock(return_value="ASSET NAMES")) + pusher = github_assets.GithubReleaseAssetsPusher("RELEASE", "PATH") + pusher.release = release + assert await pusher.asset_names == "ASSET NAMES" + assert not hasattr(pusher, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_fetcher_upload_url(patches): + release = MagicMock() + type(release).upload_url = PropertyMock(side_effect=AsyncMock(return_value="ASSET NAMES")) + pusher = github_assets.GithubReleaseAssetsPusher("RELEASE", "PATH") + pusher.release = release + assert await pusher.upload_url == "ASSET NAMES" + assert not hasattr(pusher, async_property.cache_name) + + +def test_fetcher_version(patches): + release = MagicMock() + type(release).version = PropertyMock(return_value="VERSION") + pusher = github_assets.GithubReleaseAssetsPusher("RELEASE", "PATH") + pusher.release = release + assert pusher.version == "VERSION" + assert "version" not in pusher.__dict__ From 0ae7ff96896c0b7528c2c66e06d5dfeb150356ce Mon Sep 17 00:00:00 2001 From: Ryan Northey Date: Tue, 17 Aug 2021 11:29:01 +0100 Subject: [PATCH 3/5] tooling: Add Github release Signed-off-by: Ryan Northey --- tools/github/release/BUILD | 12 + tools/github/release/release.py | 193 ++++++++ tools/github/release/tests/test_release.py | 525 +++++++++++++++++++++ 3 files changed, 730 insertions(+) create mode 100644 tools/github/release/release.py create mode 100644 tools/github/release/tests/test_release.py diff --git a/tools/github/release/BUILD b/tools/github/release/BUILD index 6e0d7a0407b66..e1250b8134ad3 100644 --- a/tools/github/release/BUILD +++ b/tools/github/release/BUILD @@ -38,3 +38,15 @@ py_library( name = "exceptions", srcs = ["exceptions.py"], ) + +envoy_py_library( + "tools.github.release.release", + deps = [ + "//tools/base:aio", + "//tools/base:functional", + "//tools/base:utils", + ":abstract", + ":exceptions", + requirement("gidgethub"), + ], +) diff --git
a/tools/github/release/release.py b/tools/github/release/release.py new file mode 100644 index 0000000000000..d26354d5a812d --- /dev/null +++ b/tools/github/release/release.py @@ -0,0 +1,193 @@ +import pathlib +from functools import cached_property +from typing import ( + Dict, Iterable, + List, Optional, Pattern, Set, Tuple, Type, Union) + +import verboselogs # type:ignore + +import aiohttp + +import gidgethub.abc +import gidgethub.aiohttp + +from tools.base import abstract, aio +from tools.base.functional import async_property + +from tools.github.release.abstract import ( + AGithubRelease, AGithubReleaseAssetsFetcher, AGithubReleaseAssetsPusher, AGithubReleaseManager) +# from tools.github.release.assets import GithubReleaseAssetsFetcher, GithubReleaseAssetsPusher +from tools.github.release.exceptions import GithubReleaseError + + +@abstract.implementer(AGithubRelease) +class GithubRelease: + file_exts = {"deb", "changes", "rpm"} + + def __init__(self, manager: AGithubReleaseManager, version: str): + self.manager = manager + self._version = version + + @async_property(cache=True) + async def asset_names(self) -> Set[str]: + """Set of the names of assets for this release version""" + return set(asset["name"] for asset in await self.assets) + + @async_property(cache=True) + async def assets(self) -> Dict: + """Assets dictionary as returned by Github Release API""" + try: + return await self.github.getitem(await self.assets_url) + except gidgethub.GitHubException as e: + raise GithubReleaseError(e) from e + + @async_property(cache=True) + async def assets_url(self) -> str: + """URL for retrieving this version's assets information from""" + return (await self.release)["assets_url"] + + @async_property(cache=True) + async def delete_url(self) -> pathlib.PurePosixPath: + """Github API-relative URL for deleting this release version""" + return self.releases_url.joinpath(str(await self.release_id)) + + @async_property + async def exists(self) -> bool: + return self.version_name
in await self.release_names + + @property + def fetcher(self) -> Type[AGithubReleaseAssetsFetcher]: + # return GithubReleaseAssetsFetcher + raise NotImplementedError + + @property + def github(self) -> gidgethub.abc.GitHubAPI: + return self.manager.github + + @property + def log(self) -> verboselogs.VerboseLogger: + return self.manager.log + + @property + def pusher(self) -> Type[AGithubReleaseAssetsPusher]: + # return GithubReleaseAssetsPusher + raise NotImplementedError + + @async_property(cache=True) + async def release(self) -> Dict: + """Dictionary of release version information as returned by the Github Release API""" + return await self.get() + + @async_property(cache=True) + async def release_id(self) -> int: + """The Github release ID for this version, required for some URLs""" + return (await self.release)["id"] + + @async_property + async def release_names(self) -> Tuple[str, ...]: + """Tuple of release tag names as returned by the Github Release API + + This is used to check whether the release exists already.
+ """ + return tuple(release["tag_name"] for release in await self.manager.releases) + + @property + def releases_url(self) -> pathlib.PurePosixPath: + return self.manager.releases_url + + @property + def session(self) -> aiohttp.ClientSession: + return self.manager.session + + @async_property(cache=True) + async def upload_url(self) -> str: + """Upload URL for this release version""" + return (await self.release)["upload_url"].split("{")[0] + + @property + def version(self) -> str: + return self._version + + @property + def version_name(self) -> str: + return self.manager.format_version(self.version) + + @cached_property + def version_url(self) -> pathlib.PurePosixPath: + """Github API-relative URL to retrieve release version information from""" + return self.releases_url.joinpath("tags", self.version_name) + + async def create(self, assets: Optional[List[pathlib.Path]] = None) -> Dict[str, Union[List[Dict[str, Union[str, pathlib.Path]]], Dict]]: + results: Dict[str, Union[List[Dict], Dict]] = {} + if await self.exists: + self.fail(f"Release {self.version_name} already exists") + else: + self.log.notice(f"Creating release {self.version}") + try: + results["release"] = await self.github.post( + str(self.releases_url), + data=dict(tag_name=self.version_name)) + except gidgethub.GitHubException as e: + raise GithubReleaseError(e) from e + self.log.success(f"Release created {self.version}") + if assets: + results.update(await self.push(assets)) + return results + + async def delete(self) -> None: + if not await self.exists: + raise GithubReleaseError( + f"Unable to delete version {self.version_name} as it does not exist") + self.log.notice(f"Deleting release version: {self.version_name}") + try: + await self.github.delete(str(await self.delete_url)) + except gidgethub.GitHubException as e: + raise GithubReleaseError(e) from e + self.log.success(f"Release version deleted: {self.version_name}") + + async def fetch( + self, + path: pathlib.Path, + asset_types: Optional[Dict[str,
Pattern[str]]] = None, + append: Optional[bool] = False) -> Dict[str, List[Dict[str, Union[str, pathlib.Path]]]]: + self.log.notice(f"Downloading assets for release version: {self.version_name} -> {path}") + assets: List[Dict[str, Union[str, pathlib.Path]]] = [] + errors: List[Dict[str, Union[str, pathlib.Path]]] = [] + response = dict(assets=assets, errors=errors) + async for result in self.fetcher(self, path, asset_types, append=append): + if result.get("error"): + response["errors"].append(result) + continue + response["assets"].append(result) + self.log.info(f"Asset saved: {result['name']} -> {result['outfile']}") + if not response["errors"]: + self.log.success(f"Assets downloaded for release version: {self.version_name} -> {path}") + return response + + def fail(self, message: str) -> str: + return self.manager.fail(message) + + async def get(self) -> Dict: + try: + return await self.github.getitem(str(self.version_url)) + except gidgethub.GitHubException as e: + raise GithubReleaseError(e) from e + + async def push(self, artefacts: Iterable[pathlib.Path]) -> Dict[str, List[Dict[str, Union[str, pathlib.Path]]]]: + self.log.notice(f"Pushing assets for {self.version}") + assets: List[Dict[str, Union[str, pathlib.Path]]] = [] + errors: List[Dict[str, Union[str, pathlib.Path]]] = [] + response = dict(assets=assets, errors=errors) + try: + for path in artefacts: + async for result in self.pusher(self, path): + if result.get("error"): + response["errors"].append(result) + continue + response["assets"].append(result) + self.log.info(f"Release file uploaded {result['name']}") + except aio.ConcurrentError as e: + raise e.args[0] + if not response["errors"]: + self.log.success(f"Assets uploaded: {self.version}") + return response diff --git a/tools/github/release/tests/test_release.py b/tools/github/release/tests/test_release.py new file mode 100644 index 0000000000000..1a181ae9b3fb3 --- /dev/null +++ b/tools/github/release/tests/test_release.py @@ -0,0 +1,525 @@ + +from
unittest.mock import AsyncMock, MagicMock, PropertyMock + +import pytest + +import gidgethub + +from tools.base import aio +from tools.base.functional import async_property +from tools.github.release import exceptions as github_errors, release as github_release + + +def test_release_constructor(): + release = github_release.GithubRelease("MANAGER", "VERSION") + assert release.manager == "MANAGER" + assert release.version == "VERSION" + + # assert release.fetcher == github_release.GithubReleaseAssetsFetcher + # assert "fetcher" not in release.__dict__ + # assert release.pusher == github_release.GithubReleaseAssetsPusher + # assert "pusher" not in release.__dict__ + + +def _check_manager_property(prop, arg=None): + _manager = MagicMock() + checker = github_release.GithubRelease(_manager, "VERSION") + assert getattr(checker, prop) == getattr(_manager, arg or prop) + assert prop not in checker.__dict__ + + +@pytest.mark.parametrize( + "prop", + [("github",), + ("log",), + ("releases_url",), + ("session",)]) +def test_release_manager_props(prop): + _check_manager_property(*prop) + + +@pytest.mark.asyncio +async def test_release_asset_names(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.assets", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + _assets = [MagicMock(), MagicMock()] + + with patched as (m_assets, ): + m_assets.side_effect = AsyncMock(return_value=_assets) + assert ( + await release.asset_names + == set(m.__getitem__.return_value for m in _assets)) + + for _asset in _assets: + assert ( + list(_asset.__getitem__.call_args) + == [('name',), {}]) + + assert "asset_names" in release.__async_prop_cache__ + + +@pytest.mark.asyncio +@pytest.mark.parametrize("raises", [None, BaseException, gidgethub.GitHubException]) +async def test_release_assets(patches, raises): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + 
("GithubRelease.assets_url", dict(new_callable=PropertyMock)), + ("GithubRelease.github", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_url, m_github): + _get = AsyncMock() + _url = AsyncMock() + m_github.return_value.getitem.side_effect = _get + m_url.side_effect = _url + if raises: + _get.side_effect = raises("AN ERROR OCCURRED") + _raises = ( + github_errors.GithubReleaseError + if raises == gidgethub.GitHubException + else raises) + with pytest.raises(_raises): + await release.assets + else: + assert ( + await release.assets + == _get.return_value) + + assert ( + list(_get.call_args) + == [(_url.return_value,), {}]) + if not raises: + assert "assets" in release.__async_prop_cache__ + + +@pytest.mark.asyncio +async def test_release_assets_url(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.release", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_release, ): + _release = AsyncMock() + m_release.side_effect = _release + assert ( + await release.assets_url + == _release.return_value.__getitem__.return_value) + + assert ( + list(_release.return_value.__getitem__.call_args) + == [('assets_url',), {}]) + assert "assets_url" in getattr(release, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_release_delete_url(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.releases_url", dict(new_callable=PropertyMock)), + ("GithubRelease.release_id", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_url, m_id): + _id = AsyncMock() + m_id.side_effect = _id + assert ( + await release.delete_url + == m_url.return_value.joinpath.return_value) + + assert ( + list(m_url.return_value.joinpath.call_args) + == [(str(_id.return_value), ), {}]) + assert "delete_url" in getattr(release, 
async_property.cache_name) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("version", [f"VERSION{i}" for i in range(0, 7)]) +async def test_release_exists(patches, version): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.release_names", dict(new_callable=PropertyMock)), + ("GithubRelease.version_name", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + _versions = [f"VERSION{i}" for i in range(3, 5)] + + with patched as (m_release, m_name): + m_name.return_value = version + m_release.side_effect = AsyncMock(return_value=_versions) + assert await release.exists == (version in _versions) + + assert not hasattr(release, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_release_release(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.get", dict(new_callable=AsyncMock)), + prefix="tools.github.release.release") + + with patched as (m_get, ): + assert await release.release == m_get.return_value + + assert "release" in getattr(release, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_release_release_id(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.release", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_release, ): + _release = AsyncMock() + m_release.side_effect = _release + assert await release.release_id == _release.return_value.__getitem__.return_value + + assert ( + list(_release.return_value.__getitem__.call_args) + == [('id',), {}]) + assert "release_id" in getattr(release, async_property.cache_name) + + +@pytest.mark.asyncio +async def test_release_release_names(patches): + _manager = MagicMock() + + _release_names = [dict(tag_name=f"TAG{i}") for i in range(0, 3)] + + async def _releases(): + return _release_names + + _manager.releases = _releases() + release = 
github_release.GithubRelease(_manager, "VERSION") + assert ( + await release.release_names + == tuple(t["tag_name"] for t in _release_names)) + + +@pytest.mark.asyncio +async def test_release_upload_url(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.release", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_release, ): + _release = AsyncMock() + m_release.side_effect = _release + assert ( + await release.upload_url + == _release.return_value.__getitem__.return_value.split.return_value.__getitem__.return_value) + + assert ( + list(_release.return_value.__getitem__.call_args) + == [('upload_url',), {}]) + assert ( + list(_release.return_value.__getitem__.return_value.split.call_args) + == [('{',), {}]) + assert ( + list(_release.return_value.__getitem__.return_value.split.return_value.__getitem__.call_args) + == [(0,), {}]) + assert "upload_url" in release.__async_prop_cache__ + + +def test_release_version_name(patches): + _manager = MagicMock() + release = github_release.GithubRelease(_manager, "VERSION") + assert release.version_name == _manager.format_version.return_value + assert ( + list(_manager.format_version.call_args) + == [("VERSION",), {}]) + + +def test_release_version_url(patches): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.releases_url", dict(new_callable=PropertyMock)), + ("GithubRelease.version_name", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_releases, m_version): + assert release.version_url == m_releases.return_value.joinpath.return_value + + assert ( + list(m_releases.return_value.joinpath.call_args) + == [("tags", m_version.return_value), {}]) + assert "version_url" in release.__dict__ + + +@pytest.mark.asyncio +@pytest.mark.parametrize("exists", [True, False]) +@pytest.mark.parametrize("assets", [None, [], [f"ASSET{i}" for i in
range(0, 3)]]) +@pytest.mark.parametrize("raises", [None, BaseException, gidgethub.GitHubException]) +async def test_release_create(patches, exists, assets, raises): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + "GithubRelease.fail", + ("GithubRelease.push", dict(new_callable=AsyncMock)), + ("GithubRelease.exists", dict(new_callable=PropertyMock)), + ("GithubRelease.github", dict(new_callable=PropertyMock)), + ("GithubRelease.log", dict(new_callable=PropertyMock)), + ("GithubRelease.releases_url", dict(new_callable=PropertyMock)), + ("GithubRelease.version_name", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + args = ( + (assets, ) + if assets is not None + else ()) + + with patched as (m_fail, m_push, m_exists, m_github, m_log, m_url, m_version): + m_exists.side_effect = AsyncMock(return_value=exists) + m_push.return_value = dict(PUSHED=True) + m_github.return_value.post = AsyncMock() + if raises: + m_github.return_value.post.side_effect = raises("AN ERROR OCCURRED") + if raises and not exists: + _raises = ( + github_errors.GithubReleaseError + if raises == gidgethub.GitHubException + else raises) + with pytest.raises(_raises): + await release.create(*args) + else: + result = await release.create(*args) + + expected = {} + if not exists: + assert ( + list(m_log.return_value.notice.call_args) + == [(f"Creating release VERSION", ), {}]) + assert ( + list(m_github.return_value.post.call_args) + == [(str(m_url.return_value), ), dict(data=dict(tag_name=m_version.return_value))]) + assert not m_fail.called + if not raises: + expected["release"] = m_github.return_value.post.return_value + assert ( + list(m_log.return_value.success.call_args) + == [(f"Release created VERSION", ), {}]) + else: + assert not m_log.return_value.success.called + else: + assert not m_github.return_value.post.called + assert not m_log.called + assert ( + list(m_fail.call_args) + == [(f"Release {m_version.return_value} already 
exists", ), {}]) + + if not exists and raises: + assert not m_push.called + return + if assets: + expected["PUSHED"] = True + assert ( + list(m_push.call_args) + == [(assets, ), {}]) + else: + assert not m_push.called + assert result == expected + + +@pytest.mark.asyncio +@pytest.mark.parametrize("exists", [True, False]) +@pytest.mark.parametrize("raises", [None, BaseException, gidgethub.GitHubException]) +async def test_release_delete(patches, exists, raises): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.delete_url", dict(new_callable=PropertyMock)), + ("GithubRelease.exists", dict(new_callable=PropertyMock)), + ("GithubRelease.github", dict(new_callable=PropertyMock)), + ("GithubRelease.log", dict(new_callable=PropertyMock)), + ("GithubRelease.version_name", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + with patched as (m_url, m_exists, m_github, m_log, m_version): + _url = AsyncMock() + m_url.side_effect = _url + m_exists.side_effect = AsyncMock(return_value=exists) + m_github.return_value.delete = AsyncMock() + if raises: + m_github.return_value.delete.side_effect = raises("AN ERROR OCCURRED") + + if exists and not raises: + assert not await release.delete() + elif raises == BaseException: + with pytest.raises(BaseException) as e: + await release.delete() + else: + with pytest.raises(github_errors.GithubReleaseError) as e: + await release.delete() + + if not exists: + assert ( + e.value.args[0] + == f"Unable to delete version {m_version.return_value} as it does not exist") + assert not m_log.called + assert not m_github.called + return + assert ( + list(m_log.return_value.notice.call_args) + == [(f"Deleting release version: {m_version.return_value}", ), {}]) + assert ( + list(m_github.return_value.delete.call_args) + == [(str(_url.return_value), ), {}]) + if raises: + assert not m_log.return_value.success.called + return + assert ( + 
list(m_log.return_value.success.call_args) + == [(f"Release version deleted: {m_version.return_value}", ), {}]) + + +def test_release_fail(): + manager = MagicMock() + release = github_release.GithubRelease(manager, "VERSION") + assert release.fail("FAILURE") == manager.fail.return_value + assert ( + list(manager.fail.call_args) + == [("FAILURE", ), {}]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("asset_types", [None, (), tuple(f"ASSET_TYPE{i}" for i in range(0, 3))]) +@pytest.mark.parametrize("errors", [[], [0], [2, 4], range(0, 5)]) +async def test_release_fetch(patches, asset_types, errors): + release = github_release.GithubRelease("MANAGER", "VERSION") + patched = patches( + ("GithubRelease.fetcher", dict(new_callable=PropertyMock)), + ("GithubRelease.log", dict(new_callable=PropertyMock)), + ("GithubRelease.version_name", dict(new_callable=PropertyMock)), + prefix="tools.github.release.release") + + kwargs = {} if asset_types is None else dict(asset_types=asset_types) + fetched = MagicMock() + + async def _fetcher(_releaser, path, asset_types, append=False): + fetched(_releaser, path, asset_types, append) + for x in range(0, 5): + response = dict(name=f"FETCHED{x}") + if x in errors: + response["error"] = f"ERROR{x}" + else: + response["outfile"] = f"OUTFILE{x}" + yield response + expected = dict( + errors=[ + dict(name=f"FETCHED{i}", error=f"ERROR{i}") + for i in errors], + assets=[ + dict(name=f"FETCHED{i}", outfile=f"OUTFILE{i}") + for i in range(0, 5) if i not in errors]) + + with patched as (m_fetcher, m_log, m_version): + m_fetcher.return_value = _fetcher + assert await release.fetch("PATH", **kwargs) == expected + + assert ( + list(m_log.return_value.notice.call_args) + == [(f"Downloading assets for release version: {m_version.return_value} -> PATH", ), {}]) + assert ( + list(fetched.call_args) + == [(release, 'PATH', asset_types, False), {}]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("raises", [None, BaseException, 
@pytest.mark.asyncio
@pytest.mark.parametrize("raises", [None, BaseException, gidgethub.GitHubException])
async def test_release_get(patches, raises):
    """Check `GithubRelease.get` returns the API item and translates Github errors."""
    release = github_release.GithubRelease("MANAGER", "VERSION")
    patched = patches(
        ("GithubRelease.version_url", dict(new_callable=PropertyMock)),
        ("GithubRelease.github", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.release")

    with patched as (m_url, m_github):
        m_github.return_value.getitem = AsyncMock()
        if raises:
            m_github.return_value.getitem.side_effect = raises("AN ERROR OCCURRED")
            # Github errors are expected to be re-raised as GithubReleaseError;
            # anything else should propagate unchanged.
            _raises = (
                github_errors.GithubReleaseError
                if raises == gidgethub.GitHubException
                else raises)
            with pytest.raises(_raises):
                await release.get()
        else:
            assert await release.get() == m_github.return_value.getitem.return_value

    assert (
        list(m_github.return_value.getitem.call_args)
        == [(str(m_url.return_value), ), {}])


@pytest.mark.asyncio
@pytest.mark.parametrize("raises", [None, BaseException, aio.ConcurrentError])
@pytest.mark.parametrize("errors", [[], [1, 3], range(0, 5)])
async def test_release_push(patches, raises, errors):
    """Check `GithubRelease.push` collects pusher results and logs the outcome."""
    release = github_release.GithubRelease("MANAGER", "VERSION")
    patched = patches(
        ("GithubRelease.log", dict(new_callable=PropertyMock)),
        ("GithubRelease.pusher", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.release")
    artefacts = [f"ARTEFACTS{i}" for i in range(5)]
    expected = dict(assets=[], errors=[])

    # Every artefact path yields five responses; those whose index is in
    # `errors` are expected under "errors", the rest under "assets".
    for i in range(5):
        for x in range(5):
            result = dict(
                name=f"ARTEFACTS{i}_ASSET{x}",
                foo=f"ARTEFACTS{i}_BAR{x}")
            if x in errors:
                result["error"] = f"GOT AN ERROR ARTEFACTS{i} {x}"
                expected["errors"].append(result)
            else:
                expected["assets"].append(result)

    class SomeError(Exception):
        pass

    async def _pusher(path):
        # Stand-in pusher: either fails immediately or yields five responses.
        if raises:
            raise raises(SomeError("AN ERROR OCCURRED"))
        for i in range(5):
            response = dict(
                name=f"{path}_ASSET{i}",
                foo=f"{path}_BAR{i}")
            if i in errors:
                response["error"] = f"GOT AN ERROR {path} {i}"
            yield response

    with patched as (m_log, m_pusher):
        m_pusher.return_value.side_effect = lambda _self, path: _pusher(path)
        if raises:
            # A ConcurrentError wrapper is expected to be unwrapped to SomeError.
            with pytest.raises(BaseException if raises == BaseException else SomeError):
                await release.push(artefacts)
        else:
            assert await release.push(artefacts) == expected

    assert (
        list(m_log.return_value.notice.call_args)
        == [("Pushing assets for VERSION", ), {}])

    pusher_calls = [list(c) for c in m_pusher.return_value.call_args_list]
    if raises:
        # Pushing stops at the first artefact when the pusher raises.
        assert pusher_calls == [[(release, 'ARTEFACTS0'), {}]]
        assert not m_log.return_value.info.called
    else:
        assert pusher_calls == [[(release, f'ARTEFACTS{x}'), {}] for x in range(5)]

    if raises or errors:
        assert not m_log.return_value.success.called
    else:
        assert (
            list(m_log.return_value.success.call_args)
            == [("Assets uploaded: VERSION", ), {}])
        assert (
            [list(c) for c in m_log.return_value.info.call_args_list]
            == [[(f'Release file uploaded ARTEFACTS{i}_ASSET{x}',), {}]
                for i in range(5)
                for x in range(5)])
@abstract.implementer(AGithubReleaseManager)
class GithubReleaseManager:
    """Entry point for working with the releases of one Github repository.

    The Github API client, HTTP session and logger are created lazily (and
    cached on the instance) unless they are injected through the
    constructor. The manager can be used as an async context manager, in
    which case `close` is awaited on exit.
    """

    # Pattern applied to release tag names by `parse_version`; each match is
    # replaced with its captured group.
    _version_re = r"v(\w+)"
    # Template used by `format_version`.
    _version_format = "v{version}"

    def __init__(
            self,
            path: Union[str, pathlib.Path],
            repository: str,
            continues: Optional[bool] = False,
            create: Optional[bool] = True,
            user: Optional[str] = None,
            oauth_token: Optional[str] = None,
            version: Optional[str] = None,
            log: Optional[verboselogs.VerboseLogger] = None,
            asset_types: Optional[Dict[str, Pattern[str]]] = None,
            github: Optional[gidgethub.abc.GitHubAPI] = None,
            session: Optional[aiohttp.ClientSession] = None) -> None:
        """Store the configuration; nothing is created or contacted here.

        Args:
            path: Local path, exposed as a `pathlib.Path` through `path`.
            repository: Repository slug used to build `/repos/{repository}/releases`.
            continues: When true, `fail` logs a warning instead of raising.
            create: Stored as given; not read by this class.
            user: Passed to the Github API client as the requester.
            oauth_token: Passed to the Github API client.
            version: Stored as given; not read by this class.
            log: Logger to use instead of creating a `VerboseLogger`.
            asset_types: Stored as given; not read by this class.
            github: API client to use instead of creating one.
            session: HTTP session to use instead of creating one.
        """
        self.version = version
        self._path = path
        self.repository = repository
        self.continues = continues
        self._log = log
        self.oauth_token = oauth_token
        self.user = user
        self._asset_types = asset_types
        self._github = github
        self._session = session
        self.create = create

    async def __aenter__(self) -> AGithubReleaseManager:
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()

    def __getitem__(self, version) -> AGithubRelease:
        """Return the release object for `version` (not available yet)."""
        # TODO: return self.release_class(self, version) once a concrete
        #   release class is wired into this module.
        raise NotImplementedError

    @property
    def release_class(self) -> Type[AGithubRelease]:
        """Class used to represent a single release (not available yet)."""
        # TODO: return GithubRelease once it can be imported here.
        raise NotImplementedError

    @cached_property
    def github(self) -> gidgethub.abc.GitHubAPI:
        """The injected Github API client, or a new aiohttp-backed one."""
        return (
            self._github
            or gidgethub.aiohttp.GitHubAPI(self.session, self.user, oauth_token=self.oauth_token))

    @cached_property
    def log(self) -> verboselogs.VerboseLogger:
        """The injected logger, or a new `VerboseLogger` named after this module."""
        return self._log or verboselogs.VerboseLogger(__name__)

    @cached_property
    def path(self) -> pathlib.Path:
        """The configured path as a `pathlib.Path`."""
        return pathlib.Path(self._path)

    @async_property
    async def latest(self) -> Dict[str, packaging.version.Version]:
        """Map version strings to versions for the fetched releases.

        Every parseable release is stored under its full version string,
        and each `major.minor` string maps to the highest version seen in
        that series. Releases whose tag cannot be parsed are skipped.
        """
        latest: Dict[str, packaging.version.Version] = {}
        for release in await self.releases:
            version = self.parse_version(release["tag_name"])
            if not version:
                continue
            # A two-component tag (e.g. "1.19") has the same key as its own
            # minor series, so it must not replace a newer version that is
            # already recorded under that key.
            exact = str(version)
            latest[exact] = max(version, latest.get(exact, version))
            minor = f"{version.major}.{version.minor}"
            if version > latest.get(minor, self.version_min):
                latest[minor] = version
        return latest

    @async_property
    async def releases(self) -> List[Dict]:
        """Release data as returned by a GET on `releases_url`."""
        # TODO: paging? This issues a single `getitem` request - confirm
        #   whether that can return every release or only the first page.
        return await self.github.getitem(str(self.releases_url))

    @cached_property
    def releases_url(self) -> pathlib.PurePosixPath:
        """API path of the repository's releases collection."""
        return pathlib.PurePosixPath(f"/repos/{self.repository}/releases")

    @cached_property
    def session(self) -> aiohttp.ClientSession:
        """The injected HTTP session, or a new `aiohttp.ClientSession`."""
        return self._session or aiohttp.ClientSession()

    @property
    def version_min(self) -> packaging.version.Version:
        """Lower bound used when comparing versions in `latest`."""
        return VERSION_MIN

    @cached_property
    def version_re(self) -> Pattern[str]:
        """Compiled form of `_version_re`."""
        return re.compile(self._version_re)

    async def close(self) -> None:
        """Close the HTTP session if one has been cached on this instance.

        Note that a session injected through the constructor is closed as
        well once it has been accessed through `session`.
        """
        # `cached_property` stores its value in the instance `__dict__`, so
        # its absence means no session was ever created or accessed.
        if "session" not in self.__dict__:
            return
        await self.session.close()
        del self.__dict__["session"]

    def fail(self, message: str) -> str:
        """Raise `GithubReleaseError`, or warn and return `message` if `continues` is set."""
        if not self.continues:
            raise GithubReleaseError(message)
        self.log.warning(message)
        return message

    def format_version(self, version: Union[str, packaging.version.Version]) -> str:
        """Format `version` with the `_version_format` template."""
        return self._version_format.format(version=version)

    def parse_version(self, version: str) -> Optional[packaging.version.Version]:
        """Parse a tag name into a `Version`.

        Returns `None`, after logging a warning, when nothing is left once
        `version_re` has been applied or the result is not a valid version.
        """
        _version = self.version_re.sub(r"\1", version)
        if _version:
            try:
                return packaging.version.Version(_version)
            except packaging.version.InvalidVersion:
                # Fall through to the shared warning below.
                pass
        self.log.warning(f"Unable to parse version: {version}")
        return None
@pytest.mark.parametrize("continues", [None, True, False])
@pytest.mark.parametrize("create", [None, True, False])
@pytest.mark.parametrize("user", [None, "USER"])
@pytest.mark.parametrize("oauth_token", [None, "OAUTH TOKEN"])
@pytest.mark.parametrize("log", [None, "LOG"])
@pytest.mark.parametrize("asset_types", [None, "ASSET TYPES"])
@pytest.mark.parametrize("github", [None, "GITHUB"])
@pytest.mark.parametrize("session", [None, "SESSION"])
def test_release_manager_constructor(continues, create, user, oauth_token, log, asset_types, github, session):
    """Check the constructor stores its arguments and applies its defaults."""
    candidates = {
        "continues": continues,
        "create": create,
        "user": user,
        "oauth_token": oauth_token,
        "log": log,
        "asset_types": asset_types,
        "github": github,
        "session": session}
    # Only pass what the parametrization actually set, so defaults are exercised.
    provided = {
        name: value
        for name, value in candidates.items()
        if value is not None}
    subject = manager.GithubReleaseManager("PATH", "REPOSITORY", **provided)

    expected_continues = False if continues is None else continues
    expected_create = True if create is None else create

    assert subject._path == "PATH"
    assert subject.repository == "REPOSITORY"
    assert subject.continues == expected_continues
    assert subject.create == expected_create
    assert subject._log == log
    assert subject.oauth_token == oauth_token
    assert subject.user == user
    assert subject._asset_types == asset_types
    assert subject._github == github
    assert subject._session == session

    assert subject._version_re == r"v(\w+)"
    minimum = packaging.version.Version("0")
    assert subject.version_min == manager.VERSION_MIN == minimum
    # `version_min` is a plain property, so nothing is cached on the instance.
    assert "version_min" not in subject.__dict__


@pytest.mark.asyncio
async def test_release_manager_async_contextmanager(patches):
    """Check the manager yields itself and awaits `close` only on exit."""
    patched = patches(
        ("GithubReleaseManager.close", dict(new_callable=AsyncMock)),
        prefix="tools.github.release.manager")

    with patched as (m_close, ):
        subject = manager.GithubReleaseManager("PATH", "REPOSITORY")
        async with subject as entered:
            assert isinstance(entered, manager.GithubReleaseManager)
            assert not m_close.called

    assert list(m_close.call_args) == [(), {}]


def test_release_manager_dunder_getitem():
    """Check item access raises while no release class is available."""
    subject = manager.GithubReleaseManager("PATH", "REPOSITORY")

    with pytest.raises(NotImplementedError):
        subject["X.Y.Z"]
@pytest.mark.parametrize("oauth_token", [None, "OAUTH_TOKEN"])
@pytest.mark.parametrize("user", [None, "USER"])
@pytest.mark.parametrize("github", [True, False])
def test_release_manager_github(patches, oauth_token, user, github):
    """Check `github` prefers an injected client and otherwise builds and caches one."""
    kwargs = {}
    if oauth_token:
        kwargs["oauth_token"] = oauth_token
    if user:
        kwargs["user"] = user
    if github:
        kwargs["github"] = "GITHUB"
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY", **kwargs)
    patched = patches(
        "gidgethub",
        ("GithubReleaseManager.session", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    with patched as (m_api, m_session):
        assert (
            releaser.github
            == (m_api.aiohttp.GitHubAPI.return_value
                if not github
                else "GITHUB"))

    assert "github" in releaser.__dict__
    if github:
        assert not m_api.aiohttp.GitHubAPI.called
        return
    assert (
        list(m_api.aiohttp.GitHubAPI.call_args)
        == [(m_session.return_value, user),
            {'oauth_token': oauth_token}])


@pytest.mark.parametrize("log", [True, False])
def test_release_manager_log(patches, log):
    """Check `log` prefers an injected logger and otherwise builds and caches one."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "verboselogs",
        prefix="tools.github.release.manager")
    if log:
        releaser._log = "LOG"

    with patched as (m_log, ):
        assert (
            releaser.log
            == (m_log.VerboseLogger.return_value
                if not log
                else "LOG"))

    assert "log" in releaser.__dict__

    if log:
        assert not m_log.VerboseLogger.called
        return

    assert (
        list(m_log.VerboseLogger.call_args)
        == [('tools.github.release.manager',), {}])


def test_release_manager_path(patches):
    """Check `path` wraps the configured path in `pathlib.Path` and is cached."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "pathlib",
        prefix="tools.github.release.manager")

    with patched as (m_plib, ):
        assert (
            releaser.path
            == m_plib.Path.return_value)

    assert "path" in releaser.__dict__
    assert (
        list(m_plib.Path.call_args)
        == [('PATH',), {}])


@pytest.mark.asyncio
async def test_release_manager_latest(patches):
    """Check `latest` maps full versions and minor series, skipping unparseable tags."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "GithubReleaseManager.parse_version",
        ("GithubReleaseManager.releases", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    _versions = [dict(tag_name=v) for v in ("1.19.2", "X", "1.19.1", "Y_Z", "1.20.3", "", "0.0.1")]

    with patched as (m_version, m_releases):
        # Tags without a "." stand in for unparseable versions.
        m_version.side_effect = lambda version: (packaging.version.Version(version) if "." in version else None)
        m_releases.side_effect = AsyncMock(return_value=_versions)
        result = await releaser.latest

    assert (
        result
        == {'0.0.1': packaging.version.Version('0.0.1'),
            '0.0': packaging.version.Version('0.0.1'),
            '1.19.2': packaging.version.Version('1.19.2'),
            '1.19': packaging.version.Version('1.19.2'),
            '1.19.1': packaging.version.Version('1.19.1'),
            '1.20.3': packaging.version.Version('1.20.3'),
            '1.20': packaging.version.Version('1.20.3')})
    # Use the shared constant (as the `releases` test does) rather than a
    # hard-coded attribute name that could silently drift.
    assert not hasattr(releaser, async_property.cache_name)


@pytest.mark.asyncio
async def test_release_manager_releases(patches):
    """Check `releases` GETs `releases_url` and does not cache the result."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        ("GithubReleaseManager.github", dict(new_callable=PropertyMock)),
        ("GithubReleaseManager.releases_url", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    with patched as (m_github, m_releases):
        m_github.return_value.getitem = AsyncMock()
        assert await releaser.releases == m_github.return_value.getitem.return_value

    assert (
        list(m_github.return_value.getitem.call_args)
        == [(str(m_releases.return_value), ), {}])
    assert not hasattr(releaser, async_property.cache_name)


def test_release_manager_releases_url(patches):
    """Check `releases_url` is built from the repository name and is cached."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "pathlib",
        prefix="tools.github.release.manager")

    with patched as (m_plib, ):
        assert releaser.releases_url == m_plib.PurePosixPath.return_value

    assert (
        list(m_plib.PurePosixPath.call_args)
        == [("/repos/REPOSITORY/releases", ), {}])
    assert "releases_url" in releaser.__dict__


@pytest.mark.parametrize("session", [True, False])
def test_release_manager_session(patches, session):
    """Check `session` prefers an injected session and otherwise builds and caches one."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "aiohttp",
        prefix="tools.github.release.manager")
    if session:
        releaser._session = "SESSION"

    with patched as (m_http, ):
        assert (
            releaser.session
            == (m_http.ClientSession.return_value
                if not session
                else "SESSION"))

    assert "session" in releaser.__dict__
    if session:
        assert not m_http.ClientSession.called
        return
    assert (
        list(m_http.ClientSession.call_args)
        == [(), {}])


def test_release_manager_version_re(patches):
    """Check `version_re` compiles `_version_re` and is cached."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "re",
        prefix="tools.github.release.manager")
    releaser._version_re = "VERSION RE"

    with patched as (m_re, ):
        assert releaser.version_re == m_re.compile.return_value

    assert (
        list(m_re.compile.call_args)
        == [("VERSION RE", ), {}])
    # Same caching check the other cached-property tests make.
    assert "version_re" in releaser.__dict__


@pytest.mark.asyncio
@pytest.mark.parametrize("session", [True, False])
async def test_release_manager_close(patches, session):
    """Check `close` closes and forgets a cached session, and is a no-op otherwise."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        ("GithubReleaseManager.session", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    if session:
        # Simulate a session that has already been created and cached.
        releaser.__dict__["session"] = "SESSION"

    with patched as (m_session, ):
        m_session.return_value.close = AsyncMock()
        assert not await releaser.close()

    assert "session" not in releaser.__dict__

    if not session:
        assert not m_session.called
        return

    assert (
        list(m_session.return_value.close.call_args)
        == [(), {}])


@pytest.mark.parametrize("continues", [True, False])
def test_release_manager_fail(patches, continues):
    """Check `fail` raises, or warns and returns the message when `continues` is set."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY", continues=continues)
    patched = patches(
        ("GithubReleaseManager.log", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    with patched as (m_log, ):
        if continues:
            assert (
                releaser.fail("MESSAGE")
                == "MESSAGE")
        else:
            with pytest.raises(github_errors.GithubReleaseError):
                releaser.fail("MESSAGE")

    if not continues:
        assert not m_log.return_value.warning.called
        return

    assert (
        list(m_log.return_value.warning.call_args)
        == [("MESSAGE", ), {}])


def test_release_manager_format_version():
    """Check `format_version` delegates to the `_version_format` template."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    releaser._version_format = MagicMock()
    assert releaser.format_version("VERSION") == releaser._version_format.format.return_value
    assert (
        list(releaser._version_format.format.call_args)
        == [(), dict(version="VERSION")])


@pytest.mark.parametrize("version", [None, 0, "", "1.2.3"])
@pytest.mark.parametrize("raises", [None, BaseException, packaging.version.InvalidVersion])
def test_release_manager_parse_version(patches, version, raises):
    """Check `parse_version` returns a Version, or warns and returns None on failure."""
    releaser = manager.GithubReleaseManager("PATH", "REPOSITORY")
    patched = patches(
        "packaging.version.Version",
        ("GithubReleaseManager.log", dict(new_callable=PropertyMock)),
        ("GithubReleaseManager.version_re", dict(new_callable=PropertyMock)),
        prefix="tools.github.release.manager")

    with patched as (m_packaging, m_log, m_version):
        m_version.return_value.sub.return_value = version
        if raises:
            m_packaging.side_effect = raises()

        if version and raises == BaseException:
            # Only InvalidVersion is handled; anything else must propagate.
            with pytest.raises(BaseException):
                releaser.parse_version("VERSION")
        else:
            assert (
                releaser.parse_version("VERSION")
                == (None
                    if not version or raises
                    else m_packaging.return_value))

    assert (
        list(m_version.return_value.sub.call_args)
        == [(r"\1", "VERSION"), {}])
    if version:
        assert (
            list(m_packaging.call_args)
            == [(m_version.return_value.sub.return_value, ), {}])
    else:
        assert not m_packaging.called

    # A warning is expected for an empty substitution result or a handled
    # InvalidVersion, but not when an unexpected error propagated.
    if not version or (raises and raises != BaseException):
        assert (
            list(m_log.return_value.warning.call_args)
            == [("Unable to parse version: VERSION", ), {}])
    else:
        assert not m_log.called
@property def release_class(self) -> Type[AGithubRelease]: - # return GithubRelease - raise NotImplementedError + return GithubRelease @cached_property def github(self) -> gidgethub.abc.GitHubAPI: diff --git a/tools/github/release/release.py b/tools/github/release/release.py index d26354d5a812d..d6a3dfb220e1b 100644 --- a/tools/github/release/release.py +++ b/tools/github/release/release.py @@ -16,7 +16,7 @@ from tools.github.release.abstract import ( AGithubRelease, AGithubReleaseAssetsFetcher, AGithubReleaseAssetsPusher, AGithubReleaseManager) -# from tools.github.release.assets import GithubReleaseAssetsFetcher, GithubReleaseAssetsPusher +from tools.github.release.assets import GithubReleaseAssetsFetcher, GithubReleaseAssetsPusher from tools.github.release.exceptions import GithubReleaseError @@ -57,8 +57,7 @@ async def exists(self) -> bool: @property def fetcher(self) -> Type[AGithubReleaseAssetsFetcher]: - # return GithubReleaseAssetsFetcher - raise NotImplementedError + return GithubReleaseAssetsFetcher @property def github(self) -> gidgethub.abc.GitHubAPI: @@ -70,8 +69,7 @@ def log(self) -> verboselogs.VerboseLogger: @property def pusher(self) -> Type[AGithubReleaseAssetsPusher]: - # return GithubReleaseAssetsPusher - raise NotImplementedError + return GithubReleaseAssetsPusher @async_property(cache=True) async def release(self) -> Dict: diff --git a/tools/github/release/tests/test_exceptions.py b/tools/github/release/tests/test_exceptions.py new file mode 100644 index 0000000000000..84669ab45c2fc --- /dev/null +++ b/tools/github/release/tests/test_exceptions.py @@ -0,0 +1,4 @@ + + +def test_nothing(): + pass diff --git a/tools/github/release/tests/test_manager.py b/tools/github/release/tests/test_manager.py index fc912ba4edd4a..7ab36da50a8b0 100644 --- a/tools/github/release/tests/test_manager.py +++ b/tools/github/release/tests/test_manager.py @@ -63,11 +63,18 @@ async def test_release_manager_async_contextmanager(patches): == [(), {}]) -def 
test_release_manager_dunder_getitem(): +def test_release_manager_dunder_getitem(patches): releaser = manager.GithubReleaseManager("PATH", "REPOSITORY") + patched = patches( + "GithubRelease", + prefix="tools.github.release.manager") - with pytest.raises(NotImplementedError): - releaser["X.Y.Z"] + with patched as (m_release, ): + assert releaser["X.Y.Z"] == m_release.return_value + + assert ( + list(m_release.call_args) + == [(releaser, "X.Y.Z"), {}]) @pytest.mark.parametrize("oauth_token", [None, "OAUTH_TOKEN"])