From 91ce5ea8d9f871e91249b145fa8f3bb41be9f67c Mon Sep 17 00:00:00 2001 From: Sami Jawhar Date: Wed, 3 Dec 2025 03:50:21 +0000 Subject: [PATCH 1/9] Add optional bulk_remote_exists --- src/dvc_data/index/index.py | 123 ++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index 283066ad..52fee3e8 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -3,6 +3,7 @@ import os from abc import ABC, abstractmethod from collections.abc import Iterator, MutableMapping +from concurrent.futures import ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING, Any, Callable, Optional, cast import attrs @@ -224,6 +225,76 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool: finally: self.index.commit() + def bulk_exists( + self, + entries: list["DataIndexEntry"], + refresh: bool = False, + max_workers: int = 20, + ) -> dict["DataIndexEntry", bool]: + results = {} + + if not entries: + return results + + entries_with_hash = [e for e in entries if e.hash_info] + entries_without_hash = [e for e in entries if not e.hash_info] + + for entry in entries_without_hash: + results[entry] = False + + if self.index is None: + for entry in entries_with_hash: + value = cast("str", entry.hash_info.value) + results[entry] = self.odb.exists(value) + return results + + if not refresh: + for entry in entries_with_hash: + value = cast("str", entry.hash_info.value) + key = self.odb._oid_parts(value) + results[entry] = key in self.index + return results + + fs = self.fs + + def check_exists(entry: "DataIndexEntry") -> tuple["DataIndexEntry", bool]: + try: + _, path = self.get(entry) + return entry, fs.exists(path) + except Exception: + return entry, False + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(check_exists, entry): entry + for entry in entries_with_hash + } + + for future in as_completed(futures): + entry, exists = future.result() + value = cast("str", entry.hash_info.value) + key = self.odb._oid_parts(value) + + if exists: + from .build import build_entry + + try: + _, path = self.get(entry) + built_entry = build_entry(path, fs) + self.index[key] = built_entry + results[entry] = True + except Exception: + self.index.pop(key, None) + results[entry] = False + else: + self.index.pop(key, None) + results[entry] = False + + if self.index is not None: + self.index.commit() + + return results + class FileStorage(Storage): def __init__( @@ -442,6 +513,58 @@ def remote_exists(self, entry: "DataIndexEntry", **kwargs) -> bool: return storage.remote.exists(entry, **kwargs) + def bulk_cache_exists( + self, entries: list[DataIndexEntry], **kwargs + ) -> dict[DataIndexEntry, bool]: + by_storage: dict[Optional[Storage], list[DataIndexEntry]] = {} + for entry in entries: + storage_info = self[entry.key] + storage = storage_info.cache if storage_info else None + if storage not in by_storage: + by_storage[storage] = [] + by_storage[storage].append(entry) + + results = {} + for storage, storage_entries in by_storage.items(): + if storage is None: + for entry in storage_entries: + raise StorageKeyError(entry.key) + + if hasattr(storage, "bulk_exists"): + storage_results = storage.bulk_exists(storage_entries, **kwargs) + results.update(storage_results) + else: + for entry in storage_entries: + results[entry] = storage.exists(entry, **kwargs) + + return results + + def bulk_remote_exists( + self, entries: list[DataIndexEntry], **kwargs + ) -> 
dict[DataIndexEntry, bool]: + by_storage: dict[Optional[Storage], list[DataIndexEntry]] = {} + for entry in entries: + storage_info = self[entry.key] + storage = storage_info.remote if storage_info else None + if storage not in by_storage: + by_storage[storage] = [] + by_storage[storage].append(entry) + + results = {} + for storage, storage_entries in by_storage.items(): + if storage is None: + for entry in storage_entries: + raise StorageKeyError(entry.key) + + if hasattr(storage, "bulk_exists"): + storage_results = storage.bulk_exists(storage_entries, **kwargs) + results.update(storage_results) + else: + for entry in storage_entries: + results[entry] = storage.exists(entry, **kwargs) + + return results + class BaseDataIndex(ABC, MutableMapping[DataIndexKey, DataIndexEntry]): storage_map: StorageMapping From d06e149b8fc665996525055eb99b00c0ce49c9d4 Mon Sep 17 00:00:00 2001 From: Falko Galperin Date: Sun, 7 Dec 2025 00:50:27 +0100 Subject: [PATCH 2/9] feat: improve bulk_remote_exists and bulk_cache_exists - use return_exceptions=True for batch retrieval - skip unnecessary network calls by accepting cached_info - do a single fs.info call, then pass that info to build_entry - we group storage instances by their underlying ODB path to unify batches and perform the fs.info call for the entire batch --- src/dvc_data/index/index.py | 193 +++++++++++++++++++++++------------- 1 file changed, 122 insertions(+), 71 deletions(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index 52fee3e8..3380bd3c 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -2,11 +2,13 @@ import logging import os from abc import ABC, abstractmethod +from collections import defaultdict from collections.abc import Iterator, MutableMapping -from concurrent.futures import ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING, Any, Callable, Optional, cast import attrs +from fsspec import Callback +from fsspec.callbacks import DEFAULT_CALLBACK from sqltrie import JSONTrie, PyGTrie, ShortKeyError, SQLiteTrie from dvc_data.compat import cached_property @@ -229,7 +231,9 @@ def bulk_exists( self, entries: list["DataIndexEntry"], refresh: bool = False, - max_workers: int = 20, + max_workers: int | None = None, + callback: "Callback" = DEFAULT_CALLBACK, + cached_info: dict[str, Any] | None = None, ) -> dict["DataIndexEntry", bool]: results = {} @@ -239,56 +243,55 @@ def bulk_exists( entries_with_hash = [e for e in entries if e.hash_info] entries_without_hash = [e for e in entries if not e.hash_info] - for entry in entries_without_hash: + for entry in callback.wrap(entries_without_hash): results[entry] = False if self.index is None: - for entry in entries_with_hash: + for entry in callback.wrap(entries_with_hash): + assert entry.hash_info value = cast("str", entry.hash_info.value) results[entry] = self.odb.exists(value) return results if not refresh: - for entry in entries_with_hash: + for entry in callback.wrap(entries_with_hash): + assert entry.hash_info value = cast("str", entry.hash_info.value) key = self.odb._oid_parts(value) results[entry] = key in self.index return results - fs = self.fs - - def check_exists(entry: "DataIndexEntry") -> tuple["DataIndexEntry", bool]: - try: - _, path = self.get(entry) - return entry, fs.exists(path) - except Exception: - return entry, False - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = { - executor.submit(check_exists, entry): entry - for entry in entries_with_hash - } + entry_map: dict[str, 
DataIndexEntry] = { + self.get(entry)[1]: entry for entry in entries_with_hash + } + if cached_info is not None: + # Instead of doing the network call, we use the pre-computed info. + info_results = [ + cached_info.get(path) for path in callback.wrap(entry_map.keys()) + ] + else: + info_results = self.fs.info( + list(entry_map.keys()), + batch_size=max_workers, + return_exceptions=True, + callback=callback, + ) - for future in as_completed(futures): - entry, exists = future.result() - value = cast("str", entry.hash_info.value) - key = self.odb._oid_parts(value) + results = {} + for (path, entry), info in zip(entry_map.items(), info_results): + assert entry.hash_info # built from entries_with_hash + value = cast("str", entry.hash_info.value) + key = self.odb._oid_parts(value) + + if isinstance(info, Exception) or info is None: + self.index.pop(key, None) + results[entry] = False + else: + from .build import build_entry - if exists: - from .build import build_entry - - try: - _, path = self.get(entry) - built_entry = build_entry(path, fs) - self.index[key] = built_entry - results[entry] = True - except Exception: - self.index.pop(key, None) - results[entry] = False - else: - self.index.pop(key, None) - results[entry] = False + built_entry = build_entry(path, self.fs, info=info) + self.index[key] = built_entry + results[entry] = True if self.index is not None: self.index.commit() @@ -513,58 +516,106 @@ def remote_exists(self, entry: "DataIndexEntry", **kwargs) -> bool: return storage.remote.exists(entry, **kwargs) - def bulk_cache_exists( - self, entries: list[DataIndexEntry], **kwargs + def _bulk_storage_exists( + self, + entries: list[DataIndexEntry], + storage_selector: Callable[["StorageInfo"], Optional["Storage"]], + callback: Callback = DEFAULT_CALLBACK, + **kwargs, ) -> dict[DataIndexEntry, bool]: - by_storage: dict[Optional[Storage], list[DataIndexEntry]] = {} + by_storage: dict[Optional[Storage], list[DataIndexEntry]] = defaultdict(list) for entry in entries: storage_info = self[entry.key] - storage = storage_info.cache if storage_info else None - if storage not in by_storage: - by_storage[storage] = [] + storage = storage_selector(storage_info) if storage_info else None by_storage[storage].append(entry) results = {} + + # Unify batches per actual underlying ODB path. + # Maps from (storage_type, odb_path) to [(StorageInstance, entries)] + odb_batches: dict[ + tuple[type, str | None], list[tuple[ObjectStorage, list[DataIndexEntry]]] + ] = defaultdict(list) + for storage, storage_entries in by_storage.items(): if storage is None: for entry in storage_entries: raise StorageKeyError(entry.key) + continue - if hasattr(storage, "bulk_exists"): - storage_results = storage.bulk_exists(storage_entries, **kwargs) - results.update(storage_results) - else: - for entry in storage_entries: - results[entry] = storage.exists(entry, **kwargs) - - return results + if not isinstance(storage, ObjectStorage): + # We won't optimize this and run it normally. 
+ if hasattr(storage, "bulk_exists"): + storage_results = storage.bulk_exists( + storage_entries, callback=callback, **kwargs + ) + results.update(storage_results) + else: + for entry in callback.wrap(storage_entries): + results[entry] = storage.exists(entry, **kwargs) + continue - def bulk_remote_exists( - self, entries: list[DataIndexEntry], **kwargs - ) -> dict[DataIndexEntry, bool]: - by_storage: dict[Optional[Storage], list[DataIndexEntry]] = {} - for entry in entries: - storage_info = self[entry.key] - storage = storage_info.remote if storage_info else None - if storage not in by_storage: - by_storage[storage] = [] - by_storage[storage].append(entry) + key = (type(storage), storage.path) + odb_batches[key].append((storage, storage_entries)) + + # Actually process batches + for storage_groups in odb_batches.values(): + all_paths = [ + storage.get(entry)[1] + for storage, entries in storage_groups + for entry in entries + ] + + # Any storage is representative for this batch + batch_info = storage_groups[0][0].fs.info( + all_paths, + return_exceptions=True, + callback=callback, + ) - results = {} - for storage, storage_entries in by_storage.items(): - if storage is None: - for entry in storage_entries: - raise StorageKeyError(entry.key) + # Maps from path to info + cached_info: dict[str, Any] = { + p: info if not isinstance(info, Exception) else None + for p, info in zip(all_paths, batch_info) + } - if hasattr(storage, "bulk_exists"): - storage_results = storage.bulk_exists(storage_entries, **kwargs) + # Finally, distribute results back to original storages + for storage, storage_entries in storage_groups: + storage_results = storage.bulk_exists( + storage_entries, + cached_info=cached_info, + **kwargs, + ) results.update(storage_results) - else: - for entry in storage_entries: - results[entry] = storage.exists(entry, **kwargs) return results + def bulk_cache_exists( + self, + entries: list[DataIndexEntry], + callback: Callback = DEFAULT_CALLBACK, + **kwargs, + ) -> dict[DataIndexEntry, bool]: + return self._bulk_storage_exists( + entries, + lambda info: info.cache, + callback=callback, + **kwargs, + ) + + def bulk_remote_exists( + self, + entries: list[DataIndexEntry], + callback: Callback = DEFAULT_CALLBACK, + **kwargs, + ) -> dict[DataIndexEntry, bool]: + return self._bulk_storage_exists( + entries, + lambda info: info.remote, + callback=callback, + **kwargs, + ) + class BaseDataIndex(ABC, MutableMapping[DataIndexKey, DataIndexEntry]): storage_map: StorageMapping From 66bc28dee613689897939485cee97de6f05ca9d1 Mon Sep 17 00:00:00 2001 From: Falko Galperin Date: Sun, 7 Dec 2025 17:09:06 +0100 Subject: [PATCH 3/9] fixup! 
feat: improve bulk_remote_exists and bulk_cache_exists --- src/dvc_data/index/index.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index 3380bd3c..f207ef9f 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -159,6 +159,20 @@ def exists(self, entry: "DataIndexEntry") -> bool: fs, path = self.get(entry) return fs.exists(path) + def bulk_exists( + self, + entries: list["DataIndexEntry"], + refresh: bool = False, + max_workers: int | None = None, + callback: "Callback" = DEFAULT_CALLBACK, + cached_info: dict[str, Any] | None = None, + ) -> dict["DataIndexEntry", bool]: + results = {} + for entry in callback.wrap(entries): + results[entry] = self.exists(entry) + + return results + class ObjectStorage(Storage): def __init__( @@ -283,9 +297,11 @@ def bulk_exists( value = cast("str", entry.hash_info.value) key = self.odb._oid_parts(value) - if isinstance(info, Exception) or info is None: + if isinstance(info, FileNotFoundError) or info is None: self.index.pop(key, None) results[entry] = False + elif isinstance(info, Exception): + raise info else: from .build import build_entry @@ -545,14 +561,10 @@ def _bulk_storage_exists( if not isinstance(storage, ObjectStorage): # We won't optimize this and run it normally. - if hasattr(storage, "bulk_exists"): - storage_results = storage.bulk_exists( - storage_entries, callback=callback, **kwargs - ) - results.update(storage_results) - else: - for entry in callback.wrap(storage_entries): - results[entry] = storage.exists(entry, **kwargs) + storage_results = storage.bulk_exists( + storage_entries, callback=callback, **kwargs + ) + results.update(storage_results) continue key = (type(storage), storage.path) From 5b982120bc6cfcd688d8ce59e26be1222b1de124 Mon Sep 17 00:00:00 2001 From: Sami Jawhar Date: Sun, 14 Dec 2025 03:18:07 +0000 Subject: [PATCH 4/9] Simplify --- src/dvc_data/index/index.py | 67 ++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index f207ef9f..e530df01 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -163,9 +163,9 @@ def bulk_exists( self, entries: list["DataIndexEntry"], refresh: bool = False, - max_workers: int | None = None, + max_workers: Optional[int] = None, callback: "Callback" = DEFAULT_CALLBACK, - cached_info: dict[str, Any] | None = None, + cached_info: Optional[dict[str, Any]] = None, ) -> dict["DataIndexEntry", bool]: results = {} for entry in callback.wrap(entries): @@ -241,39 +241,43 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool: finally: self.index.commit() + def _bulk_exists_from_cache( + self, + entries_with_hash: list["DataIndexEntry"], + callback: "Callback", + ) -> dict["DataIndexEntry", bool]: + results = {} + for entry in callback.wrap(entries_with_hash): + assert entry.hash_info + value = cast("str", entry.hash_info.value) + if self.index is None: + exists = self.odb.exists(value) + else: + key = self.odb._oid_parts(value) + exists = key in self.index + results[entry] = exists + + return results + def bulk_exists( self, entries: list["DataIndexEntry"], refresh: bool = False, - max_workers: int | None = None, + max_workers: Optional[int] = None, callback: "Callback" = DEFAULT_CALLBACK, - cached_info: dict[str, Any] | None = None, + cached_info: Optional[dict[str, Any]] = None, ) -> dict["DataIndexEntry", bool]: - 
results = {} - if not entries: - return results + return {} entries_with_hash = [e for e in entries if e.hash_info] entries_without_hash = [e for e in entries if not e.hash_info] - for entry in callback.wrap(entries_without_hash): - results[entry] = False - - if self.index is None: - for entry in callback.wrap(entries_with_hash): - assert entry.hash_info - value = cast("str", entry.hash_info.value) - results[entry] = self.odb.exists(value) - return results - - if not refresh: - for entry in callback.wrap(entries_with_hash): - assert entry.hash_info - value = cast("str", entry.hash_info.value) - key = self.odb._oid_parts(value) - results[entry] = key in self.index - return results + if self.index is None or not refresh: + return { + **dict.fromkeys(callback.wrap(entries_without_hash), False), + **self._bulk_exists_from_cache(entries_with_hash, callback), + } entry_map: dict[str, DataIndexEntry] = { self.get(entry)[1]: entry for entry in entries_with_hash @@ -300,14 +304,15 @@ def bulk_exists( if isinstance(info, FileNotFoundError) or info is None: self.index.pop(key, None) results[entry] = False - elif isinstance(info, Exception): + continue + if isinstance(info, Exception): raise info - else: - from .build import build_entry - built_entry = build_entry(path, self.fs, info=info) - self.index[key] = built_entry - results[entry] = True + from .build import build_entry + + built_entry = build_entry(path, self.fs, info=info) + self.index[key] = built_entry + results[entry] = True if self.index is not None: self.index.commit() @@ -550,7 +555,7 @@ def _bulk_storage_exists( # Unify batches per actual underlying ODB path. # Maps from (storage_type, odb_path) to [(StorageInstance, entries)] odb_batches: dict[ - tuple[type, str | None], list[tuple[ObjectStorage, list[DataIndexEntry]]] + tuple[type, Optional[str]], list[tuple[ObjectStorage, list[DataIndexEntry]]] ] = defaultdict(list) for storage, storage_entries in by_storage.items(): From c6dc3d1ee9bc25e06655f8ce165695b751f23abd Mon Sep 17 00:00:00 2001 From: Sami Jawhar Date: Sun, 14 Dec 2025 03:18:07 +0000 Subject: [PATCH 5/9] Tests --- tests/index/test_storage.py | 226 +++++++++++++++++++++++++++++++++++- 1 file changed, 225 insertions(+), 1 deletion(-) diff --git a/tests/index/test_storage.py b/tests/index/test_storage.py index c28d4523..3ced7dc6 100644 --- a/tests/index/test_storage.py +++ b/tests/index/test_storage.py @@ -1,6 +1,17 @@ +import pytest from dvc_objects.fs.local import LocalFileSystem -from dvc_data.index import FileStorage, ObjectStorage, StorageInfo, StorageMapping +from dvc_data.hashfile.hash_info import HashInfo +from dvc_data.hashfile.meta import Meta +from dvc_data.index import ( + DataIndex, + DataIndexEntry, + FileStorage, + ObjectStorage, + StorageInfo, + StorageKeyError, + StorageMapping, +) def test_map_get(tmp_path, odb): @@ -47,3 +58,216 @@ def test_map_get(tmp_path, odb): assert sinfo.data == data assert sinfo.cache == cache assert sinfo.remote == remote + + +class TestObjectStorageBulkExists: + def test_empty_entries(self, odb): + storage = ObjectStorage(key=(), odb=odb) + result = storage.bulk_exists([]) + assert result == {} + + def test_entries_without_hash(self, odb): + storage = ObjectStorage(key=(), odb=odb) + entry = DataIndexEntry(key=("foo",), meta=Meta()) + result = storage.bulk_exists([entry]) + assert result == {entry: False} + + def test_entries_exist_in_odb(self, odb): + storage = ObjectStorage(key=(), odb=odb) + entry = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", 
"d3b07384d113edec49eaa6238ad5ff00"), + ) + result = storage.bulk_exists([entry]) + assert result == {entry: True} + + def test_entries_not_in_odb(self, make_odb): + empty_odb = make_odb() + storage = ObjectStorage(key=(), odb=empty_odb) + entry = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "nonexistent"), + ) + result = storage.bulk_exists([entry]) + assert result == {entry: False} + + def test_with_index_no_refresh(self, odb): + index = DataIndex() + key = odb._oid_parts("d3b07384d113edec49eaa6238ad5ff00") + index[key] = DataIndexEntry(key=key) + + storage = ObjectStorage(key=(), odb=odb, index=index) + entry_exists = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ) + entry_not_in_index = DataIndexEntry( + key=("bar",), + hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"), + ) + + result = storage.bulk_exists([entry_exists, entry_not_in_index], refresh=False) + assert result[entry_exists] is True + assert result[entry_not_in_index] is False + + def test_with_index_refresh_existing(self, odb): + index = DataIndex() + storage = ObjectStorage(key=(), odb=odb, index=index) + + entry_exists = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ) + + result = storage.bulk_exists([entry_exists], refresh=True) + assert result[entry_exists] is True + + key_exists = odb._oid_parts("d3b07384d113edec49eaa6238ad5ff00") + assert key_exists in index + + def test_mixed_entries(self, odb): + storage = ObjectStorage(key=(), odb=odb) + entry_with_hash = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ) + entry_without_hash = DataIndexEntry(key=("bar",), meta=Meta()) + + result = storage.bulk_exists([entry_with_hash, entry_without_hash]) + assert result[entry_with_hash] is True + assert result[entry_without_hash] is False + + def test_multiple_entries(self, odb): + storage = ObjectStorage(key=(), odb=odb) + entries = [ + DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ), + DataIndexEntry( + key=("bar",), + hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"), + ), + DataIndexEntry( + key=("baz",), + hash_info=HashInfo("md5", "258622b1688250cb619f3c9ccaefb7eb"), + ), + ] + + result = storage.bulk_exists(entries) + assert all(result[e] is True for e in entries) + + +class TestStorageMappingBulkExists: + def test_bulk_cache_exists_empty(self, odb): + smap = StorageMapping() + smap.add_cache(ObjectStorage(key=(), odb=odb)) + result = smap.bulk_cache_exists([]) + assert result == {} + + def test_bulk_remote_exists_empty(self, odb): + smap = StorageMapping() + smap.add_remote(ObjectStorage(key=(), odb=odb)) + result = smap.bulk_remote_exists([]) + assert result == {} + + def test_bulk_cache_exists_all_exist(self, make_odb): + cache_odb = make_odb() + cache_odb.add_bytes("d3b07384d113edec49eaa6238ad5ff00", b"foo\n") + cache_odb.add_bytes("c157a79031e1c40f85931829bc5fc552", b"bar\n") + + smap = StorageMapping() + smap.add_cache(ObjectStorage(key=(), odb=cache_odb)) + + entries = [ + DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ), + DataIndexEntry( + key=("bar",), + hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"), + ), + ] + + result = smap.bulk_cache_exists(entries) + assert all(result[e] is True for e in entries) + + def test_bulk_remote_exists_all_exist(self, odb): + smap = StorageMapping() + 
smap.add_remote(ObjectStorage(key=(), odb=odb)) + + entries = [ + DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ), + DataIndexEntry( + key=("bar",), + hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"), + ), + ] + + result = smap.bulk_remote_exists(entries) + assert all(result[e] is True for e in entries) + + def test_bulk_cache_exists_missing_storage(self, odb): + smap = StorageMapping() + smap.add_remote(ObjectStorage(key=(), odb=odb)) + + entry = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ) + + with pytest.raises(StorageKeyError): + smap.bulk_cache_exists([entry]) + + def test_bulk_remote_exists_missing_storage(self, odb): + smap = StorageMapping() + smap.add_cache(ObjectStorage(key=(), odb=odb)) + + entry = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), + ) + + with pytest.raises(StorageKeyError): + smap.bulk_remote_exists([entry]) + + def test_bulk_exists_multiple_storages(self, make_odb): + cache1 = make_odb() + cache1.add_bytes("hash1", b"data1") + cache2 = make_odb() + cache2.add_bytes("hash2", b"data2") + + smap = StorageMapping() + smap.add_cache(ObjectStorage(key=(), odb=cache1)) + smap.add_cache(ObjectStorage(key=("subdir",), odb=cache2)) + + entry1 = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "hash1"), + ) + entry2 = DataIndexEntry( + key=("subdir", "bar"), + hash_info=HashInfo("md5", "hash2"), + ) + + result = smap.bulk_cache_exists([entry1, entry2]) + assert result[entry1] is True + assert result[entry2] is True + + def test_bulk_cache_exists_with_file_storage(self, tmp_path): + (tmp_path / "foo.txt").write_text("hello") + fs = LocalFileSystem() + + smap = StorageMapping() + smap.add_cache(FileStorage(key=(), fs=fs, path=str(tmp_path))) + + entry_exists = DataIndexEntry(key=("foo.txt",)) + entry_not_exists = DataIndexEntry(key=("bar.txt",)) + + result = smap.bulk_cache_exists([entry_exists, entry_not_exists]) + assert result[entry_exists] is True + assert result[entry_not_exists] is False From b3c055835660f58f2653252fa7d7fa50ee89072e Mon Sep 17 00:00:00 2001 From: Falko Galperin Date: Wed, 17 Dec 2025 20:43:19 +0100 Subject: [PATCH 6/9] refactor: address reviewer comments for bulk remote checks --- src/dvc_data/index/index.py | 133 +++++++++++------------------------- 1 file changed, 40 insertions(+), 93 deletions(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index e530df01..82f8332a 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -163,9 +163,8 @@ def bulk_exists( self, entries: list["DataIndexEntry"], refresh: bool = False, - max_workers: Optional[int] = None, + jobs: Optional[int] = None, callback: "Callback" = DEFAULT_CALLBACK, - cached_info: Optional[dict[str, Any]] = None, ) -> dict["DataIndexEntry", bool]: results = {} for entry in callback.wrap(entries): @@ -241,61 +240,42 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool: finally: self.index.commit() - def _bulk_exists_from_cache( - self, - entries_with_hash: list["DataIndexEntry"], - callback: "Callback", - ) -> dict["DataIndexEntry", bool]: - results = {} - for entry in callback.wrap(entries_with_hash): - assert entry.hash_info - value = cast("str", entry.hash_info.value) - if self.index is None: - exists = self.odb.exists(value) - else: - key = self.odb._oid_parts(value) - exists = key in self.index - results[entry] = exists - - return results - 
def bulk_exists( self, entries: list["DataIndexEntry"], refresh: bool = False, - max_workers: Optional[int] = None, + jobs: Optional[int] = None, callback: "Callback" = DEFAULT_CALLBACK, - cached_info: Optional[dict[str, Any]] = None, ) -> dict["DataIndexEntry", bool]: if not entries: return {} entries_with_hash = [e for e in entries if e.hash_info] entries_without_hash = [e for e in entries if not e.hash_info] + results = dict.fromkeys(callback.wrap(entries_without_hash), False) if self.index is None or not refresh: - return { - **dict.fromkeys(callback.wrap(entries_without_hash), False), - **self._bulk_exists_from_cache(entries_with_hash, callback), - } + for entry in callback.wrap(entries_with_hash): + assert entry.hash_info + value = cast("str", entry.hash_info.value) + if self.index is None: + exists = self.odb.exists(value) + else: + key = self.odb._oid_parts(value) + exists = key in self.index + results[entry] = exists + return results entry_map: dict[str, DataIndexEntry] = { self.get(entry)[1]: entry for entry in entries_with_hash } - if cached_info is not None: - # Instead of doing the network call, we use the pre-computed info. - info_results = [ - cached_info.get(path) for path in callback.wrap(entry_map.keys()) - ] - else: - info_results = self.fs.info( - list(entry_map.keys()), - batch_size=max_workers, - return_exceptions=True, - callback=callback, - ) + info_results = self.fs.info( + list(entry_map.keys()), + batch_size=jobs, + return_exceptions=True, + callback=callback, + ) - results = {} for (path, entry), info in zip(entry_map.items(), info_results): assert entry.hash_info # built from entries_with_hash value = cast("str", entry.hash_info.value) @@ -540,70 +520,37 @@ def remote_exists(self, entry: "DataIndexEntry", **kwargs) -> bool: def _bulk_storage_exists( self, entries: list[DataIndexEntry], - storage_selector: Callable[["StorageInfo"], Optional["Storage"]], - callback: Callback = DEFAULT_CALLBACK, + storage_attr: str = "cache", # TODO: proper name **kwargs, ) -> dict[DataIndexEntry, bool]: - by_storage: dict[Optional[Storage], list[DataIndexEntry]] = defaultdict(list) + by_storage: dict[Storage, list[DataIndexEntry]] = defaultdict(list) + by_odb: dict[Optional[HashFileDB], dict[Storage, list[DataIndexEntry]]] = ( + defaultdict(lambda: defaultdict(list)) + ) for entry in entries: storage_info = self[entry.key] - storage = storage_selector(storage_info) if storage_info else None - by_storage[storage].append(entry) - - results = {} - - # Unify batches per actual underlying ODB path. - # Maps from (storage_type, odb_path) to [(StorageInstance, entries)] - odb_batches: dict[ - tuple[type, Optional[str]], list[tuple[ObjectStorage, list[DataIndexEntry]]] - ] = defaultdict(list) - - for storage, storage_entries in by_storage.items(): - if storage is None: - for entry in storage_entries: - raise StorageKeyError(entry.key) - continue + storage = getattr(storage_info, storage_attr) if storage_info else None + if isinstance(storage, ObjectStorage): + by_odb[storage.odb][storage].append(entry) + elif storage is not None: + by_storage[storage].append(entry) - if not isinstance(storage, ObjectStorage): - # We won't optimize this and run it normally. 
- storage_results = storage.bulk_exists( - storage_entries, callback=callback, **kwargs - ) - results.update(storage_results) - continue + for storages in by_odb.values(): + assert storages # cannot be empty, we always add at least one entry + rep = next(iter(storages)) + by_storage[rep] = [e for entries in storages.values() for e in entries] - key = (type(storage), storage.path) - odb_batches[key].append((storage, storage_entries)) + results = {} # Actually process batches - for storage_groups in odb_batches.values(): - all_paths = [ - storage.get(entry)[1] - for storage, entries in storage_groups - for entry in entries - ] - - # Any storage is representative for this batch - batch_info = storage_groups[0][0].fs.info( - all_paths, - return_exceptions=True, - callback=callback, - ) - - # Maps from path to info - cached_info: dict[str, Any] = { - p: info if not isinstance(info, Exception) else None - for p, info in zip(all_paths, batch_info) - } - + for storage, storage_entries in by_storage.items(): # Finally, distribute results back to original storages - for storage, storage_entries in storage_groups: - storage_results = storage.bulk_exists( + results.update( + storage.bulk_exists( storage_entries, - cached_info=cached_info, **kwargs, ) - results.update(storage_results) + ) return results @@ -615,7 +562,7 @@ def bulk_cache_exists( ) -> dict[DataIndexEntry, bool]: return self._bulk_storage_exists( entries, - lambda info: info.cache, + "cache", callback=callback, **kwargs, ) @@ -628,7 +575,7 @@ def bulk_remote_exists( ) -> dict[DataIndexEntry, bool]: return self._bulk_storage_exists( entries, - lambda info: info.remote, + "remote", callback=callback, **kwargs, ) From c01374905f0eb23c223ea2a17cda006b32bacfcc Mon Sep 17 00:00:00 2001 From: Falko Galperin Date: Wed, 17 Dec 2025 20:56:57 +0100 Subject: [PATCH 7/9] test: fix tests for bulk remote checks --- tests/index/test_storage.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/tests/index/test_storage.py b/tests/index/test_storage.py index 3ced7dc6..737b98ed 100644 --- a/tests/index/test_storage.py +++ b/tests/index/test_storage.py @@ -1,4 +1,3 @@ -import pytest from dvc_objects.fs.local import LocalFileSystem from dvc_data.hashfile.hash_info import HashInfo @@ -9,7 +8,6 @@ FileStorage, ObjectStorage, StorageInfo, - StorageKeyError, StorageMapping, ) @@ -220,8 +218,9 @@ def test_bulk_cache_exists_missing_storage(self, odb): hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), ) - with pytest.raises(StorageKeyError): - smap.bulk_cache_exists([entry]) + result = smap.bulk_cache_exists([entry]) + # no cache storage, should be skipped + assert entry not in result def test_bulk_remote_exists_missing_storage(self, odb): smap = StorageMapping() @@ -232,8 +231,9 @@ def test_bulk_remote_exists_missing_storage(self, odb): hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"), ) - with pytest.raises(StorageKeyError): - smap.bulk_remote_exists([entry]) + result = smap.bulk_remote_exists([entry]) + # no remote storage, should be skipped + assert entry not in result def test_bulk_exists_multiple_storages(self, make_odb): cache1 = make_odb() @@ -258,6 +258,29 @@ def test_bulk_exists_multiple_storages(self, make_odb): assert result[entry1] is True assert result[entry2] is True + def test_bulk_exists_shared_odb(self, make_odb): + odb = make_odb() + odb.add_bytes("hash1", b"data1") + odb.add_bytes("hash2", b"data2") + + smap = StorageMapping() + # two logical storages, one physical ODB 
+ smap.add_cache(ObjectStorage(key=(), odb=odb)) + smap.add_cache(ObjectStorage(key=("subdir",), odb=odb)) + + entry1 = DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "hash1"), + ) + entry2 = DataIndexEntry( + key=("subdir", "bar"), + hash_info=HashInfo("md5", "hash2"), + ) + + result = smap.bulk_cache_exists([entry1, entry2]) + assert result[entry1] is True + assert result[entry2] is True + def test_bulk_cache_exists_with_file_storage(self, tmp_path): (tmp_path / "foo.txt").write_text("hello") fs = LocalFileSystem() From 4bc35465f3aaa28ad944de77a1a77b8d65b4ca19 Mon Sep 17 00:00:00 2001 From: skshetry <18718008+skshetry@users.noreply.github.com> Date: Thu, 18 Dec 2025 09:55:05 +0545 Subject: [PATCH 8/9] Apply suggestion from @skshetry --- src/dvc_data/index/index.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index 82f8332a..c034d229 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -252,7 +252,8 @@ def bulk_exists( entries_with_hash = [e for e in entries if e.hash_info] entries_without_hash = [e for e in entries if not e.hash_info] - results = dict.fromkeys(callback.wrap(entries_without_hash), False) + results = dict.fromkeys(entries_without_hash, False) + callback.relative_update(len(entries_without_hash)) if self.index is None or not refresh: for entry in callback.wrap(entries_with_hash): From 76a4a5dfdd412657fe815ef64cba5b789d11974a Mon Sep 17 00:00:00 2001 From: Falko Galperin Date: Thu, 18 Dec 2025 16:36:20 +0100 Subject: [PATCH 9/9] refactor: further review comments for bulk remote checks --- src/dvc_data/index/index.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py index c034d229..b9fa3918 100644 --- a/src/dvc_data/index/index.py +++ b/src/dvc_data/index/index.py @@ -521,7 +521,7 @@ def remote_exists(self, entry: "DataIndexEntry", **kwargs) -> bool: def _bulk_storage_exists( self, entries: list[DataIndexEntry], - storage_attr: str = "cache", # TODO: proper name + storage: str, **kwargs, ) -> dict[DataIndexEntry, bool]: by_storage: dict[Storage, list[DataIndexEntry]] = defaultdict(list) @@ -530,24 +530,24 @@ def _bulk_storage_exists( ) for entry in entries: storage_info = self[entry.key] - storage = getattr(storage_info, storage_attr) if storage_info else None - if isinstance(storage, ObjectStorage): - by_odb[storage.odb][storage].append(entry) - elif storage is not None: - by_storage[storage].append(entry) + storage_obj = getattr(storage_info, storage) if storage_info else None + if isinstance(storage_obj, ObjectStorage): + by_odb[storage_obj.odb][storage_obj].append(entry) + elif storage_obj is not None: + by_storage[storage_obj].append(entry) for storages in by_odb.values(): assert storages # cannot be empty, we always add at least one entry - rep = next(iter(storages)) - by_storage[rep] = [e for entries in storages.values() for e in entries] + representative = next(iter(storages)) + by_storage[representative] = [ + e for entries in storages.values() for e in entries + ] results = {} - # Actually process batches - for storage, storage_entries in by_storage.items(): - # Finally, distribute results back to original storages + for storage_obj, storage_entries in by_storage.items(): results.update( - storage.bulk_exists( + storage_obj.bulk_exists( storage_entries, **kwargs, )
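
A minimal usage sketch of the bulk API this series introduces, following the final signatures from PATCH 9/9 and the test setup shown in tests/index/test_storage.py. The HashFileDB construction and temporary directory below are illustrative assumptions (the tests rely on the `odb`/`make_odb` fixtures instead); only the StorageMapping/ObjectStorage/DataIndexEntry calls are taken directly from the patches.

import tempfile

from dvc_objects.fs.local import LocalFileSystem

from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.index import DataIndexEntry, ObjectStorage, StorageMapping

# Assumed setup: a small local ODB holding one object (md5 of b"foo\n").
# The tests above build this via fixtures; HashFileDB(fs, path) is used here
# only for illustration.
tmp_dir = tempfile.mkdtemp()
odb = HashFileDB(LocalFileSystem(), tmp_dir)
odb.add_bytes("d3b07384d113edec49eaa6238ad5ff00", b"foo\n")

# Register the ODB as the remote for every key, as in the tests.
smap = StorageMapping()
smap.add_remote(ObjectStorage(key=(), odb=odb))

entries = [
    DataIndexEntry(
        key=("foo",),
        hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
    ),
    DataIndexEntry(
        key=("bar",),
        # Not present in the ODB, so it should be reported as missing.
        hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"),
    ),
]

# One call resolves existence for all entries; per _bulk_storage_exists,
# entries whose storages share the same underlying ODB are grouped and
# checked through a single representative storage instance.
result = smap.bulk_remote_exists(entries)
assert result[entries[0]] is True
assert result[entries[1]] is False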