diff --git a/python/Makefile b/python/Makefile index 78c0489c522..1f1a30f7842 100644 --- a/python/Makefile +++ b/python/Makefile @@ -9,7 +9,7 @@ test: .PHONY: test integtest: - pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py + pytest --run-integration $(PYTEST_ARGS) python/tests/test_s3_ddb.py python/tests/test_namespace_integration.py .PHONY: integtest doctest: diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index d9a96a86c85..d23b3594e2a 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5108,6 +5108,7 @@ def write_dataset( namespace: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, ignore_namespace_table_storage_options: bool = False, + s3_credentials_refresh_offset_seconds: Optional[int] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -5215,6 +5216,13 @@ def write_dataset( not be created, so credentials will not be automatically refreshed. This is useful when you want to use your own credentials instead of the namespace-provided credentials. + s3_credentials_refresh_offset_seconds : optional, int + The number of seconds before credential expiration to trigger a refresh. + Default is 60 seconds. Only applicable when using AWS S3 with temporary + credentials. For example, if set to 60, credentials will be refreshed + when they have less than 60 seconds remaining before expiration. This + should be set shorter than the credential lifetime to avoid using + expired credentials. Notes ----- @@ -5343,6 +5351,12 @@ def write_dataset( if storage_options_provider is not None: params["storage_options_provider"] = storage_options_provider + # Add s3_credentials_refresh_offset_seconds if specified + if s3_credentials_refresh_offset_seconds is not None: + params["s3_credentials_refresh_offset_seconds"] = ( + s3_credentials_refresh_offset_seconds + ) + if commit_lock: if not callable(commit_lock): raise TypeError(f"commit_lock must be a function, got {type(commit_lock)}") diff --git a/python/python/lance/file.py b/python/python/lance/file.py index 73d308380f9..cab73f2b633 100644 --- a/python/python/lance/file.py +++ b/python/python/lance/file.py @@ -6,6 +6,7 @@ import pyarrow as pa +from .io import StorageOptionsProvider from .lance import ( LanceBufferDescriptor, LanceColumnMetadata, @@ -66,6 +67,8 @@ def __init__( storage_options: Optional[Dict[str, str]] = None, columns: Optional[List[str]] = None, *, + storage_options_provider: Optional[StorageOptionsProvider] = None, + s3_credentials_refresh_offset_seconds: Optional[int] = None, _inner_reader: Optional[_LanceFileReader] = None, ): """ @@ -80,6 +83,12 @@ def __init__( storage_options : optional, dict Extra options to be used for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. + storage_options_provider : optional + A provider that can provide storage options dynamically. This is useful + for credentials that need to be refreshed or vended on-demand. + s3_credentials_refresh_offset_seconds : optional, int + How early (in seconds) before expiration to refresh S3 credentials. + Default is 60 seconds. Only applies when using storage_options_provider. columns: list of str, default None List of column names to be fetched. All columns are fetched if None or unspecified. @@ -90,7 +99,11 @@ def __init__( if isinstance(path, Path): path = str(path) self._reader = _LanceFileReader( - path, storage_options=storage_options, columns=columns + path, + storage_options=storage_options, + storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, + columns=columns, ) def read_all(self, *, batch_size: int = 1024, batch_readahead=16) -> ReaderResults: @@ -202,7 +215,11 @@ class LanceFileSession: """ def __init__( - self, base_path: str, storage_options: Optional[Dict[str, str]] = None + self, + base_path: str, + storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + s3_credentials_refresh_offset_seconds: Optional[int] = None, ): """ Creates a new file session @@ -216,10 +233,21 @@ def __init__( storage_options : optional, dict Extra options to be used for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. + storage_options_provider : optional + A provider that can provide storage options dynamically. This is useful + for credentials that need to be refreshed or vended on-demand. + s3_credentials_refresh_offset_seconds : optional, int + How early (in seconds) before expiration to refresh S3 credentials. + Default is 60 seconds. Only applies when using storage_options_provider. """ if isinstance(base_path, Path): base_path = str(base_path) - self._session = _LanceFileSession(base_path, storage_options=storage_options) + self._session = _LanceFileSession( + base_path, + storage_options=storage_options, + storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, + ) def open_reader( self, path: str, columns: Optional[List[str]] = None @@ -281,6 +309,39 @@ def open_writer( _inner_writer=inner, ) + def contains(self, path: str) -> bool: + """ + Check if a file exists at the given path (relative to this session's base path). + + Parameters + ---------- + path : str + Path relative to `base_path` to check for existence. + + Returns + ------- + bool + True if the file exists, False otherwise. + """ + return self._session.contains(path) + + def list(self, path: Optional[str] = None) -> List[str]: + """ + List all files at the given path (relative to this session's base path). + + Parameters + ---------- + path : str, optional + Path relative to `base_path` to list files from. If None, lists files + from the base path. + + Returns + ------- + List[str] + List of file paths. + """ + return self._session.list(path) + class LanceFileWriter: """ @@ -299,7 +360,8 @@ def __init__( data_cache_bytes: Optional[int] = None, version: Optional[str] = None, storage_options: Optional[Dict[str, str]] = None, - storage_options_provider=None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + s3_credentials_refresh_offset_seconds: Optional[int] = None, max_page_bytes: Optional[int] = None, _inner_writer: Optional[_LanceFileWriter] = None, **kwargs, @@ -330,6 +392,9 @@ def __init__( A storage options provider that can fetch and refresh storage options dynamically. This is useful for credentials that expire and need to be refreshed automatically. + s3_credentials_refresh_offset_seconds : optional, int + How early (in seconds) before expiration to refresh S3 credentials. + Default is 60 seconds. Only applies when using storage_options_provider. max_page_bytes : optional, int The maximum size of a page in bytes, if a single array would create a page larger than this then it will be split into multiple pages. The @@ -347,6 +412,7 @@ def __init__( version=version, storage_options=storage_options, storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, max_page_bytes=max_page_bytes, **kwargs, ) diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 2fd57e307e3..2aba75b68e7 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -101,6 +101,7 @@ class LanceFileWriter: version: Optional[str], storage_options: Optional[Dict[str, str]], storage_options_provider: Optional[StorageOptionsProvider], + s3_credentials_refresh_offset_seconds: Optional[int], keep_original_array: Optional[bool], max_page_bytes: Optional[int], ): ... @@ -111,7 +112,11 @@ class LanceFileWriter: class LanceFileSession: def __init__( - self, base_path: str, storage_options: Optional[Dict[str, str]] = None + self, + base_path: str, + storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + s3_credentials_refresh_offset_seconds: Optional[int] = None, ): ... def open_reader( self, path: str, columns: Optional[List[str]] = None @@ -125,12 +130,16 @@ class LanceFileSession: keep_original_array: Optional[bool] = None, max_page_bytes: Optional[int] = None, ) -> LanceFileWriter: ... + def contains(self, path: str) -> bool: ... + def list(self, path: Optional[str] = None) -> List[str]: ... class LanceFileReader: def __init__( self, path: str, storage_options: Optional[Dict[str, str]], + storage_options_provider: Optional[StorageOptionsProvider], + s3_credentials_refresh_offset_seconds: Optional[int], columns: Optional[List[str]] = None, ): ... def read_all( diff --git a/python/python/tests/test_file.py b/python/python/tests/test_file.py index e7058ef90f5..44af6a8e2d5 100644 --- a/python/python/tests/test_file.py +++ b/python/python/tests/test_file.py @@ -648,3 +648,112 @@ def write_thread_data(thread_id, writer, num_records): pc.equal(result_table.column("thread_id"), thread_id) ) assert thread_rows.num_rows == records_per_thread + + +def test_session_list_all_files(tmp_path): + """Test that LanceFileSession.list() returns all files with relative paths""" + session = LanceFileSession(str(tmp_path)) + schema = pa.schema([pa.field("x", pa.int64())]) + + # Write files at different levels + with session.open_writer("file1.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [1]})) + + with session.open_writer("file2.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [2]})) + + with session.open_writer("subdir/file3.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [3]})) + + with session.open_writer("subdir/file4.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [4]})) + + with session.open_writer("other/file5.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [5]})) + + # List all files + files = sorted(session.list()) + + # Verify relative paths (no absolute paths) + assert files == [ + "file1.lance", + "file2.lance", + "other/file5.lance", + "subdir/file3.lance", + "subdir/file4.lance", + ] + + # Verify no absolute paths + for f in files: + assert not f.startswith("/") + assert str(tmp_path) not in f + + +def test_session_list_with_prefix(tmp_path): + """Test that LanceFileSession.list() filters by prefix correctly""" + session = LanceFileSession(str(tmp_path)) + schema = pa.schema([pa.field("x", pa.int64())]) + + # Write files in different directories + with session.open_writer("file1.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [1]})) + + with session.open_writer("subdir/file2.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [2]})) + + with session.open_writer("subdir/file3.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [3]})) + + with session.open_writer("other/file4.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [4]})) + + # List with prefix "subdir" + subdir_files = sorted(session.list("subdir")) + assert subdir_files == ["subdir/file2.lance", "subdir/file3.lance"] + + # List with prefix "other" + other_files = sorted(session.list("other")) + assert other_files == ["other/file4.lance"] + + # List with non-existent prefix + empty = session.list("nonexistent") + assert empty == [] + + +def test_session_list_with_trailing_slash(tmp_path): + """Test that LanceFileSession.list() handles trailing slashes correctly""" + session = LanceFileSession(str(tmp_path)) + schema = pa.schema([pa.field("x", pa.int64())]) + + with session.open_writer("dir/file.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [1]})) + + # Both with and without trailing slash should work + files_no_slash = session.list("dir") + files_with_slash = session.list("dir/") + + assert files_no_slash == files_with_slash + assert files_no_slash == ["dir/file.lance"] + + +def test_session_contains(tmp_path): + """Test that LanceFileSession.contains() works correctly""" + session = LanceFileSession(str(tmp_path)) + schema = pa.schema([pa.field("x", pa.int64())]) + + # File doesn't exist yet + assert not session.contains("test.lance") + + # Write a file + with session.open_writer("test.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [1]})) + + # File exists now + assert session.contains("test.lance") + + # Nested file + with session.open_writer("subdir/nested.lance", schema=schema) as writer: + writer.write_batch(pa.table({"x": [2]})) + + assert session.contains("subdir/nested.lance") + assert not session.contains("subdir/nonexistent.lance") diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 190221d4708..a62fb7f1bf7 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -219,7 +219,11 @@ def test_namespace_with_refresh(s3_bucket: str): assert namespace.get_describe_call_count() == 0 ds = lance.write_dataset( - table1, namespace=namespace, table_id=table_id, mode="create" + table1, + namespace=namespace, + table_id=table_id, + mode="create", + s3_credentials_refresh_offset_seconds=1, ) assert ds.count_rows() == 2 assert namespace.get_create_call_count() == 1 @@ -562,6 +566,7 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): schema=schema, storage_options=namespace_storage_options, storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, ) batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]}, schema=schema) @@ -576,7 +581,12 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): describe_count_after_write = namespace.get_describe_call_count() assert describe_count_after_write == initial_describe_count - reader = LanceFileReader(file_uri, storage_options=namespace_storage_options) + reader = LanceFileReader( + file_uri, + storage_options=namespace_storage_options, + storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, + ) result = reader.read_all(batch_size=1024) result_table = result.to_table() assert result_table.num_rows == 6 @@ -595,6 +605,7 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): schema=schema, storage_options=namespace_storage_options, storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, ) batch3 = pa.RecordBatch.from_pydict( @@ -606,9 +617,223 @@ def test_file_writer_with_storage_options_provider(s3_bucket: str): final_describe_count = namespace.get_describe_call_count() assert final_describe_count == describe_count_after_write + 1 - reader2 = LanceFileReader(file_uri2, storage_options=namespace_storage_options) + reader2 = LanceFileReader( + file_uri2, + storage_options=namespace_storage_options, + storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, + ) + result2 = reader2.read_all(batch_size=1024) + result_table2 = result2.to_table() + assert result_table2.num_rows == 2 + expected_table2 = pa.table({"x": [100, 200], "y": [300, 400]}, schema=schema) + assert result_table2 == expected_table2 + + +@pytest.mark.integration +def test_file_reader_with_storage_options_provider(s3_bucket: str): + """Test LanceFileReader with storage_options_provider and credential refresh.""" + from lance import LanceNamespaceStorageOptionsProvider + from lance.file import LanceFileReader, LanceFileWriter + + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3, + ) + + table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}]) + table_name = uuid.uuid4().hex + table_id = ["test_ns", table_name] + + ds = lance.write_dataset( + table1, namespace=namespace, table_id=table_id, mode="create" + ) + assert ds.count_rows() == 2 + + describe_response = namespace.describe_table( + DescribeTableRequest(id=table_id, version=None) + ) + namespace_storage_options = describe_response.storage_options + + provider = LanceNamespaceStorageOptionsProvider( + namespace=namespace, table_id=table_id + ) + + file_uri = f"s3://{s3_bucket}/{table_name}_file_reader_test.lance" + schema = pa.schema([pa.field("x", pa.int64()), pa.field("y", pa.int64())]) + + # Write a file first (without provider to keep it simple) + writer = LanceFileWriter( + file_uri, + schema=schema, + storage_options=namespace_storage_options, + ) + batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]}, schema=schema) + writer.write_batch(batch) + writer.close() + + # Get fresh credentials for reading + describe_response = namespace.describe_table( + DescribeTableRequest(id=table_id, version=None) + ) + namespace_storage_options = describe_response.storage_options + + initial_describe_count = namespace.get_describe_call_count() + + # First read should work without needing refresh + reader = LanceFileReader( + file_uri, + storage_options=namespace_storage_options, + storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, + ) + result = reader.read_all(batch_size=1024) + result_table = result.to_table() + assert result_table.num_rows == 3 + assert result_table.schema == schema + + describe_count_after_first_read = namespace.get_describe_call_count() + assert describe_count_after_first_read == initial_describe_count + + # Wait for credentials to expire + time.sleep(5) + + # Write a second file + file_uri2 = f"s3://{s3_bucket}/{table_name}_file_reader_test2.lance" + writer2 = LanceFileWriter( + file_uri2, + schema=schema, + storage_options=namespace_storage_options, + ) + batch2 = pa.RecordBatch.from_pydict( + {"x": [100, 200], "y": [300, 400]}, schema=schema + ) + writer2.write_batch(batch2) + writer2.close() + + # Second read should trigger credential refresh + reader2 = LanceFileReader( + file_uri2, + storage_options=namespace_storage_options, + storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, + ) + result2 = reader2.read_all(batch_size=1024) + result_table2 = result2.to_table() + assert result_table2.num_rows == 2 + expected_table2 = pa.table({"x": [100, 200], "y": [300, 400]}, schema=schema) + assert result_table2 == expected_table2 + + final_describe_count = namespace.get_describe_call_count() + assert final_describe_count == describe_count_after_first_read + 1 + + +@pytest.mark.integration +def test_file_session_with_storage_options_provider(s3_bucket: str): + """Test LanceFileSession with storage_options_provider and credential refresh.""" + from lance import LanceNamespaceStorageOptionsProvider + from lance.file import LanceFileSession + + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3, + ) + + table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}]) + table_name = uuid.uuid4().hex + table_id = ["test_ns", table_name] + + ds = lance.write_dataset( + table1, namespace=namespace, table_id=table_id, mode="create" + ) + assert ds.count_rows() == 2 + + describe_response = namespace.describe_table( + DescribeTableRequest(id=table_id, version=None) + ) + namespace_storage_options = describe_response.storage_options + + provider = LanceNamespaceStorageOptionsProvider( + namespace=namespace, table_id=table_id + ) + + initial_describe_count = namespace.get_describe_call_count() + + # Create session with storage_options_provider + session = LanceFileSession( + f"s3://{s3_bucket}/{table_name}_session", + storage_options=namespace_storage_options, + storage_options_provider=provider, + s3_credentials_refresh_offset_seconds=1, + ) + + # Test contains method + assert not session.contains("session_test.lance") + + # Test list method + files = session.list() + assert isinstance(files, list) + + schema = pa.schema([pa.field("x", pa.int64()), pa.field("y", pa.int64())]) + + # Write using session - should not trigger credential refresh + writer = session.open_writer( + "session_test.lance", + schema=schema, + ) + batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]}, schema=schema) + writer.write_batch(batch) + writer.close() + + describe_count_after_first_write = namespace.get_describe_call_count() + assert describe_count_after_first_write == initial_describe_count + + # Test contains method after write + assert session.contains("session_test.lance") + + # Read using session - should not trigger credential refresh + reader = session.open_reader("session_test.lance") + result = reader.read_all(batch_size=1024) + result_table = result.to_table() + assert result_table.num_rows == 3 + assert result_table.schema == schema + + expected_table = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}, schema=schema) + assert result_table == expected_table + + describe_count_after_first_read = namespace.get_describe_call_count() + assert describe_count_after_first_read == describe_count_after_first_write + + # Wait for credentials to expire + time.sleep(5) + + # Write again, should trigger credential refresh + writer2 = session.open_writer( + "session_test2.lance", + schema=schema, + ) + batch2 = pa.RecordBatch.from_pydict( + {"x": [100, 200], "y": [300, 400]}, schema=schema + ) + writer2.write_batch(batch2) + writer2.close() + + describe_count_after_second_write = namespace.get_describe_call_count() + assert describe_count_after_second_write == describe_count_after_first_read + 1 + + # Read the second file - should not trigger another refresh since we just refreshed + reader2 = session.open_reader("session_test2.lance") result2 = reader2.read_all(batch_size=1024) result_table2 = result2.to_table() assert result_table2.num_rows == 2 expected_table2 = pa.table({"x": [100, 200], "y": [300, 400]}, schema=schema) assert result_table2 == expected_table2 + + final_describe_count = namespace.get_describe_call_count() + assert final_describe_count == describe_count_after_second_write diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 91108192c47..d47f2a36937 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2935,10 +2935,21 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "s3_credentials_refresh_offset_seconds")?; + + if storage_options.is_some() + || storage_options_provider.is_some() + || s3_credentials_refresh_offset_seconds.is_some() + { + let s3_credentials_refresh_offset = s3_credentials_refresh_offset_seconds + .map(std::time::Duration::from_secs) + .unwrap_or(std::time::Duration::from_secs(60)); + p.store_params = Some(ObjectStoreParams { storage_options, storage_options_provider, + s3_credentials_refresh_offset, ..Default::default() }); } diff --git a/python/src/file.rs b/python/src/file.rs index 2dc2a31489f..11971e5d5d7 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -20,6 +20,7 @@ use bytes::Bytes; use futures::stream::StreamExt; use lance::io::{ObjectStore, RecordBatchStream}; use lance_core::cache::LanceCache; +use lance_core::utils::path::LancePathExt; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; use lance_file::reader::{ BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics, @@ -35,16 +36,14 @@ use lance_io::{ }; use object_store::path::Path; use pyo3::{ - exceptions::{PyIOError, PyRuntimeError, PyValueError}, - pyclass, pyfunction, pymethods, IntoPyObjectExt, PyObject, PyResult, Python, + exceptions::{PyIOError, PyRuntimeError}, + pyclass, pyfunction, pymethods, IntoPyObjectExt, PyErr, PyObject, PyResult, Python, }; -use regex::Regex; use serde::Serialize; use std::collections::HashMap; use std::{pin::Pin, sync::Arc}; +use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; -use url::Url; - #[pyclass(get_all)] #[derive(Clone, Debug, Serialize)] pub struct LanceBufferDescriptor { @@ -240,6 +239,7 @@ impl LanceFileWriter { version: Option, storage_options: Option>, storage_options_provider: Option>, + s3_credentials_refresh_offset_seconds: Option, keep_original_array: Option, max_page_bytes: Option, ) -> PyResult { @@ -247,6 +247,7 @@ impl LanceFileWriter { uri_or_path, storage_options, storage_options_provider, + s3_credentials_refresh_offset_seconds, ) .await?; Self::open_with_store( @@ -296,7 +297,7 @@ impl LanceFileWriter { #[pymethods] impl LanceFileWriter { #[new] - #[pyo3(signature=(path, schema=None, data_cache_bytes=None, version=None, storage_options=None, storage_options_provider=None, keep_original_array=None, max_page_bytes=None))] + #[pyo3(signature=(path, schema=None, data_cache_bytes=None, version=None, storage_options=None, storage_options_provider=None, s3_credentials_refresh_offset_seconds=None, keep_original_array=None, max_page_bytes=None))] #[allow(clippy::too_many_arguments)] pub fn new( path: String, @@ -305,6 +306,7 @@ impl LanceFileWriter { version: Option, storage_options: Option>, storage_options_provider: Option, + s3_credentials_refresh_offset_seconds: Option, keep_original_array: Option, max_page_bytes: Option, ) -> PyResult { @@ -322,6 +324,7 @@ impl LanceFileWriter { version, storage_options, provider, + s3_credentials_refresh_offset_seconds, keep_original_array, max_page_bytes, ), @@ -368,85 +371,44 @@ impl Drop for LanceFileWriter { } } -fn path_to_parent(path: &Path) -> PyResult<(Path, String)> { - let mut parts = path.parts().collect::>(); - if parts.is_empty() { - return Err(PyValueError::new_err(format!( - "Path {} is not a valid path to a file", - path, - ))); - } - let filename = parts.pop().unwrap().as_ref().to_owned(); - Ok((Path::from_iter(parts), filename)) -} - pub async fn object_store_from_uri_or_path_no_options( uri_or_path: impl AsRef, ) -> PyResult<(Arc, Path)> { object_store_from_uri_or_path(uri_or_path, None).await } -// The ObjectStore::from_uri_or_path expects a path to a directory (and it creates it if it does -// not exist). We are given a path to a file and so we need to strip the last component -// before creating the object store. We then return the object store and the new relative path -// to the file. pub async fn object_store_from_uri_or_path( uri_or_path: impl AsRef, storage_options: Option>, ) -> PyResult<(Arc, Path)> { - object_store_from_uri_or_path_with_provider(uri_or_path, storage_options, None).await + object_store_from_uri_or_path_with_provider(uri_or_path, storage_options, None, None).await } pub async fn object_store_from_uri_or_path_with_provider( uri_or_path: impl AsRef, storage_options: Option>, storage_options_provider: Option>, + s3_credentials_refresh_offset_seconds: Option, ) -> PyResult<(Arc, Path)> { - if let Ok(mut url) = Url::parse(uri_or_path.as_ref()) { - if url.scheme().len() > 1 { - let path = object_store::path::Path::parse(url.path()).map_err(|e| { - PyIOError::new_err(format!("Invalid URL path `{}`: {}", url.path(), e)) - })?; - let (parent_path, filename) = path_to_parent(&path)?; - url.set_path(parent_path.as_ref()); - - let object_store_registry = Arc::new(lance::io::ObjectStoreRegistry::default()); - let object_store_params = - if storage_options.is_some() || storage_options_provider.is_some() { - Some(ObjectStoreParams { - storage_options: storage_options.clone(), - storage_options_provider, - ..Default::default() - }) - } else { - None - }; - - let (object_store, dir_path) = ObjectStore::from_uri_and_params( - object_store_registry, - url.as_str(), - &object_store_params.unwrap_or_default(), - ) - .await - .infer_error()?; - let child_path = dir_path.child(filename); - return Ok((object_store, child_path)); - } - } - let regex = Regex::new(r".:\\").unwrap(); - let adjusted_path; - let uri_or_path: &str = if regex.is_match(uri_or_path.as_ref()) { - // Windows paths like C:\ currently do not get handled correctly by - // Path::parse (https://github.com/apache/arrow-rs-object-store/issues/499) - // and we need to change the first \ into a / - adjusted_path = uri_or_path.as_ref().to_string().replacen("\\", "/", 1); - adjusted_path.as_str() - } else { - uri_or_path.as_ref() + let object_store_registry = Arc::new(lance::io::ObjectStoreRegistry::default()); + let mut object_store_params = ObjectStoreParams { + storage_options: storage_options.clone(), + storage_options_provider, + ..Default::default() }; - let path = Path::parse(uri_or_path) - .map_err(|e| PyIOError::new_err(format!("Invalid path `{}`: {}", uri_or_path, e)))?; - let object_store = Arc::new(ObjectStore::local()); + if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds { + object_store_params.s3_credentials_refresh_offset = + std::time::Duration::from_secs(offset_seconds); + } + + let (object_store, path) = ObjectStore::from_uri_and_params( + object_store_registry, + uri_or_path.as_ref(), + &object_store_params, + ) + .await + .infer_error()?; + Ok((object_store, path)) } @@ -460,9 +422,16 @@ impl LanceFileSession { pub async fn try_new( uri_or_path: String, storage_options: Option>, + storage_options_provider: Option>, + s3_credentials_refresh_offset_seconds: Option, ) -> PyResult { - let (object_store, base_path) = - object_store_from_uri_or_path(uri_or_path, storage_options).await?; + let (object_store, base_path) = object_store_from_uri_or_path_with_provider( + uri_or_path, + storage_options, + storage_options_provider, + s3_credentials_refresh_offset_seconds, + ) + .await?; Ok(Self { object_store, base_path, @@ -473,12 +442,25 @@ impl LanceFileSession { #[pymethods] impl LanceFileSession { #[new] - #[pyo3(signature=(uri_or_path, storage_options=None))] + #[pyo3(signature=(uri_or_path, storage_options=None, storage_options_provider=None, s3_credentials_refresh_offset_seconds=None))] pub fn new( uri_or_path: String, storage_options: Option>, + storage_options_provider: Option, + s3_credentials_refresh_offset_seconds: Option, ) -> PyResult { - rt().block_on(None, Self::try_new(uri_or_path, storage_options))? + let provider = storage_options_provider + .map(crate::storage_options::py_object_to_storage_options_provider) + .transpose()?; + rt().block_on( + None, + Self::try_new( + uri_or_path, + storage_options, + provider, + s3_credentials_refresh_offset_seconds, + ), + )? } #[pyo3(signature=(path, columns=None))] @@ -487,7 +469,7 @@ impl LanceFileSession { path: String, columns: Option>, ) -> PyResult { - let path = self.base_path.child(path); + let path = self.base_path.child_path(&Path::from(path)); rt().block_on( None, LanceFileReader::open_with_store(self.object_store.clone(), path, columns), @@ -511,7 +493,7 @@ impl LanceFileSession { keep_original_array: Option, max_page_bytes: Option, ) -> PyResult { - let path = self.base_path.child(path); + let path = self.base_path.child_path(&Path::from(path)); rt().block_on( None, LanceFileWriter::open_with_store( @@ -525,6 +507,129 @@ impl LanceFileSession { ), )? } + + pub fn contains(&self, path: String) -> PyResult { + let full_path = self.base_path.child_path(&Path::from(path)); + rt().block_on(None, async { + self.object_store + .exists(&full_path) + .await + .map_err(|e| PyErr::new::(format!("{}", e))) + })? + } + + pub fn list(&self, path: Option) -> PyResult> { + use futures::stream::StreamExt; + + rt().block_on(None, async { + // Construct the full path to list from + let list_path = if let Some(prefix) = path { + self.base_path.child_path(&Path::from(prefix)) + } else { + self.base_path.clone() + }; + + // List all files under the specified path + let stream = self.object_store.list(Some(list_path)); + let results: Vec<_> = stream.collect().await; + + let mut paths: Vec = Vec::new(); + for meta_result in results { + let meta = meta_result.map_err(|e| { + PyErr::new::(format!("{}", e)) + })?; + + // Strip the base_path prefix to make it relative + // Use prefix_match which handles path separators correctly across platforms + let relative_parts = + meta.location.prefix_match(&self.base_path).ok_or_else(|| { + PyErr::new::(format!( + "Path '{}' does not start with base path '{}'", + meta.location.as_ref(), + self.base_path.as_ref() + )) + })?; + let relative = Path::from_iter(relative_parts).as_ref().to_string(); + + paths.push(relative); + } + + Ok(paths) + })? + } + + /// Upload a file from local filesystem to the object store + /// + /// Parameters + /// ---------- + /// local_path : str + /// Local file path to upload + /// remote_path : str + /// Remote path relative to session's base_path + pub fn upload_file(&self, local_path: String, remote_path: String) -> PyResult<()> { + rt().block_on(None, async { + let local_file = tokio::fs::File::open(&local_path).await.map_err(|e| { + PyIOError::new_err(format!("Failed to open local file {}: {}", local_path, e)) + })?; + let mut reader = tokio::io::BufReader::new(local_file); + let full_path = self.base_path.child_path(&Path::from(remote_path)); + + let mut writer = + self.object_store.create(&full_path).await.map_err(|e| { + PyIOError::new_err(format!("Failed to create remote file: {}", e)) + })?; + + tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| PyIOError::new_err(format!("Failed to upload file: {}", e)))?; + writer + .shutdown() + .await + .map_err(|e| PyIOError::new_err(format!("Failed to finalize upload: {}", e)))?; + + Ok(()) + })? + } + + /// Download a file from object store to local filesystem + /// + /// Parameters + /// ---------- + /// remote_path : str + /// Remote path relative to session's base_path + /// local_path : str + /// Local file path where the file will be saved + pub fn download_file(&self, remote_path: String, local_path: String) -> PyResult<()> { + rt().block_on(None, async { + let full_path = self.base_path.child_path(&Path::from(remote_path)); + let get_result = self + .object_store + .inner + .get(&full_path) + .await + .map_err(|e| PyIOError::new_err(format!("Failed to get remote file: {}", e)))?; + + let mut stream = get_result.into_stream(); + let mut writer = tokio::fs::File::create(&local_path).await.map_err(|e| { + PyIOError::new_err(format!("Failed to create local file {}: {}", local_path, e)) + })?; + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result.map_err(|e| { + PyIOError::new_err(format!("Failed to read chunk from remote: {}", e)) + })?; + writer.write_all(&chunk).await.map_err(|e| { + PyIOError::new_err(format!("Failed to write chunk to local file: {}", e)) + })?; + } + + writer + .flush() + .await + .map_err(|e| PyIOError::new_err(format!("Failed to flush local file: {}", e)))?; + + Ok(()) + })? + } } #[pyclass] @@ -536,10 +641,17 @@ impl LanceFileReader { async fn open( uri_or_path: String, storage_options: Option>, + storage_options_provider: Option>, + s3_credentials_refresh_offset_seconds: Option, columns: Option>, ) -> PyResult { - let (object_store, path) = - object_store_from_uri_or_path(uri_or_path, storage_options).await?; + let (object_store, path) = object_store_from_uri_or_path_with_provider( + uri_or_path, + storage_options, + storage_options_provider, + s3_credentials_refresh_offset_seconds, + ) + .await?; Self::open_with_store(object_store, path, columns).await } @@ -635,13 +747,27 @@ impl LanceFileReader { #[pymethods] impl LanceFileReader { #[new] - #[pyo3(signature=(path, storage_options=None, columns=None))] + #[pyo3(signature=(path, storage_options=None, storage_options_provider=None, s3_credentials_refresh_offset_seconds=None, columns=None))] pub fn new( path: String, storage_options: Option>, + storage_options_provider: Option, + s3_credentials_refresh_offset_seconds: Option, columns: Option>, ) -> PyResult { - rt().block_on(None, Self::open(path, storage_options, columns))? + let provider = storage_options_provider + .map(crate::storage_options::py_object_to_storage_options_provider) + .transpose()?; + rt().block_on( + None, + Self::open( + path, + storage_options, + provider, + s3_credentials_refresh_offset_seconds, + columns, + ), + )? } pub fn read_all( diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 1883336feba..694ac976615 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -611,6 +611,11 @@ impl DynamicStorageOptionsCredentialProvider { } } + log::debug!( + "Refreshing S3 credentials from storage options provider: {}", + self.provider.provider_id() + ); + let storage_options_map = self .provider .fetch_storage_options() @@ -643,6 +648,24 @@ impl DynamicStorageOptionsCredentialProvider { source: Box::new(e), })?; + if let Some(expires_at) = expires_at_millis { + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_millis() as u64; + let expires_in_secs = (expires_at.saturating_sub(now_ms)) / 1000; + log::debug!( + "Successfully refreshed S3 credentials from provider: {}, credentials expire in {} seconds", + self.provider.provider_id(), + expires_in_secs + ); + } else { + log::debug!( + "Successfully refreshed S3 credentials from provider: {} (no expiration)", + self.provider.provider_id() + ); + } + *cache = Some(CachedCredential { credential: credential.clone(), expires_at_millis,