Skip to content

Commit

Permalink
Batch objects in Branch.delete_objects calls (#285)
Browse files Browse the repository at this point in the history
lakeFS supports a maximum of 1000 objects per `delete_objects()`
invocation, since that calls the object deletion API under the hood.
This limitation is not without precedent; the same limit also exists
elsewhere, e.g. on AWS S3.

Hence, we batch delete objects on the client side, dispatching multiple
API calls in the cases where the deleted objects list contains more
than a thousand objects.

The threshold number of objects is currently hardcoded to 1000, since
that is the limitation of the lakeFS server.
  • Loading branch information
nicholasjng committed Jul 17, 2024
1 parent 361dbfc commit 41254a2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
18 changes: 13 additions & 5 deletions src/lakefs_spec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@

from lakefs_spec.errors import translate_lakefs_error
from lakefs_spec.transaction import LakeFSTransaction
from lakefs_spec.util import md5_checksum, parse
from lakefs_spec.util import batched, md5_checksum, parse

logger = logging.getLogger("lakefs-spec")

# Upper bound on the number of objects passed to a single
# ``Branch.delete_objects()`` call; larger deletions are split into batches
# of this size. Hardcoded to the lakeFS server-side limit of 1000 objects
# per delete request.
MAX_DELETE_OBJS = 1000


class LakeFSFileSystem(AbstractFileSystem):
"""
Expand Down Expand Up @@ -713,12 +715,18 @@ def rm(

with self.wrapped_api_call(rpath=path):
branch = lakefs.Branch(repository, ref, client=self.client)
objgen = branch.objects(prefix=prefix, delimiter="" if recursive else "/")
objgen_batched = batched(
branch.objects(prefix=prefix, delimiter="" if recursive else "/"), n=MAX_DELETE_OBJS
)
if maxdepth is None:
branch.delete_objects(obj.path for obj in objgen)
for objgen in objgen_batched:
branch.delete_objects(obj.path for obj in objgen)
else:
# nesting level is just the amount of "/"s in the path, no leading "/".
branch.delete_objects(obj.path for obj in objgen if obj.path.count("/") <= maxdepth)
for objgen in objgen_batched:
# nesting level is just the amount of "/"s in the path, no leading "/".
branch.delete_objects(
obj.path for obj in objgen if obj.path.count("/") <= maxdepth
)

# Directory listing cache for the containing folder must be invalidated
self.dircache.pop(self._parent(path), None)
Expand Down
20 changes: 19 additions & 1 deletion src/lakefs_spec/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from __future__ import annotations

import hashlib
import itertools
import os
import re
from typing import Any, Callable, Generator, Protocol
import sys
from typing import Any, Callable, Generator, Iterable, Protocol

from lakefs_sdk import Pagination
from lakefs_sdk import __version__ as __lakefs_sdk_version__
Expand Down Expand Up @@ -49,6 +51,22 @@ def depaginate(
kwargs["after"] = resp.pagination.next_offset


def batched(iterable: Iterable, n: int) -> Iterable[tuple]:
    """
    Lazily split ``iterable`` into consecutive tuples of at most ``n`` items.

    Backport-style wrapper around ``itertools.batched`` (added in Python 3.12).
    The final batch may hold fewer than ``n`` items; an empty input produces
    no batches at all.

    Parameters
    ----------
    iterable: Iterable
        The items to group. It is consumed lazily, one batch at a time.
    n: int
        Maximum number of items per batch. Must be at least one.

    Yields
    ------
    tuple
        The next batch of up to ``n`` consecutive items.

    Raises
    ------
    ValueError
        If ``n`` is smaller than one. Since this is a generator function,
        the error surfaces when iteration starts, not at call time.
    """
    if n < 1:
        raise ValueError("n must be at least one")

    # NOTE: This function is a generator (it contains ``yield``), so the
    # standard library implementation has to be delegated to via ``yield from``.
    # A plain ``return itertools.batched(...)`` would end the generator
    # immediately and produce no batches at all.
    if sys.version_info >= (3, 12):
        # TODO(nicholasjng): Remove once target Python version is 3.12
        yield from itertools.batched(iterable, n)
        return

    # "roughly equivalent" block from
    # https://docs.python.org/3/library/itertools.html#itertools.batched
    iterator = iter(iterable)
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch


def md5_checksum(lpath: str | os.PathLike[str], blocksize: int = 2**22) -> str:
"""
Calculate a local file's MD5 hash.
Expand Down

0 comments on commit 41254a2

Please sign in to comment.