forked from langchain-ai/langchain
Commit
community[minor]: Add CloudBlobLoader that supports loading data from cloud buckets (langchain-ai#21957)

Thank you for contributing to LangChain!

- [ ] **PR title**: "Add CloudBlobLoader"
  - community: Add CloudBlobLoader
- [ ] **PR message**: Add cloud blob loader
  - **Description:** LangChain provides several approaches to read different file formats: specific loaders (`CSVLoader`) or blob-compatible loaders (`FileSystemBlobLoader`). The only implementation proposed for `BlobLoader` is `FileSystemBlobLoader`. Many projects retrieve files from cloud storage. We propose a new implementation of `BlobLoader` that reads files from the three major cloud storage systems (AWS S3, Azure Blob Storage, and Google Cloud Storage). The interface is strictly identical to `FileSystemBlobLoader`; the only difference is the constructor, which takes a cloud "url" object such as `s3://my-bucket`, `az://my-bucket`, or `gs://my-bucket`. By streamlining the process, this implementation eliminates the need to pre-download files from cloud storage to local temporary files (which are seldom removed). The code relies on the [CloudPathLib](https://cloudpathlib.drivendata.org/stable/) library to interpret cloud URLs; it has been added as an optional dependency.

    ```python
    loader = CloudBlobLoader("s3://mybucket/id")

    for blob in loader.yield_blobs():
        print(blob)
    ```

- [X] **Dependencies:** CloudPathLib
- [X] **Twitter handle:** pprados
- [X] **Add tests and docs**: Added a unit test; it is easy to convert to an integration test with some files in cloud storage (see `test_cloud_blob_loader.py`).
- [X] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified.

Hello from Paris @hwchase17. Can you review this PR?

---------

Co-authored-by: Eugene Yurtsev <[email protected]>
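For reviewers who want to see the new constructor options in context, here is a slightly fuller sketch built only from the parameters documented in the module below; the bucket name, prefix, and suffix filter are illustrative, and cloud credentials are assumed to already be configured for CloudPathLib (e.g. via the standard AWS/Azure/GCS environment variables):

```python
from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

# Illustrative bucket/prefix; az://... and gs://... URLs work the same way.
loader = CloudBlobLoader(
    "s3://mybucket/id",
    glob="**/[!.]*",           # all non-hidden files (the default)
    suffixes=[".md", ".txt"],  # keep only these extensions
    show_progress=True,        # optional progress bar; requires tqdm
)

# Blobs are yielded lazily; data is only read when as_string()/as_bytes() is called.
for blob in loader.yield_blobs():
    print(blob.path, len(blob.as_bytes()))
```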
Showing 7 changed files with 550 additions and 124 deletions.
289 changes: 289 additions & 0 deletions
libs/community/langchain_community/document_loaders/blob_loaders/cloud_blob_loader.py
@@ -0,0 +1,289 @@
"""Use to load blobs from the local file system.""" | ||
import contextlib | ||
import mimetypes | ||
import tempfile | ||
from io import BufferedReader, BytesIO | ||
from pathlib import Path | ||
from typing import ( | ||
TYPE_CHECKING, | ||
Callable, | ||
Generator, | ||
Iterable, | ||
Iterator, | ||
Optional, | ||
Sequence, | ||
TypeVar, | ||
Union, | ||
) | ||
from urllib.parse import urlparse | ||
|
||
if TYPE_CHECKING: | ||
from cloudpathlib import AnyPath | ||
|
||
from langchain_community.document_loaders.blob_loaders.schema import ( | ||
Blob, | ||
BlobLoader, | ||
) | ||
|
||
T = TypeVar("T") | ||
|
||
|
||
class _CloudBlob(Blob):
    def as_string(self) -> str:
        """Read data as a string."""
        from cloudpathlib import AnyPath

        if self.data is None and self.path:
            return AnyPath(self.path).read_text(encoding=self.encoding)  # type: ignore
        elif isinstance(self.data, bytes):
            return self.data.decode(self.encoding)
        elif isinstance(self.data, str):
            return self.data
        else:
            raise ValueError(f"Unable to get string for blob {self}")

    def as_bytes(self) -> bytes:
        """Read data as bytes."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            return self.data
        elif isinstance(self.data, str):
            return self.data.encode(self.encoding)
        elif self.data is None and self.path:
            return AnyPath(self.path).read_bytes()  # type: ignore
        else:
            raise ValueError(f"Unable to get bytes for blob {self}")

    @contextlib.contextmanager
    def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
        """Read data as a byte stream."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            yield BytesIO(self.data)
        elif self.data is None and self.path:
            # This is a @contextlib.contextmanager generator, so the stream must
            # be yielded rather than returned; open the cloud path and yield its
            # file object.
            with AnyPath(self.path).open("rb") as f:  # type: ignore
                yield f
        else:
            raise NotImplementedError(f"Unable to convert blob {self}")

def _url_to_filename(url: str) -> str:
    """
    Convert a file:, s3:, az: or gs: URL to a local file path.

    If the file is remote, download it to a temporary file.
    """
    from cloudpathlib import AnyPath

    url_parsed = urlparse(url)
    suffix = Path(url_parsed.path).suffix
    if url_parsed.scheme in ["s3", "az", "gs"]:
        with AnyPath(url).open("rb") as f:  # type: ignore
            temp_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
            while True:
                buf = f.read()
                if not buf:
                    break
                temp_file.write(buf)
            temp_file.close()
            file_path = temp_file.name
    elif url_parsed.scheme in ["file", ""]:
        file_path = url_parsed.path
    else:
        raise ValueError(f"Scheme {url_parsed.scheme} not supported")
    return file_path

def _make_iterator(
    length_func: Callable[[], int], show_progress: bool = False
) -> Callable[[Iterable[T]], Iterator[T]]:
    """Create a function that optionally wraps an iterable in tqdm."""
    if show_progress:
        try:
            from tqdm.auto import tqdm
        except ImportError:
            raise ImportError(
                "You must install tqdm to use show_progress=True. "
                "You can install tqdm with `pip install tqdm`."
            )

        # Make sure to provide `total` here so that tqdm can show
        # a progress bar that takes into account the total number of files.
        def _with_tqdm(iterable: Iterable[T]) -> Iterator[T]:
            """Wrap an iterable in a tqdm progress bar."""
            return tqdm(iterable, total=length_func())

        iterator = _with_tqdm
    else:
        iterator = iter  # type: ignore

    return iterator

# PUBLIC API


class CloudBlobLoader(BlobLoader):
    """Load blobs from cloud URL or file:.

    Example:

    .. code-block:: python

        loader = CloudBlobLoader("s3://mybucket/id")
        for blob in loader.yield_blobs():
            print(blob)
    """  # noqa: E501

    def __init__(
        self,
        url: Union[str, "AnyPath"],
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
    ) -> None:
        """Initialize with a url and how to glob over it.

        Use [CloudPathLib](https://cloudpathlib.drivendata.org/).

        Args:
            url: Cloud URL to load from.
                If a path to a file is provided, glob/exclude/suffixes are ignored.
            glob: Glob pattern relative to the specified path;
                by default set to pick up all non-hidden files.
            exclude: Patterns to exclude from results; use glob syntax.
            suffixes: Provide to keep only files with these suffixes.
                Useful when wanting to keep files with different suffixes.
                Suffixes must include the dot, e.g. ".txt".
            show_progress: If true, will show a progress bar as the files are loaded.
                This forces an iteration through all matching files
                to count them prior to loading them.

        Examples:

        .. code-block:: python

            from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

            # Load a single file.
            loader = CloudBlobLoader("s3://mybucket/id")  # az://

            # Recursively load all text files in a directory.
            loader = CloudBlobLoader("az://mybucket/id", glob="**/*.txt")

            # Recursively load all non-hidden files in a directory.
            loader = CloudBlobLoader("gs://mybucket/id", glob="**/[!.]*")

            # Load all files in a directory without recursion.
            loader = CloudBlobLoader("s3://mybucket/id", glob="*")

            # Recursively load all files in a directory, except for py or pyc files.
            loader = CloudBlobLoader(
                "s3://mybucket/id",
                glob="**/*.txt",
                exclude=["**/*.py", "**/*.pyc"]
            )
        """  # noqa: E501
        from cloudpathlib import AnyPath

        url_parsed = urlparse(str(url))

        if url_parsed.scheme == "file":
            url = url_parsed.path

        if isinstance(url, str):
            self.path = AnyPath(url)
        else:
            self.path = url

        self.glob = glob
        self.suffixes = set(suffixes or [])
        self.show_progress = show_progress
        self.exclude = exclude

    def yield_blobs(
        self,
    ) -> Iterable[Blob]:
        """Yield blobs that match the requested pattern."""
        iterator = _make_iterator(
            length_func=self.count_matching_files, show_progress=self.show_progress
        )

        for path in iterator(self._yield_paths()):
            # yield Blob.from_path(path)
            yield self.from_path(path)

    def _yield_paths(self) -> Iterable["AnyPath"]:
        """Yield paths that match the requested pattern."""
        if self.path.is_file():  # type: ignore
            yield self.path
            return

        paths = self.path.glob(self.glob)
        for path in paths:
            if self.exclude:
                if any(path.match(glob) for glob in self.exclude):
                    continue
            if path.is_file():
                if self.suffixes and path.suffix not in self.suffixes:
                    continue  # FIXME
                yield path

    def count_matching_files(self) -> int:
        """Count files that match the pattern without loading them."""
        # Carry out a full iteration to count the files without
        # materializing anything expensive in memory.
        num = 0
        for _ in self._yield_paths():
            num += 1
        return num

    @classmethod
    def from_path(
        cls,
        path: "AnyPath",
        *,
        encoding: str = "utf-8",
        mime_type: Optional[str] = None,
        guess_type: bool = True,
        metadata: Optional[dict] = None,
    ) -> Blob:
        """Load the blob from a path like object.

        Args:
            path: path like object to file to be read
            encoding: Encoding to use if decoding the bytes into a string
            mime_type: if provided, will be set as the mime-type of the data
            guess_type: If True, the mimetype will be guessed from the file extension,
                        if a mime-type was not provided
            metadata: Metadata to associate with the blob

        Returns:
            Blob instance
        """
        if mime_type is None and guess_type:
            _mimetype = mimetypes.guess_type(path)[0] if guess_type else None  # type: ignore
        else:
            _mimetype = mime_type

        url_parsed = urlparse(str(path))
        if url_parsed.scheme in ["file", ""]:
            if url_parsed.scheme == "file":
                local_path = url_parsed.path
            else:
                local_path = str(path)
            return Blob(
                data=None,
                mimetype=_mimetype,
                encoding=encoding,
                path=local_path,
                metadata=metadata if metadata is not None else {},
            )

        return _CloudBlob(
            data=None,
            mimetype=_mimetype,
            encoding=encoding,
            path=str(path),
            metadata=metadata if metadata is not None else {},
        )
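Because `CloudBlobLoader` only yields `Blob` objects, it can be paired with the existing blob-parser machinery. The sketch below is one possible composition, assuming the `GenericLoader` and `TextParser` classes already present in `langchain_community` (neither is touched by this PR); the bucket URL is illustrative and cloud credentials are assumed to be configured for CloudPathLib:

```python
from langchain_community.document_loaders.blob_loaders import CloudBlobLoader
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers.txt import TextParser

# Stream .txt blobs straight from the bucket and parse them into Documents,
# without pre-downloading the whole directory to local temporary files.
blob_loader = CloudBlobLoader("az://mybucket/id", glob="**/*.txt")
loader = GenericLoader(blob_loader=blob_loader, blob_parser=TextParser())

for doc in loader.lazy_load():
    print(doc.metadata["source"], doc.page_content[:80])
```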