Splitting remove files requests into 100-chunk batched requests. (#496)
* Splitting remove files requests into 100-chunk batched requests.

Fixes #495.

* rm unused import.

* better docstring.

* ran lint

* added a test to exercise chunking.
alxmrs authored Sep 1, 2022
1 parent 7108ade commit 5255214
Showing 2 changed files with 63 additions and 30 deletions.
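
In outline, the fix keeps the multipart batch-delete request but builds and sends it once per 100-path slice, accumulating each batch's error messages and raising a single OSError after all chunks are processed. A minimal sketch of that shape (the hypothetical delete_batch stands in for the multipart POST that _rm_files actually issues):

def _chunks(lst, n):
    """Yield successive n-sized slices of lst."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


def remove_all(paths, delete_batch, batch_size=100):
    """Sketch only: delete_batch(chunk) returns an iterable of error messages."""
    errors = []
    for chunk in _chunks(paths, batch_size):
        # Each request stays within the 100-call batch limit documented at
        # https://cloud.google.com/storage/docs/batch
        errors.extend(delete_batch(chunk))
    if errors:
        raise OSError(errors)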
77 changes: 47 additions & 30 deletions gcsfs/core.py
@@ -118,6 +118,16 @@ def _location():
     )
 
 
+def _chunks(lst, n):
+    """
+    Yield evenly-sized chunks from a list.
+
+    Implementation based on https://stackoverflow.com/a/312464.
+    """
+    for i in range(0, len(lst), n):
+        yield lst[i : i + n]
+
+
 class GCSFileSystem(AsyncFileSystem):
     r"""
     Connect to Google Cloud Storage.
@@ -919,37 +929,44 @@ async def _rm_files(self, paths):
             "Content-Type: application/json\n"
             "accept: application/json\ncontent-length: 0\n"
         )
-        body = "".join(
-            [
-                template.format(
-                    i=i + 1,
-                    bucket=p.split("/", 1)[0],
-                    key=quote_plus(p.split("/", 1)[1]),
-                )
-                for i, p in enumerate(paths)
-            ]
-        )
-        headers, content = await self._call(
-            "POST",
-            f"{self._location}/batch/storage/v1",
-            headers={
-                "Content-Type": 'multipart/mixed; boundary="=========='
-                '=====7330845974216740156=="'
-            },
-            data=body + "\n--===============7330845974216740156==--",
-        )
+        errors = []
+        # Splitting requests into 100 chunk batches
+        # See https://cloud.google.com/storage/docs/batch
+        for chunk in _chunks(paths, 100):
+            body = "".join(
+                [
+                    template.format(
+                        i=i + 1,
+                        bucket=p.split("/", 1)[0],
+                        key=quote_plus(p.split("/", 1)[1]),
+                    )
+                    for i, p in enumerate(chunk)
+                ]
+            )
+            headers, content = await self._call(
+                "POST",
+                f"{self._location}/batch/storage/v1",
+                headers={
+                    "Content-Type": 'multipart/mixed; boundary="=========='
+                    '=====7330845974216740156=="'
+                },
+                data=body + "\n--===============7330845974216740156==--",
+            )
 
-        boundary = headers["Content-Type"].split("=", 1)[1]
-        parents = [self._parent(p) for p in paths]
-        [self.invalidate_cache(parent) for parent in parents + list(paths)]
-        txt = content.decode()
-        if any(
-            not ("200 OK" in c or "204 No Content" in c)
-            for c in txt.split(boundary)[1:-1]
-        ):
-            pattern = '"message": "([^"]+)"'
-            out = set(re.findall(pattern, txt))
-            raise OSError(out)
+            boundary = headers["Content-Type"].split("=", 1)[1]
+            parents = [self._parent(p) for p in paths]
+            [self.invalidate_cache(parent) for parent in parents + list(paths)]
+            txt = content.decode()
+            if any(
+                not ("200 OK" in c or "204 No Content" in c)
+                for c in txt.split(boundary)[1:-1]
+            ):
+                pattern = '"message": "([^"]+)"'
+                out = set(re.findall(pattern, txt))
+                errors.extend(out)
+
+        if errors:
+            raise OSError(errors)
 
     @property
     def on_google(self):
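As a quick illustration of the new helper (this snippet is not part of the commit), an uneven list splits into full 100-item batches plus a remainder, so no single request exceeds the batch API's cap:

from gcsfs.core import _chunks

parts = list(_chunks(list(range(250)), 100))
assert [len(p) for p in parts] == [100, 100, 50]  # two full batches + remainder
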
16 changes: 16 additions & 0 deletions gcsfs/tests/test_core.py
@@ -204,6 +204,22 @@ def test_rm_recursive(gcs):
     assert not gcs.exists(TEST_BUCKET + files[-1])
 
 
+def test_rm_chunked_batch(gcs):
+    files = [f"{TEST_BUCKET}/t{i}" for i in range(303)]
+    for fn in files:
+        gcs.touch(fn)
+
+    files_created = gcs.find(TEST_BUCKET)
+    for fn in files:
+        assert fn in files_created
+
+    gcs.rm(files)
+
+    files_removed = gcs.find(TEST_BUCKET)
+    for fn in files:
+        assert fn not in files_removed
+
+
 def test_file_access(gcs):
     fn = TEST_BUCKET + "/nested/file1"
     data = b"hello\n"
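The 303-file count appears chosen to force multiple batches; as a back-of-the-envelope check (again, not part of the commit), removing 303 objects should now issue four batch POSTs:

import math

n_files, batch_limit = 303, 100  # limit per https://cloud.google.com/storage/docs/batch
assert math.ceil(n_files / batch_limit) == 4  # batches of 100, 100, 100, 3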
