diff --git a/opendalfs/decorator.py b/opendalfs/decorator.py index 9fe8879..645248f 100644 --- a/opendalfs/decorator.py +++ b/opendalfs/decorator.py @@ -1,6 +1,7 @@ import inspect import functools + def generate_blocking_methods(cls): """ Class decorator that automatically creates blocking versions of async methods. @@ -12,9 +13,9 @@ def generate_blocking_methods(cls): # Find all async methods that start with underscore for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - if name.startswith('_') and inspect.iscoroutinefunction(method): + if name.startswith("_") and inspect.iscoroutinefunction(method): # Skip any private methods with double underscore - if name.startswith('__'): + if name.startswith("__"): continue # Get the method name without underscore diff --git a/opendalfs/file.py b/opendalfs/file.py index 04a056a..5a7b4c3 100644 --- a/opendalfs/file.py +++ b/opendalfs/file.py @@ -31,22 +31,31 @@ def __init__( **kwargs, ) - def _fetch_range(self, start: int, end: int): - """Download data between start and end""" - pass + def _fetch_range(self, start: int, end: int) -> bytes: + """Download data between start and end.""" + logger.debug(f"Fetching bytes from {start} to {end} for {self.path}") + data = self.fs.fs.read(self.path) # sync operator + return data[start:end] def _upload_chunk(self, final: bool = False): - """Upload partial chunk of data""" + """No-op: we buffer until close and upload once.""" pass def _initiate_upload(self) -> None: """Prepare for uploading""" - pass + logger.debug(f"Initiated upload for {self.path}") def _commit_upload(self) -> None: - """Ensure upload is complete""" - pass + """Write the full buffer to the backend once""" + logger.debug(f"Committing full upload for {self.path}") + self.buffer.seek(0) + data = self.buffer.read() + self.fs.write(self.path, data) def close(self): """Ensure data is written before closing""" - pass + if not self.closed: + if self.mode in ("wb", "ab"): + self._commit_upload() + super().close() + logger.debug(f"Closed file {self.path}") diff --git a/opendalfs/fs.py b/opendalfs/fs.py index 93e6144..4beacf2 100644 --- a/opendalfs/fs.py +++ b/opendalfs/fs.py @@ -1,12 +1,15 @@ -from typing import Any -from fsspec.asyn import AsyncFileSystem, sync import logging +from typing import Any + +from fsspec.asyn import AsyncFileSystem from opendal import Operator, AsyncOperator -from .file import OpendalBufferedFile + from .decorator import generate_blocking_methods +from .file import OpendalBufferedFile logger = logging.getLogger("opendalfs") + @generate_blocking_methods class OpendalFileSystem(AsyncFileSystem): """OpenDAL implementation of fsspec AsyncFileSystem. diff --git a/tests/core/test_buffered_file.py b/tests/core/test_buffered_file.py new file mode 100644 index 0000000..ae30c5c --- /dev/null +++ b/tests/core/test_buffered_file.py @@ -0,0 +1,66 @@ +import pytest +from opendalfs.file import OpendalBufferedFile +from opendalfs.fs import OpendalFileSystem + + +@pytest.fixture +def fs_dir(): + return OpendalFileSystem("fs", root="/tmp/") + + +@pytest.fixture +def file_path(): + return "testdir/testfile.txt" + + +def test_fetch_range_basic(fs_dir, file_path): + fs_dir.write(file_path, b"Hello, world!") + f = OpendalBufferedFile(fs_dir, file_path, mode="rb") + assert f._fetch_range(0, 5) == b"Hello" + assert f._fetch_range(7, 12) == b"world" + + +def test_fetch_range_out_of_bounds(fs_dir, file_path): + fs_dir.write(file_path, b"short") + f = OpendalBufferedFile(fs_dir, file_path, mode="rb") + assert f._fetch_range(0, 100) == b"short" + assert f._fetch_range(10, 20) == b"" + + +def test_initiate_upload(fs_dir, file_path): + f = OpendalBufferedFile(fs_dir, file_path, mode="wb") + f._initiate_upload() + assert f.buffer is not None + + +def test_commit_upload(fs_dir, file_path): + f = OpendalBufferedFile(fs_dir, file_path, mode="wb") + f.buffer.write(b"OpenDAL test") + f._commit_upload() + assert fs_dir.read(file_path) == b"OpenDAL test" + + +def test_commit_upload_empty_buffer(fs_dir, file_path): + f = OpendalBufferedFile(fs_dir, file_path, mode="wb") + f._commit_upload() # should not raise even if buffer is empty + assert fs_dir.exists(file_path) is True + assert fs_dir.read(file_path) == b"" + + +def test_upload_chunk_noop(fs_dir, file_path): + f = OpendalBufferedFile(fs_dir, file_path, mode="wb") + f._upload_chunk() # should not error + f._upload_chunk(final=True) # should not error + + +def test_close_writes_buffer(fs_dir, file_path): + with OpendalBufferedFile(fs_dir, file_path, mode="wb") as f: + f.buffer.write(b"closing buffer test") + assert fs_dir.read(file_path) == b"closing buffer test" + + +def test_close_does_not_write_on_read_mode(fs_dir, file_path): + fs_dir.write(file_path, b"read test") + f = OpendalBufferedFile(fs_dir, file_path, mode="rb") + f.close() + assert fs_dir.read(file_path) == b"read test"