Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test:
.PHONY: test

integtest:
pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py
pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py python/tests/test_namespace_integration.py
.PHONY: integtest

doctest:
Expand Down
14 changes: 14 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5108,6 +5108,7 @@ def write_dataset(
namespace: Optional[LanceNamespace] = None,
table_id: Optional[List[str]] = None,
ignore_namespace_table_storage_options: bool = False,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
) -> LanceDataset:
"""Write a given data_obj to the given uri

Expand Down Expand Up @@ -5215,6 +5216,13 @@ def write_dataset(
not be created, so credentials will not be automatically refreshed.
This is useful when you want to use your own credentials instead of the
namespace-provided credentials.
s3_credentials_refresh_offset_seconds : optional, int
The number of seconds before credential expiration to trigger a refresh.
Default is 60 seconds. Only applicable when using AWS S3 with temporary
credentials. For example, if set to 60, credentials will be refreshed
when they have less than 60 seconds remaining before expiration. This
should be set shorter than the credential lifetime to avoid using
expired credentials.

Notes
-----
Expand Down Expand Up @@ -5343,6 +5351,12 @@ def write_dataset(
if storage_options_provider is not None:
params["storage_options_provider"] = storage_options_provider

# Add s3_credentials_refresh_offset_seconds if specified
if s3_credentials_refresh_offset_seconds is not None:
params["s3_credentials_refresh_offset_seconds"] = (
s3_credentials_refresh_offset_seconds
)

if commit_lock:
if not callable(commit_lock):
raise TypeError(f"commit_lock must be a function, got {type(commit_lock)}")
Expand Down
74 changes: 70 additions & 4 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pyarrow as pa

from .io import StorageOptionsProvider
from .lance import (
LanceBufferDescriptor,
LanceColumnMetadata,
Expand Down Expand Up @@ -66,6 +67,8 @@ def __init__(
storage_options: Optional[Dict[str, str]] = None,
columns: Optional[List[str]] = None,
*,
storage_options_provider: Optional[StorageOptionsProvider] = None,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
_inner_reader: Optional[_LanceFileReader] = None,
):
"""
Expand All @@ -80,6 +83,12 @@ def __init__(
storage_options : optional, dict
Extra options to be used for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : optional
A provider that can provide storage options dynamically. This is useful
for credentials that need to be refreshed or vended on-demand.
s3_credentials_refresh_offset_seconds : optional, int
How early (in seconds) before expiration to refresh S3 credentials.
Default is 60 seconds. Only applies when using storage_options_provider.
columns: list of str, default None
List of column names to be fetched.
All columns are fetched if None or unspecified.
Expand All @@ -90,7 +99,11 @@ def __init__(
if isinstance(path, Path):
path = str(path)
self._reader = _LanceFileReader(
path, storage_options=storage_options, columns=columns
path,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
columns=columns,
)

def read_all(self, *, batch_size: int = 1024, batch_readahead=16) -> ReaderResults:
Expand Down Expand Up @@ -202,7 +215,11 @@ class LanceFileSession:
"""

def __init__(
self, base_path: str, storage_options: Optional[Dict[str, str]] = None
self,
base_path: str,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
):
"""
Creates a new file session
Expand All @@ -216,10 +233,21 @@ def __init__(
storage_options : optional, dict
Extra options to be used for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : optional
A provider that can provide storage options dynamically. This is useful
for credentials that need to be refreshed or vended on-demand.
s3_credentials_refresh_offset_seconds : optional, int
How early (in seconds) before expiration to refresh S3 credentials.
Default is 60 seconds. Only applies when using storage_options_provider.
"""
if isinstance(base_path, Path):
base_path = str(base_path)
self._session = _LanceFileSession(base_path, storage_options=storage_options)
self._session = _LanceFileSession(
base_path,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
)

def open_reader(
self, path: str, columns: Optional[List[str]] = None
Expand Down Expand Up @@ -281,6 +309,39 @@ def open_writer(
_inner_writer=inner,
)

def contains(self, path: str) -> bool:
    """
    Return whether a file is present at the given location.

    Parameters
    ----------
    path : str
        Location to test, interpreted relative to this session's
        `base_path`.

    Returns
    -------
    bool
        True when a file is present at `path`, False otherwise.
    """
    # Delegate straight to the native session object.
    return self._session.contains(path)

def list(self, path: Optional[str] = None) -> List[str]:
    """
    Enumerate the files stored under a location.

    Parameters
    ----------
    path : str, optional
        Prefix (relative to this session's `base_path`) to enumerate.
        When omitted, the whole base path is enumerated.

    Returns
    -------
    List[str]
        The discovered file paths.
    """
    # Delegate straight to the native session object.
    return self._session.list(path)


class LanceFileWriter:
"""
Expand All @@ -299,7 +360,8 @@ def __init__(
data_cache_bytes: Optional[int] = None,
version: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider=None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
max_page_bytes: Optional[int] = None,
_inner_writer: Optional[_LanceFileWriter] = None,
**kwargs,
Expand Down Expand Up @@ -330,6 +392,9 @@ def __init__(
A storage options provider that can fetch and refresh storage options
dynamically. This is useful for credentials that expire and need to be
refreshed automatically.
s3_credentials_refresh_offset_seconds : optional, int
How early (in seconds) before expiration to refresh S3 credentials.
Default is 60 seconds. Only applies when using storage_options_provider.
max_page_bytes : optional, int
The maximum size of a page in bytes, if a single array would create a
page larger than this then it will be split into multiple pages. The
Expand All @@ -347,6 +412,7 @@ def __init__(
version=version,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds,
max_page_bytes=max_page_bytes,
**kwargs,
)
Expand Down
11 changes: 10 additions & 1 deletion python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class LanceFileWriter:
version: Optional[str],
storage_options: Optional[Dict[str, str]],
storage_options_provider: Optional[StorageOptionsProvider],
s3_credentials_refresh_offset_seconds: Optional[int],
keep_original_array: Optional[bool],
max_page_bytes: Optional[int],
): ...
Expand All @@ -111,7 +112,11 @@ class LanceFileWriter:

class LanceFileSession:
def __init__(
self, base_path: str, storage_options: Optional[Dict[str, str]] = None
self,
base_path: str,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
): ...
def open_reader(
self, path: str, columns: Optional[List[str]] = None
Expand All @@ -125,12 +130,16 @@ class LanceFileSession:
keep_original_array: Optional[bool] = None,
max_page_bytes: Optional[int] = None,
) -> LanceFileWriter: ...
def contains(self, path: str) -> bool: ...
def list(self, path: Optional[str] = None) -> List[str]: ...

class LanceFileReader:
def __init__(
self,
path: str,
storage_options: Optional[Dict[str, str]],
storage_options_provider: Optional[StorageOptionsProvider],
s3_credentials_refresh_offset_seconds: Optional[int],
columns: Optional[List[str]] = None,
): ...
def read_all(
Expand Down
109 changes: 109 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,3 +648,112 @@ def write_thread_data(thread_id, writer, num_records):
pc.equal(result_table.column("thread_id"), thread_id)
)
assert thread_rows.num_rows == records_per_thread


def test_session_list_all_files(tmp_path):
    """LanceFileSession.list() with no prefix reports every file, as relative paths."""
    session = LanceFileSession(str(tmp_path))
    schema = pa.schema([pa.field("x", pa.int64())])

    # Data-driven setup: one (relative path, row value) pair per file,
    # spanning the root and two subdirectories.
    entries = [
        ("file1.lance", 1),
        ("file2.lance", 2),
        ("subdir/file3.lance", 3),
        ("subdir/file4.lance", 4),
        ("other/file5.lance", 5),
    ]
    for name, value in entries:
        with session.open_writer(name, schema=schema) as writer:
            writer.write_batch(pa.table({"x": [value]}))

    listed = sorted(session.list())

    # Every written file shows up, with paths relative to base_path.
    assert listed == sorted(name for name, _ in entries)

    # No entry should leak the absolute base path.
    for entry in listed:
        assert not entry.startswith("/")
        assert str(tmp_path) not in entry


def test_session_list_with_prefix(tmp_path):
    """LanceFileSession.list() restricted to a prefix only reports matching files."""
    session = LanceFileSession(str(tmp_path))
    schema = pa.schema([pa.field("x", pa.int64())])

    # One file at the root and three spread across two subdirectories.
    layout = [
        ("file1.lance", 1),
        ("subdir/file2.lance", 2),
        ("subdir/file3.lance", 3),
        ("other/file4.lance", 4),
    ]
    for name, value in layout:
        with session.open_writer(name, schema=schema) as writer:
            writer.write_batch(pa.table({"x": [value]}))

    # Prefix "subdir" matches exactly the two files under it.
    assert sorted(session.list("subdir")) == [
        "subdir/file2.lance",
        "subdir/file3.lance",
    ]

    # Prefix "other" matches its single file.
    assert sorted(session.list("other")) == ["other/file4.lance"]

    # A prefix with no files underneath yields an empty listing.
    assert session.list("nonexistent") == []


def test_session_list_with_trailing_slash(tmp_path):
    """A listing prefix behaves the same with or without a trailing slash."""
    session = LanceFileSession(str(tmp_path))
    schema = pa.schema([pa.field("x", pa.int64())])

    with session.open_writer("dir/file.lance", schema=schema) as writer:
        writer.write_batch(pa.table({"x": [1]}))

    without_slash = session.list("dir")
    with_slash = session.list("dir/")

    # Both spellings resolve to the same single file.
    assert without_slash == ["dir/file.lance"]
    assert with_slash == without_slash


def test_session_contains(tmp_path):
    """LanceFileSession.contains() tracks file existence relative to base_path."""
    session = LanceFileSession(str(tmp_path))
    schema = pa.schema([pa.field("x", pa.int64())])

    # Before any write, the path must report absent.
    assert not session.contains("test.lance")

    with session.open_writer("test.lance", schema=schema) as writer:
        writer.write_batch(pa.table({"x": [1]}))

    # After the write, the same path reports present.
    assert session.contains("test.lance")

    # Existence checks also work for files nested in subdirectories.
    with session.open_writer("subdir/nested.lance", schema=schema) as writer:
        writer.write_batch(pa.table({"x": [2]}))

    assert session.contains("subdir/nested.lance")
    assert not session.contains("subdir/nonexistent.lance")
Loading
Loading