From 41254a2120fa84e2647205bb138249b8553f6d6c Mon Sep 17 00:00:00 2001 From: Nicholas Junge Date: Wed, 17 Jul 2024 10:03:14 +0200 Subject: [PATCH] Batch objects in `Branch.delete_objects` calls (#285) lakeFS supports a maximum of 1000 objects per `delete_objects()` invocation, since that calls the object deletion API under the hood. This limitation is not without precedent, and exists also e.g. on AWS S3. Hence, we batch delete objects on the client side, dispatching multiple API calls in the cases where the deleted objects list contains more than a thousand objects. The threshold number of objects is currently hardcoded to 1000, since that is the limitation of the lakeFS server. --- src/lakefs_spec/spec.py | 18 +++++++++++++----- src/lakefs_spec/util.py | 20 +++++++++++++++++++- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/lakefs_spec/spec.py b/src/lakefs_spec/spec.py index fc6f6a58..e1a09ba6 100644 --- a/src/lakefs_spec/spec.py +++ b/src/lakefs_spec/spec.py @@ -27,10 +27,12 @@ from lakefs_spec.errors import translate_lakefs_error from lakefs_spec.transaction import LakeFSTransaction -from lakefs_spec.util import md5_checksum, parse +from lakefs_spec.util import batched, md5_checksum, parse logger = logging.getLogger("lakefs-spec") +MAX_DELETE_OBJS = 1000 + class LakeFSFileSystem(AbstractFileSystem): """ @@ -713,12 +715,18 @@ def rm( with self.wrapped_api_call(rpath=path): branch = lakefs.Branch(repository, ref, client=self.client) - objgen = branch.objects(prefix=prefix, delimiter="" if recursive else "/") + objgen_batched = batched( + branch.objects(prefix=prefix, delimiter="" if recursive else "/"), n=MAX_DELETE_OBJS + ) if maxdepth is None: - branch.delete_objects(obj.path for obj in objgen) + for objgen in objgen_batched: + branch.delete_objects(obj.path for obj in objgen) else: - # nesting level is just the amount of "/"s in the path, no leading "/". 
- branch.delete_objects(obj.path for obj in objgen if obj.path.count("/") <= maxdepth) + for objgen in objgen_batched: + # nesting level is just the amount of "/"s in the path, no leading "/". + branch.delete_objects( + obj.path for obj in objgen if obj.path.count("/") <= maxdepth + ) # Directory listing cache for the containing folder must be invalidated self.dircache.pop(self._parent(path), None) diff --git a/src/lakefs_spec/util.py b/src/lakefs_spec/util.py index 83422328..df16519e 100644 --- a/src/lakefs_spec/util.py +++ b/src/lakefs_spec/util.py @@ -5,9 +5,11 @@ from __future__ import annotations import hashlib +import itertools import os import re -from typing import Any, Callable, Generator, Protocol +import sys +from typing import Any, Callable, Generator, Iterable, Protocol from lakefs_sdk import Pagination from lakefs_sdk import __version__ as __lakefs_sdk_version__ @@ -49,6 +51,22 @@ def depaginate( kwargs["after"] = resp.pagination.next_offset +def batched(iterable: Iterable, n: int) -> Iterable[tuple]: + pyversion = tuple(sys.version_info[:3]) + # itertools.batched was added in Python 3.12. + if pyversion >= (3, 12): + # TODO(nicholasjng): Remove once target Python version is 3.12 + yield from itertools.batched(iterable, n) # type: ignore + else: + # "roughly equivalent" block from + # https://docs.python.org/3/library/itertools.html#itertools.batched + if n < 1: + raise ValueError("n must be at least one") + iterator = iter(iterable) + while batch := tuple(itertools.islice(iterator, n)): + yield batch + + def md5_checksum(lpath: str | os.PathLike[str], blocksize: int = 2**22) -> str: """ Calculate a local file's MD5 hash.