fsspec wrapper #32

Closed
wants to merge 4 commits into from

kylebarron
Member

A wrapper around the existing object store API we expose to Python, so that it can be used through fsspec.

E.g. something like this should work, but doesn't quite yet:

import object_store_rs as obs
import pyarrow.parquet as pq
from object_store_rs.fsspec import AsyncFsspecStore

store = obs.store.HTTPStore.from_url("https://github.com")
fs = AsyncFsspecStore(store)
url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
test = pq.read_metadata(url, filesystem=fs)

File ~/github/developmentseed/object-store-rs/.venv/lib/python3.11/site-packages/fsspec/spec.py:1918, in AbstractBufferedFile._fetch_range(self, start, end)
   1916 def _fetch_range(self, start, end):
   1917     """Get the specified set of bytes from remote"""
-> 1918     raise NotImplementedError

cc @martindurant

@martindurant
Contributor

Here, you are using the file-like API. I can make a thin wrapper which calls cat_file with a range for any input filesystem, and then this should work.
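
To illustrate the idea of a file-like object whose reads are served by ranged cat_file calls, here is a minimal sketch. It deliberately avoids importing fsspec: ToyFileSystem and RangeBackedFile are hypothetical stand-ins written for this example, not classes from fsspec or object_store_rs, and the sketch does no buffering or caching.

```python
class ToyFileSystem:
    """Hypothetical in-memory filesystem exposing a cat_file-style ranged read."""

    def __init__(self, blobs):
        # Maps path -> bytes.
        self.blobs = blobs

    def cat_file(self, path, start=None, end=None):
        # None for start/end means "from the beginning" / "to the end".
        return self.blobs[path][start:end]


class RangeBackedFile:
    """Hypothetical file-like wrapper: every read delegates to fs.cat_file."""

    def __init__(self, fs, path, size):
        self.fs = fs
        self.path = path
        self.size = size
        self.loc = 0  # current read position

    def _fetch_range(self, start, end):
        # The fallback discussed here: fetch bytes [start, end) via cat_file.
        return self.fs.cat_file(self.path, start=start, end=end)

    def seek(self, loc):
        # Clamp the position to the valid range of the file.
        self.loc = max(0, min(loc, self.size))
        return self.loc

    def read(self, length=-1):
        # A negative length reads to the end of the file.
        end = self.size if length < 0 else min(self.size, self.loc + length)
        data = self._fetch_range(self.loc, end)
        self.loc = end
        return data


fs = ToyFileSystem({"a.bin": b"0123456789"})
f = RangeBackedFile(fs, "a.bin", size=10)
f.seek(6)
tail = f.read()    # bytes from offset 6 to the end
f.seek(0)
head = f.read(3)   # first three bytes
```

A real buffered file would additionally cache fetched blocks; the sketch only shows how seek/read can be reduced to ranged fetches.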

@kylebarron
Member Author

I had assumed that, at a minimum, I only needed to implement the methods on the AsyncFileSystem base class that raise NotImplementedError, and that everything else would be supported out of the box. Is that not the case?

@martindurant
Contributor

Yeah, a fallback implementation in fsspec would totally make sense. This does the business:

--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -1915,7 +1915,7 @@ class AbstractBufferedFile(io.IOBase):

     def _fetch_range(self, start, end):
         """Get the specified set of bytes from remote"""
-        raise NotImplementedError
+        return self.fs.cat_file(self.path, start=start, end=end)

     def read(self, length=-1):

together with

--- a/object-store-rs/python/object_store_rs/fsspec.py
+++ b/object-store-rs/python/object_store_rs/fsspec.py
@@ -55,7 +55,7 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
             resp = await obs.get_async(self.store, path)
             return await resp.bytes_async()

-        if start and end:
+        if start is not None and end is not None:
             return await obs.get_range_async(
                 self.store, path, offset=start, length=end - start
             )

(because start can be, and often is, 0, which is falsy, so the original check skipped the range request; and actually None means 0 anyway).

Leads to:

In [1]: import object_store_rs as obs
   ...: import pyarrow.parquet as pq
   ...: from object_store_rs.fsspec import AsyncFsspecStore
   ...:
   ...: store = obs.store.HTTPStore.from_url("https://github.com")
   ...: fs = AsyncFsspecStore(store)
   ...: url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
   ...: pq.read_metadata(url, filesystem=fs)

Out[1]:
<pyarrow._parquet.FileMetaData object at 0x10a8539c0>
  created_by: parquet-cpp-arrow version 16.1.0
  num_columns: 10
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 6685
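
The reason for the `is not None` change in the diff above can be seen in isolation. The sketch below uses two hypothetical helper functions (not part of object_store_rs) that only report which branch would be taken; it assumes, as in the diff, that a range request needs an offset and a length.

```python
def range_args_buggy(start, end):
    """Mimic the original truthiness check: `if start and end`."""
    if start and end:
        return ("range", start, end - start)
    return ("full", None, None)


def range_args_fixed(start, end):
    """Mimic the corrected check: compare against None explicitly."""
    if start is not None and end is not None:
        return ("range", start, end - start)
    return ("full", None, None)


# start=0 is a valid offset, but 0 is falsy in Python.
buggy = range_args_buggy(0, 100)
fixed = range_args_fixed(0, 100)
print(buggy)  # the range branch is skipped
print(fixed)  # the range branch is taken with offset 0, length 100
```

With the truthiness check, a read starting at byte 0 never reaches the range branch, which is exactly the case a Parquet reader hits when it reads from the start of a file.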

@kylebarron
Member Author

Until fsspec/filesystem_spec#1732 is merged and released, is AsyncFsspecStore expected to subclass both fsspec.asyn.AsyncFileSystem and AbstractBufferedFile?

@martindurant
Contributor

is AsyncFsspecStore expected to subclass both fsspec.asyn.AsyncFileSystem and AbstractBufferedFile?

That sounds ... painful. The linked PR has no negative side effects, so I only need to write some kind of test for it.

Comment on lines +129 to +131
async def _ls(self, path, detail=True, **kwargs):
    if detail:
        raise NotImplementedError("Not sure how to format these dicts")
Member Author

@martindurant can you point to an example of what's expected from these returned metadata objects?

Contributor

They are dicts, with at least the fields "type" (file or directory or maybe something else), "size" (number of bytes as a number) and "name" (as understood by the filesystem object).

Other keys are optional, but there have been conversations around making these extra keys better defined (e.g., "timestamp").
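
As a rough illustration of the dict shape described above, the following sketch converts store metadata into fsspec-style entries. The input keys ("path", "size", "last_modified") are assumptions made for this example and may not match what object_store_rs actually returns; reporting a size of 0 for directories is likewise an assumption.

```python
def to_fsspec_info(meta, is_dir=False):
    """Build an fsspec-style info dict from hypothetical object metadata."""
    info = {
        "name": meta["path"],                           # path as the filesystem understands it
        "size": 0 if is_dir else int(meta["size"]),     # number of bytes
        "type": "directory" if is_dir else "file",
    }
    # Optional extra key, passed through only when present.
    if "last_modified" in meta:
        info["last_modified"] = meta["last_modified"]
    return info


def format_listing(entries, detail=True):
    """Return full dicts when detail=True, otherwise just the sorted names."""
    infos = [to_fsspec_info(m) for m in entries]
    if detail:
        return infos
    return sorted(i["name"] for i in infos)


entries = [
    {"path": "data/b.parquet", "size": 2048},
    {"path": "data/a.parquet", "size": 1024, "last_modified": "2024-10-30"},
]
detailed = format_listing(entries, detail=True)
names = format_listing(entries, detail=False)
```

Keeping the three required keys in one helper means the detail=True and detail=False paths of a listing method can share the same conversion.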

@kylebarron
Member Author

@martindurant Do you have any interest in pushing this PR over the line, since you know fsspec so well?

@martindurant
Contributor

Sure! What needs doing besides the fsspec PR?

@kylebarron
Member Author

I'm not fully sure how to test it. I'm also not sure whether there are any methods provided by the fsspec base class that we should override because they could be served more efficiently from the Rust side (without adding a bunch of new Rust code; ideally I'd like this fsspec wrapper to be Python-only).

@kumare3

kumare3 commented Oct 30, 2024

We would love to contribute to this as well and make it work with flyteorg/flyte. What do you think?

@kylebarron
Member Author

I'm happy to accept contributions.

@pingsutw

@kylebarron, when will this be merged? I'd like to help work on it as well.

@kylebarron
Member Author

kylebarron commented Oct 30, 2024

I'd be happy to accept a PR with tests. There was some progress from @martindurant in #60, which was unblocked by adding test setup in #62.

@kylebarron
Member Author

Superseded by #63

@kylebarron kylebarron closed this Oct 31, 2024
4 participants