Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions opendalfs/decorator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import functools


def generate_blocking_methods(cls):
"""
Class decorator that automatically creates blocking versions of async methods.
Expand All @@ -12,9 +13,9 @@ def generate_blocking_methods(cls):

# Find all async methods that start with underscore
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
if name.startswith('_') and inspect.iscoroutinefunction(method):
if name.startswith("_") and inspect.iscoroutinefunction(method):
# Skip any private methods with double underscore
if name.startswith('__'):
if name.startswith("__"):
continue

# Get the method name without underscore
Expand Down
25 changes: 17 additions & 8 deletions opendalfs/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,31 @@ def __init__(
**kwargs,
)

def _fetch_range(self, start: int, end: int):
"""Download data between start and end"""
pass
def _fetch_range(self, start: int, end: int) -> bytes:
"""Download data between start and end."""
logger.debug(f"Fetching bytes from {start} to {end} for {self.path}")
data = self.fs.fs.read(self.path) # sync operator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now have native support for read(self.path, offset, size) after apache/opendal#6086 was merged. Would you like to create an issue to track this, so we can update our usage and avoid reading the entire file?

Copy link
Author

@umangbudhwar umangbudhwar Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#28 Created the issue

return data[start:end]

def _upload_chunk(self, final: bool = False):
"""Upload partial chunk of data"""
"""No-op: we buffer until close and upload once."""
pass

def _initiate_upload(self) -> None:
"""Prepare for uploading"""
pass
logger.debug(f"Initiated upload for {self.path}")

def _commit_upload(self) -> None:
"""Ensure upload is complete"""
pass
"""Write the full buffer to the backend once"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenDAL has it's own buffer, so we just need to implement write for OpendalBufferedFile.

logger.debug(f"Committing full upload for {self.path}")
self.buffer.seek(0)
data = self.buffer.read()
self.fs.write(self.path, data)

def close(self):
"""Ensure data is written before closing"""
pass
if not self.closed:
if self.mode in ("wb", "ab"):
self._commit_upload()
super().close()
logger.debug(f"Closed file {self.path}")
9 changes: 6 additions & 3 deletions opendalfs/fs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import Any
from fsspec.asyn import AsyncFileSystem, sync
import logging
from typing import Any

from fsspec.asyn import AsyncFileSystem
from opendal import Operator, AsyncOperator
from .file import OpendalBufferedFile

from .decorator import generate_blocking_methods
from .file import OpendalBufferedFile

logger = logging.getLogger("opendalfs")


@generate_blocking_methods
class OpendalFileSystem(AsyncFileSystem):
"""OpenDAL implementation of fsspec AsyncFileSystem.
Expand Down
66 changes: 66 additions & 0 deletions tests/core/test_buffered_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pytest
from opendalfs.file import OpendalBufferedFile
from opendalfs.fs import OpendalFileSystem


@pytest.fixture
def fs_dir():
return OpendalFileSystem("fs", root="/tmp/")


@pytest.fixture
def file_path():
return "testdir/testfile.txt"


def test_fetch_range_basic(fs_dir, file_path):
fs_dir.write(file_path, b"Hello, world!")
f = OpendalBufferedFile(fs_dir, file_path, mode="rb")
assert f._fetch_range(0, 5) == b"Hello"
assert f._fetch_range(7, 12) == b"world"


def test_fetch_range_out_of_bounds(fs_dir, file_path):
fs_dir.write(file_path, b"short")
f = OpendalBufferedFile(fs_dir, file_path, mode="rb")
assert f._fetch_range(0, 100) == b"short"
assert f._fetch_range(10, 20) == b""


def test_initiate_upload(fs_dir, file_path):
f = OpendalBufferedFile(fs_dir, file_path, mode="wb")
f._initiate_upload()
assert f.buffer is not None


def test_commit_upload(fs_dir, file_path):
f = OpendalBufferedFile(fs_dir, file_path, mode="wb")
f.buffer.write(b"OpenDAL test")
f._commit_upload()
assert fs_dir.read(file_path) == b"OpenDAL test"


def test_commit_upload_empty_buffer(fs_dir, file_path):
f = OpendalBufferedFile(fs_dir, file_path, mode="wb")
f._commit_upload() # should not raise even if buffer is empty
assert fs_dir.exists(file_path) is True
assert fs_dir.read(file_path) == b""


def test_upload_chunk_noop(fs_dir, file_path):
f = OpendalBufferedFile(fs_dir, file_path, mode="wb")
f._upload_chunk() # should not error
f._upload_chunk(final=True) # should not error


def test_close_writes_buffer(fs_dir, file_path):
with OpendalBufferedFile(fs_dir, file_path, mode="wb") as f:
f.buffer.write(b"closing buffer test")
assert fs_dir.read(file_path) == b"closing buffer test"


def test_close_does_not_write_on_read_mode(fs_dir, file_path):
fs_dir.write(file_path, b"read test")
f = OpendalBufferedFile(fs_dir, file_path, mode="rb")
f.close()
assert fs_dir.read(file_path) == b"read test"