139 changes: 139 additions & 0 deletions src/dvc_data/index/index.py
@@ -2,10 +2,13 @@
import logging
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterator, MutableMapping
from typing import TYPE_CHECKING, Any, Callable, Optional, cast

import attrs
from fsspec import Callback
from fsspec.callbacks import DEFAULT_CALLBACK
from sqltrie import JSONTrie, PyGTrie, ShortKeyError, SQLiteTrie

from dvc_data.compat import cached_property
@@ -156,6 +159,19 @@ def exists(self, entry: "DataIndexEntry") -> bool:
        fs, path = self.get(entry)
        return fs.exists(path)

    def bulk_exists(
        self,
        entries: list["DataIndexEntry"],
        refresh: bool = False,
        jobs: Optional[int] = None,
        callback: "Callback" = DEFAULT_CALLBACK,
    ) -> dict["DataIndexEntry", bool]:
        # Sequential fallback: check entries one by one. Subclasses may
        # override this to batch or parallelize; `refresh` and `jobs` are
        # part of the shared signature but unused here.
        results = {}
        for entry in callback.wrap(entries):
            results[entry] = self.exists(entry)

        return results
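
A subclass that cannot batch natively could still parallelize this fallback. A minimal sketch, assuming only that `exists` is thread-safe; the helper name and the `ThreadPoolExecutor` usage are illustrative, not part of this diff:

# Illustrative only: fan the per-entry checks out over a thread pool.
from concurrent.futures import ThreadPoolExecutor

def parallel_bulk_exists(storage, entries, jobs=None):
    # jobs=None lets the executor pick its default worker count.
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        flags = list(pool.map(storage.exists, entries))
    return dict(zip(entries, flags))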


class ObjectStorage(Storage):
    def __init__(
@@ -224,6 +240,66 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
        finally:
            self.index.commit()

    def bulk_exists(
        self,
        entries: list["DataIndexEntry"],
        refresh: bool = False,
        jobs: Optional[int] = None,
        callback: "Callback" = DEFAULT_CALLBACK,
    ) -> dict["DataIndexEntry", bool]:
        if not entries:
            return {}

        # Entries without hash info cannot exist in an object store.
        entries_with_hash = [e for e in entries if e.hash_info]
        entries_without_hash = [e for e in entries if not e.hash_info]
        results = dict.fromkeys(entries_without_hash, False)
        callback.relative_update(len(entries_without_hash))

        if self.index is None or not refresh:
            # Fast path: answer from the odb (or its cached index) without
            # stat-ing the underlying filesystem.
            for entry in callback.wrap(entries_with_hash):
                assert entry.hash_info
                value = cast("str", entry.hash_info.value)
                if self.index is None:
                    exists = self.odb.exists(value)
                else:
                    key = self.odb._oid_parts(value)
                    exists = key in self.index
                results[entry] = exists
            return results

        # Refresh path: stat all paths in one batched fs.info() call and
        # rebuild the cached index entries from the results. Entries that
        # resolve to the same storage path collapse into a single lookup.
        entry_map: dict[str, DataIndexEntry] = {
            self.get(entry)[1]: entry for entry in entries_with_hash
        }
        info_results = self.fs.info(
            list(entry_map.keys()),
            batch_size=jobs,
            return_exceptions=True,
            callback=callback,
        )

        # Dicts preserve insertion order, so info_results lines up with
        # entry_map.items().
        for (path, entry), info in zip(entry_map.items(), info_results):
            assert entry.hash_info  # built from entries_with_hash
            value = cast("str", entry.hash_info.value)
            key = self.odb._oid_parts(value)

            if isinstance(info, FileNotFoundError) or info is None:
                # Missing on storage: drop any stale cached entry.
                self.index.pop(key, None)
                results[entry] = False
                continue
            if isinstance(info, Exception):
                raise info

            from .build import build_entry

            built_entry = build_entry(path, self.fs, info=info)
            self.index[key] = built_entry
            results[entry] = True

        if self.index is not None:
            self.index.commit()

        return results
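
The refresh path relies on the batched `fs.info` call yielding, per path, either an info dict or an exception (`return_exceptions=True`). A minimal sketch of that result-handling contract as it is applied above, with fabricated paths and results for illustration:

# Fabricated example: an info dict means the object exists,
# FileNotFoundError (or None) means it is gone, and any other
# exception is re-raised rather than swallowed.
paths = ["cache/ab/cdef0123", "cache/12/34567890"]
infos = [{"name": paths[0], "size": 4096, "type": "file"}, FileNotFoundError()]

for path, info in zip(paths, infos):
    if isinstance(info, FileNotFoundError) or info is None:
        print(f"{path}: missing")
    elif isinstance(info, Exception):
        raise info
    else:
        print(f"{path}: exists, size={info['size']}")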


class FileStorage(Storage):
    def __init__(
@@ -442,6 +518,69 @@ def remote_exists(self, entry: "DataIndexEntry", **kwargs) -> bool:

        return storage.remote.exists(entry, **kwargs)

    def _bulk_storage_exists(
        self,
        entries: list[DataIndexEntry],
        storage: str,
        **kwargs,
    ) -> dict[DataIndexEntry, bool]:
        # Group entries by storage object, and group ObjectStorage
        # instances by their underlying odb: storages sharing an object
        # database can be answered with a single bulk query.
        by_storage: dict[Storage, list[DataIndexEntry]] = defaultdict(list)
        by_odb: dict[Optional[HashFileDB], dict[Storage, list[DataIndexEntry]]] = (
            defaultdict(lambda: defaultdict(list))
        )
        for entry in entries:
            storage_info = self[entry.key]
            storage_obj = getattr(storage_info, storage) if storage_info else None
            if isinstance(storage_obj, ObjectStorage):
                by_odb[storage_obj.odb][storage_obj].append(entry)
            elif storage_obj is not None:
                by_storage[storage_obj].append(entry)

        for storages in by_odb.values():
            assert storages  # cannot be empty, we always add at least one entry
            # Pick any storage as the representative for this odb and hand
            # it every entry that maps to the same object database.
            representative = next(iter(storages))
            by_storage[representative] = [
                e for storage_entries in storages.values() for e in storage_entries
            ]

        results = {}

        for storage_obj, storage_entries in by_storage.items():
            results.update(
                storage_obj.bulk_exists(
                    storage_entries,
                    **kwargs,
                )
            )

        return results
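
A standalone sketch of the grouping idea above, with plain strings standing in for storages and object databases; the names are illustrative only:

from collections import defaultdict

# Several storages may wrap the same object database; one bulk query per
# odb answers all of them, since existence depends only on the hash.
pairs = [("workspace-cache", "odb-a"), ("shared-cache", "odb-a"), ("s3", "odb-b")]

by_odb: dict[str, list[str]] = defaultdict(list)
for storage, odb in pairs:
    by_odb[odb].append(storage)

for odb, storages in by_odb.items():
    representative = storages[0]
    print(f"{odb}: one query via {representative} covers {storages}")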

    def bulk_cache_exists(
        self,
        entries: list[DataIndexEntry],
        callback: Callback = DEFAULT_CALLBACK,
        **kwargs,
    ) -> dict[DataIndexEntry, bool]:
        return self._bulk_storage_exists(
            entries,
            "cache",
            callback=callback,
            **kwargs,
        )

    def bulk_remote_exists(
        self,
        entries: list[DataIndexEntry],
        callback: Callback = DEFAULT_CALLBACK,
        **kwargs,
    ) -> dict[DataIndexEntry, bool]:
        return self._bulk_storage_exists(
            entries,
            "remote",
            callback=callback,
            **kwargs,
        )
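
Caller-side usage might look like the following sketch; the helper and its variable names are hypothetical and not taken from this diff, assuming only a populated DataIndex:

from dvc_data.index import DataIndex

def report_missing(index: DataIndex) -> None:
    # Hypothetical helper: check which entries already exist on the remote.
    entries = list(index.values())
    present = index.storage_map.bulk_remote_exists(entries, jobs=8)
    missing = [e for e, ok in present.items() if not ok]
    print(f"{len(missing)} of {len(entries)} entries missing from the remote")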


class BaseDataIndex(ABC, MutableMapping[DataIndexKey, DataIndexEntry]):
    storage_map: StorageMapping