From 5b5a2d099a9e0f9a65bb280da50868c30f6f6aec Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 28 Oct 2025 23:41:42 -0700 Subject: [PATCH 1/4] commit --- .github/workflows/java.yml | 17 + .../lance/namespace/LanceNamespace.class | Bin 0 -> 7895 bytes java/lance-jni/Cargo.lock | 3 + java/lance-jni/Cargo.toml | 3 + java/lance-jni/src/blocking_dataset.rs | 104 ++- java/lance-jni/src/lib.rs | 2 + java/lance-jni/src/storage_options.rs | 197 ++++++ java/pom.xml | 18 + .../main/java/com/lancedb/lance/Dataset.java | 41 +- .../com/lancedb/lance/OpenDatasetBuilder.java | 243 +++++++ .../java/com/lancedb/lance/ReadOptions.java | 47 ++ .../lance/io/StorageOptionsProvider.java | 104 +++ .../LanceNamespaceStorageOptionsProvider.java | 127 ++++ .../lance/NamespaceIntegrationTest.java | 405 +++++++++++ python/Cargo.lock | 19 + python/Cargo.toml | 2 + python/pyproject.toml | 2 +- python/python/lance/__init__.py | 119 +++- python/python/lance/dataset.py | 69 +- python/python/lance/io.py | 140 ++++ python/python/lance/namespace.py | 138 ++++ .../tests/test_namespace_integration.py | 334 +++++++++ python/src/dataset.rs | 26 +- python/src/lib.rs | 1 + python/src/storage_options.rs | 169 +++++ rust/lance-io/src/object_store.rs | 30 +- .../src/object_store/providers/aws.rs | 649 +++++++++++++++++- .../src/object_store/storage_options.rs | 119 ++++ rust/lance-namespace-impls/src/dir.rs | 16 + rust/lance-namespace-impls/src/rest.rs | 19 + rust/lance-namespace/src/namespace.rs | 17 +- rust/lance-table/src/io/commit.rs | 3 + rust/lance/src/dataset/builder.rs | 179 ++++- 33 files changed, 3315 insertions(+), 47 deletions(-) create mode 100644 java/com/lancedb/lance/namespace/LanceNamespace.class create mode 100644 java/lance-jni/src/storage_options.rs create mode 100644 java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java create mode 100644 java/src/main/java/com/lancedb/lance/io/StorageOptionsProvider.java create mode 100644 java/src/main/java/com/lancedb/lance/namespace/LanceNamespaceStorageOptionsProvider.java create mode 100644 java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java create mode 100644 python/python/lance/io.py create mode 100644 python/python/lance/namespace.py create mode 100644 python/python/tests/test_namespace_integration.py create mode 100644 python/src/storage_options.rs create mode 100644 rust/lance-io/src/object_store/storage_options.rs diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index d99aa156561..b8bc66961ec 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -56,6 +56,21 @@ jobs: matrix: java-version: [8, 11, 17] name: Build and Test with Java ${{ matrix.java-version }} + services: + localstack: + image: localstack/localstack:4.0 + ports: + - 4566:4566 + env: + SERVICES: s3,dynamodb,kms + AWS_ACCESS_KEY_ID: ACCESS_KEY + AWS_SECRET_ACCESS_KEY: SECRET_KEY + options: >- + --health-cmd "curl -s http://localhost:4566/_localstack/health" + --health-interval 5s + --health-timeout 3s + --health-retries 3 + --health-start-period 10s steps: - name: Install dependencies run: | @@ -87,5 +102,7 @@ jobs: mvn spotless:check - name: Running tests with Java ${{ matrix.java-version }} working-directory: java + env: + LANCE_INTEGRATION_TEST: "1" run: | mvn install diff --git a/java/com/lancedb/lance/namespace/LanceNamespace.class b/java/com/lancedb/lance/namespace/LanceNamespace.class new file mode 100644 index 0000000000000000000000000000000000000000..1bec2b43572374d68c4092e03f1bf93d9b6d964f GIT binary patch literal 7895 
zcmbW5*>@C06o+pGBtt?7`zA{SWfK_}Kwv_UEJ$PslCX$Tu`^Q)4Kv+CcMnNG1;qsy zRP^}3gAcy=;DfK8qvzkFfY#5psb>YY>x|9SXgmB+g2e>3Ot`^-7<6=?Y5T#1)uMje zs!_iO2rO9!-B7~!LEEu|Ap&c6SCbj00?&3Dl|2_r&_iG#N2^o6=Gik;5KFL(z}DQT znn$f5J!Uz9&ABmk&uynDD`bkpy&juG4Soq$5!jfcgzC;)Gfj$ruTe}AyUT`@V4Xa| zqYXq*FCMXhz^2?3L5SxrY$C8B_kSoPw%#m@sMFjGTjeFHxg94`G3m~tkOA3=D5CK+ z+X-yTIT8Io)@ldy@t~(=Uu?&xUNG(iZaiiufgL$ei+T+@o*%ed_NddY;~K}Hz4Byr z+N1$J>A9_V%szSbZg!~0{oSt<#mvW@I-M)Q^YS2(0XW3Q3@^cp1a{_B@w81?Q-Kwx z)Byt9bz?ot@vT}^Scl|^t!9`u$)IjHOki)#ZH1LuqxG5Sy)4qWTn&F0Io}OO39Lv8 z@9fNVS_=sDgjH!D;k6WGo28LM5Xx?ZDfVJ%;yWy|y2*>a1vTyMTS+_`#{ zdMBDqw`K*dhXqVv>6G1Ytf1o|$;y!i<(ij&87em4@@%sq(AiyGC?G8tv!qXqjJ^#bfZ@ZYe6#|P#kW8SzYCCkI)0&Cvo4~3nhNpSa^6apxSvuh8;F|3d*tg(} z*~P;-y1mF#0!J1!Tu3B=l{_^?UJ$rr61CBMCoU2=g|Ud;pn)2NeY9#A^IY>MFl!=b zB>TuHB1SS5QjQu(k*v9pzRVMffNvsWH2qX7A|hufBpf!7khA3i`bFo7o?ZlwnMe?c z7+DDs*iTI2sWHdCCOS3NTFbY%QJ2jW=W9Vbrf-( zmld0A61Qw|J2OfG)uaUaGTRTsES^knii!xx85R!78Z{TwZVL6iL7} zQFS3XcXBZ%C6qA}3QS5UW62%I4x2gN8h6-?RS^NB*$Gp-1Cs=kthtaL{1BhP4WF*( zgn@8B>>wsPEZJ0a9@gC0Dc`VKQ=N^O31?eM-S zJj#Su2Fbo5z8od6TEr%~03vX=P+SsA9+N^5NSh1wLq*i530(P#{I!;02A{~?h82+{ z+X_7<)f{{&9K zxRSM0!J5*tUSq7&P*t*e6|75I))~f{K+jbg`xLBcE$em0ItPlG|l%eul??}DXd zZB(#)Eo+9cYEW0Qo>Z`AwJge5SD~R~ZBel9Xj#`7%ZBSQ>lD`GqOiVLJ#JI5?lRUu zG%8b%H;@rlW)oUitppr!V`{d_pHfioYdvfugTPJjVpi6}9SYV1t%p7`(jErTQBt2# zP#j4)3=hc8$re+geH iJ$y+){YC5HL+0Vv@QspnP{I0D%lekF9-)_A@ZG<)@&A|r literal 0 HcmV?d00001 diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 428a6613e44..c4d30bc504d 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3479,6 +3479,7 @@ version = "0.38.3" dependencies = [ "arrow", "arrow-schema", + "async-trait", "chrono", "env_logger", "jni", @@ -3490,12 +3491,14 @@ dependencies = [ "lance-index", "lance-io", "lance-linalg", + "lance-namespace", "log", "object_store", "prost", "prost-types", "roaring", "serde_json", + "snafu", "tokio", "uuid", ] diff --git a/java/lance-jni/Cargo.toml b/java/lance-jni/Cargo.toml index 3fb53021fc4..45ddd020a98 100644 --- a/java/lance-jni/Cargo.toml +++ b/java/lance-jni/Cargo.toml @@ -19,6 +19,7 @@ lance-encoding = { path = "../../rust/lance-encoding" } lance-linalg = { path = "../../rust/lance-linalg" } lance-index = { path = "../../rust/lance-index" } lance-io = { path = "../../rust/lance-io" } +lance-namespace = { path = "../../rust/lance-namespace" } lance-core = { path = "../../rust/lance-core" } lance-file = { path = "../../rust/lance-file" } arrow = { version = "56.1", features = ["ffi"] } @@ -30,6 +31,8 @@ tokio = { version = "1.23", features = [ "fs", "sync", ] } +async-trait = "0.1" +snafu = "0.8" jni = "0.21.1" serde_json = { version = "1" } log = "0.4" diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 301c586501b..bc6b65b6b14 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -3,6 +3,7 @@ use crate::error::{Error, Result}; use crate::ffi::JNIEnvExt; +use crate::storage_options::JavaStorageOptionsProvider; use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString}; use crate::utils::{ build_compaction_options, extract_storage_options, extract_write_params, @@ -38,6 +39,7 @@ use lance_core::datatypes::Schema as LanceSchema; use lance_index::DatasetIndexExt; use lance_index::{IndexParams, IndexType}; use lance_io::object_store::ObjectStoreRegistry; +use lance_io::object_store::StorageOptionsProvider; use std::collections::HashMap; use std::iter::empty; use std::str::FromStr; @@ -77,6 
+79,7 @@ impl BlockingDataset { Ok(Self { inner }) } + #[allow(clippy::too_many_arguments)] pub fn open( uri: &str, version: Option, @@ -85,14 +88,25 @@ impl BlockingDataset { metadata_cache_size_bytes: i64, storage_options: HashMap, serialized_manifest: Option<&[u8]>, + storage_options_provider: Option>, + s3_credentials_refresh_offset_seconds: Option, ) -> Result { + let mut store_params = ObjectStoreParams { + block_size: block_size.map(|size| size as usize), + storage_options: Some(storage_options.clone()), + ..Default::default() + }; + if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds { + store_params.s3_credentials_refresh_offset = + std::time::Duration::from_secs(offset_seconds); + } + if let Some(provider) = storage_options_provider.clone() { + store_params.storage_options_provider = Some(provider); + } let params = ReadParams { index_cache_size_bytes: index_cache_size_bytes as usize, metadata_cache_size_bytes: metadata_cache_size_bytes as usize, - store_options: Some(ObjectStoreParams { - block_size: block_size.map(|size| size as usize), - ..Default::default() - }), + store_options: Some(store_params), ..Default::default() }; @@ -102,6 +116,13 @@ impl BlockingDataset { builder = builder.with_version(ver as u64); } builder = builder.with_storage_options(storage_options); + if let Some(provider) = storage_options_provider { + builder = builder.with_storage_options_provider(provider) + } + if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds { + builder = builder + .with_s3_credentials_refresh_offset(std::time::Duration::from_secs(offset_seconds)); + } if let Some(serialized_manifest) = serialized_manifest { builder = builder.with_serialized_manifest(serialized_manifest)?; @@ -743,8 +764,10 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_openNative<'local>( block_size_obj: JObject, // Optional index_cache_size_bytes: jlong, metadata_cache_size_bytes: jlong, - storage_options_obj: JObject, // Map - serialized_manifest: JObject, // Optional + storage_options_obj: JObject, // Map + serialized_manifest: JObject, // Optional + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -756,7 +779,9 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_openNative<'local>( index_cache_size_bytes, metadata_cache_size_bytes, storage_options_obj, - serialized_manifest + serialized_manifest, + storage_options_provider_obj, + s3_credentials_refresh_offset_seconds_obj ) ) } @@ -769,14 +794,73 @@ fn inner_open_native<'local>( block_size_obj: JObject, // Optional index_cache_size_bytes: jlong, metadata_cache_size_bytes: jlong, - storage_options_obj: JObject, // Map - serialized_manifest: JObject, // Optional + storage_options_obj: JObject, // Map + serialized_manifest: JObject, // Optional + storage_options_provider_obj: JObject, // Optional + s3_credentials_refresh_offset_seconds_obj: JObject, // Optional ) -> Result> { let path_str: String = path.extract(env)?; let version = env.get_int_opt(&version_obj)?; let block_size = env.get_int_opt(&block_size_obj)?; let jmap = JMap::from_env(env, &storage_options_obj)?; let storage_options = to_rust_map(env, &jmap)?; + + // Extract storage options provider first (before get_bytes_opt which borrows env) + let storage_options_provider = if !storage_options_provider_obj.is_null() { + // Check if it's an Optional.empty() + let is_present = env + .call_method(&storage_options_provider_obj, "isPresent", "()Z", &[])? 
+ .z()?; + if is_present { + // Get the value from Optional + let provider_obj = env + .call_method( + &storage_options_provider_obj, + "get", + "()Ljava/lang/Object;", + &[], + )? + .l()?; + Some(JavaStorageOptionsProvider::new(env, provider_obj)?) + } else { + None + } + } else { + None + }; + + let storage_options_provider_arc = + storage_options_provider.map(|v| Arc::new(v) as Arc); + + // Extract s3_credentials_refresh_offset_seconds + let s3_credentials_refresh_offset_seconds = + if !s3_credentials_refresh_offset_seconds_obj.is_null() { + let is_present = env + .call_method( + &s3_credentials_refresh_offset_seconds_obj, + "isPresent", + "()Z", + &[], + )? + .z()?; + if is_present { + let value = env + .call_method( + &s3_credentials_refresh_offset_seconds_obj, + "get", + "()Ljava/lang/Object;", + &[], + )? + .l()?; + let long_value = env.call_method(&value, "longValue", "()J", &[])?.j()?; + Some(long_value as u64) + } else { + None + } + } else { + None + }; + let serialized_manifest = env.get_bytes_opt(&serialized_manifest)?; let dataset = BlockingDataset::open( &path_str, @@ -786,6 +870,8 @@ fn inner_open_native<'local>( metadata_cache_size_bytes, storage_options, serialized_manifest, + storage_options_provider_arc, + s3_credentials_refresh_offset_seconds, )?; dataset.into_java(env) } diff --git a/java/lance-jni/src/lib.rs b/java/lance-jni/src/lib.rs index 9bd8c975075..50cf1379937 100644 --- a/java/lance-jni/src/lib.rs +++ b/java/lance-jni/src/lib.rs @@ -51,6 +51,7 @@ mod merge_insert; mod optimize; mod schema; mod sql; +mod storage_options; pub mod traits; mod transaction; pub mod utils; @@ -58,6 +59,7 @@ pub mod utils; pub use error::Error; pub use error::Result; pub use ffi::JNIEnvExt; +pub use storage_options::JavaStorageOptionsProvider; use env_logger::{Builder, Env}; use std::env; diff --git a/java/lance-jni/src/storage_options.rs b/java/lance-jni/src/storage_options.rs new file mode 100644 index 00000000000..a79a346f751 --- /dev/null +++ b/java/lance-jni/src/storage_options.rs @@ -0,0 +1,197 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use jni::objects::{JMap, JObject, JString}; +use jni::JNIEnv; +use lance_io::object_store::StorageOptionsProvider; + +use crate::error::Result; + +/// Java-implemented storage options provider +/// +/// This wraps a Java object that implements the StorageOptionsProvider interface +/// and forwards get_storage_options() calls to the Java implementation. 
+pub struct JavaStorageOptionsProvider { + /// GlobalRef to the Java StorageOptionsProvider object + java_provider: jni::objects::GlobalRef, + /// JavaVM for making JNI calls + jvm: Arc, +} + +impl std::fmt::Debug for JavaStorageOptionsProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.provider_id()) + } +} + +impl std::fmt::Display for JavaStorageOptionsProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.provider_id()) + } +} + +impl JavaStorageOptionsProvider { + pub fn new(env: &mut JNIEnv, java_provider: JObject) -> Result { + // Create a global reference to the Java object so it persists + let java_provider = env.new_global_ref(java_provider)?; + + // Get the JavaVM for later JNI calls + let jvm = Arc::new(env.get_java_vm()?); + + Ok(Self { java_provider, jvm }) + } +} + +#[async_trait] +impl StorageOptionsProvider for JavaStorageOptionsProvider { + async fn fetch_storage_options(&self) -> lance_core::Result>> { + // Spawn blocking task to call Java method + let java_provider = self.java_provider.clone(); + let jvm = self.jvm.clone(); + + tokio::task::spawn_blocking(move || { + // Attach current thread to JVM + let mut env = jvm + .attach_current_thread() + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to attach to JVM: {}", + e + ))), + location: snafu::location!(), + })?; + + // Call fetchStorageOptions() method on Java object + // Returns Map with all storage options including optional EXPIRES_AT_MILLIS_KEY + // Or null if no storage options are available + let result = env + .call_method( + &java_provider, + "fetchStorageOptions", + "()Ljava/util/Map;", + &[], + ) + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to call fetchStorageOptions: {}", + e + ))), + location: snafu::location!(), + })?; + + let result_obj = result.l().map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "fetchStorageOptions result is not an object: {}", + e + ))), + location: snafu::location!(), + })?; + + // Check if result is null + if result_obj.is_null() { + return Ok(None); + } + + // Convert Java Map to Rust HashMap + let storage_options_map = + JMap::from_env(&mut env, &result_obj).map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "fetchStorageOptions result is not a Map: {}", + e + ))), + location: snafu::location!(), + })?; + + let mut storage_options = HashMap::new(); + let mut iter = + storage_options_map + .iter(&mut env) + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to iterate storage options: {}", + e + ))), + location: snafu::location!(), + })?; + + while let Some((key, value)) = + iter.next(&mut env).map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to get next storage option entry: {}", + e + ))), + location: snafu::location!(), + })? + { + let key_str: String = env + .get_string(&JString::from(key)) + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "storage option key is not a string: {}", + e + ))), + location: snafu::location!(), + })? 
+ .into(); + + let value_str: String = env + .get_string(&JString::from(value)) + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "storage option value is not a string: {}", + e + ))), + location: snafu::location!(), + })? + .into(); + + storage_options.insert(key_str, value_str); + } + + Ok(Some(storage_options)) + }) + .await + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to spawn blocking task: {}", + e + ))), + location: snafu::location!(), + })? + } + + fn provider_id(&self) -> String { + // Call providerId() method on the Java object + // This should always succeed since StorageOptionsProvider.providerId() has a default implementation + let mut env = self + .jvm + .attach_current_thread() + .expect("Failed to attach to JVM"); + + let result = env + .call_method( + &self.java_provider, + "providerId", + "()Ljava/lang/String;", + &[], + ) + .expect("Failed to call providerId() on Java StorageOptionsProvider"); + + let result_obj = result.l().expect("providerId() did not return an object"); + + if result_obj.is_null() { + panic!("providerId() returned null"); + } + + let jstring = JString::from(result_obj); + let java_string = env + .get_string(&jstring) + .expect("Failed to convert Java string to Rust string"); + + java_string.into() + } +} diff --git a/java/pom.xml b/java/pom.xml index e3729452fb0..97787ae8065 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -105,6 +105,24 @@ guava 33.3.1-jre + + com.lancedb + lance-namespace-core + 0.0.20 + + + + software.amazon.awssdk + s3 + 2.20.26 + test + + + software.amazon.awssdk + auth + 2.20.26 + test + diff --git a/java/src/main/java/com/lancedb/lance/Dataset.java b/java/src/main/java/com/lancedb/lance/Dataset.java index 2a870e15646..f62805ce474 100644 --- a/java/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/src/main/java/com/lancedb/lance/Dataset.java @@ -16,6 +16,7 @@ import com.lancedb.lance.compaction.CompactionOptions; import com.lancedb.lance.index.IndexParams; import com.lancedb.lance.index.IndexType; +import com.lancedb.lance.io.StorageOptionsProvider; import com.lancedb.lance.ipc.DataStatistics; import com.lancedb.lance.ipc.LanceScanner; import com.lancedb.lance.ipc.ScanOptions; @@ -207,7 +208,7 @@ public static Dataset open(BufferAllocator allocator, String path, ReadOptions o * @param options the open options * @return Dataset */ - private static Dataset open( + static Dataset open( BufferAllocator allocator, boolean selfManagedAllocator, String path, ReadOptions options) { Preconditions.checkNotNull(path); Preconditions.checkNotNull(allocator); @@ -220,7 +221,9 @@ private static Dataset open( options.getIndexCacheSizeBytes(), options.getMetadataCacheSizeBytes(), options.getStorageOptions(), - options.getSerializedManifest()); + options.getSerializedManifest(), + options.getStorageOptionsProvider(), + options.getS3CredentialsRefreshOffsetSeconds()); dataset.allocator = allocator; dataset.selfManagedAllocator = selfManagedAllocator; return dataset; @@ -233,7 +236,39 @@ private static native Dataset openNative( long indexCacheSize, long metadataCacheSizeBytes, Map storageOptions, - Optional serializedManifest); + Optional serializedManifest, + Optional storageOptionsProvider, + Optional s3CredentialsRefreshOffsetSeconds); + + /** + * Creates a builder for opening a dataset. + * + *

This builder supports opening datasets either directly from a URI or from a LanceNamespace. + * + *

Example usage with URI: + * + *

{@code
+   * Dataset dataset = Dataset.open()
+   *     .uri("s3://bucket/table.lance")
+   *     .readOptions(options)
+   *     .build();
+   * }
+ * + *

Example usage with namespace: + * + *

{@code
+   * Dataset dataset = Dataset.open()
+   *     .namespace(myNamespace)
+   *     .tableId(Arrays.asList("my_table"))
+   *     .refreshStorageOptions(true)
+   *     .build();
+   * }
+ * + * @return A new OpenDatasetBuilder instance + */ + public static OpenDatasetBuilder open() { + return new OpenDatasetBuilder(); + } /** * Create a new version of dataset. Use {@link Transaction} instead diff --git a/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java new file mode 100644 index 00000000000..e55c8fbd2b1 --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java @@ -0,0 +1,243 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance; + +import com.lancedb.lance.namespace.LanceNamespace; +import com.lancedb.lance.namespace.LanceNamespaceStorageOptionsProvider; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Builder for opening a Dataset. + * + *

This builder provides a fluent API for opening datasets either directly from a URI or from a + * LanceNamespace. When using a namespace, the table location and storage options are automatically + * fetched. + * + *

Example usage with URI: + * + *

{@code
+ * Dataset dataset = Dataset.open()
+ *     .uri("s3://bucket/table.lance")
+ *     .readOptions(options)
+ *     .build();
+ * }
+ * + *

Example usage with namespace: + * + *

{@code
+ * Dataset dataset = Dataset.open()
+ *     .namespace(myNamespace)
+ *     .tableId(Arrays.asList("my_table"))
+ *     .refreshStorageOptions(true)
+ *     .build();
+ * }
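+ *
+ * <p>For illustration only, a provider can also be wired in by hand; {@code MyStsProvider} below is
+ * a hypothetical implementation of the {@code StorageOptionsProvider} interface added in this
+ * change, not an existing class:
+ *
+ * <pre>{@code
+ * StorageOptionsProvider provider = new MyStsProvider(); // hypothetical implementation
+ * ReadOptions options = new ReadOptions.Builder()
+ *     .setStorageOptionsProvider(provider)
+ *     .setS3CredentialsRefreshOffsetSeconds(60)
+ *     .build();
+ * Dataset dataset = Dataset.open()
+ *     .uri("s3://bucket/table.lance")
+ *     .readOptions(options)
+ *     .build();
+ * }</pre>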
+ */ +public class OpenDatasetBuilder { + private BufferAllocator allocator; + private boolean selfManagedAllocator = false; + private String uri; + private LanceNamespace namespace; + private List tableId; + private ReadOptions options = new ReadOptions.Builder().build(); + private boolean refreshStorageOptions = false; + + /** Creates a new builder instance. Package-private, use Dataset.open() instead. */ + OpenDatasetBuilder() {} + + /** + * Sets the buffer allocator. + * + * @param allocator Arrow buffer allocator + * @return this builder instance + */ + public OpenDatasetBuilder allocator(BufferAllocator allocator) { + Preconditions.checkNotNull(allocator); + this.allocator = allocator; + this.selfManagedAllocator = false; + return this; + } + + /** + * Sets the dataset URI. + * + *

Either uri() or namespace()+tableId() must be specified, but not both. + * + * @param uri The dataset URI (e.g., "s3://bucket/table.lance" or "file:///path/to/table.lance") + * @return this builder instance + */ + public OpenDatasetBuilder uri(String uri) { + this.uri = uri; + return this; + } + + /** + * Sets the namespace. + * + *

Must be used together with tableId(). Either uri() or namespace()+tableId() must be + * specified, but not both. + * + * @param namespace The namespace implementation to fetch table info from + * @return this builder instance + */ + public OpenDatasetBuilder namespace(LanceNamespace namespace) { + this.namespace = namespace; + return this; + } + + /** + * Sets the table identifier. + * + *

Must be used together with namespace(). Either uri() or namespace()+tableId() must be + * specified, but not both. + * + * @param tableId The table identifier (e.g., Arrays.asList("my_table")) + * @return this builder instance + */ + public OpenDatasetBuilder tableId(List tableId) { + this.tableId = tableId; + return this; + } + + /** + * Sets the read options. + * + * @param options Read options + * @return this builder instance + */ + public OpenDatasetBuilder readOptions(ReadOptions options) { + this.options = options; + return this; + } + + /** + * Sets whether storage options should be automatically refreshed before they expire. + * + *

This is only applicable when using namespace-based opening. It is currently only used for + * refreshing AWS temporary access credentials. When enabled, the namespace will be queried + * periodically to fetch new temporary credentials before the current ones expire. The new storage + * options will contain updated AWS access credentials with a new expiration time. + * + * @param refreshStorageOptions If true, storage options will be automatically refreshed + * @return this builder instance + */ + public OpenDatasetBuilder refreshStorageOptions(boolean refreshStorageOptions) { + this.refreshStorageOptions = refreshStorageOptions; + return this; + } + + /** + * Opens the dataset with the configured parameters. + * + *

If a namespace is configured, this automatically fetches the table location and storage + * options from the namespace via describe_table(). + * + * @return Dataset + * @throws IllegalArgumentException if required parameters are missing or invalid + */ + public Dataset build() { + // Validate that exactly one of uri or namespace+tableId is provided + boolean hasUri = uri != null; + boolean hasNamespace = namespace != null && tableId != null; + + if (hasUri && hasNamespace) { + throw new IllegalArgumentException( + "Cannot specify both uri and namespace+tableId. Use one or the other."); + } + if (!hasUri && !hasNamespace) { + if (namespace != null) { + throw new IllegalArgumentException( + "namespace is set but tableId is missing. Both namespace and tableId must be" + + " provided together."); + } else if (tableId != null) { + throw new IllegalArgumentException( + "tableId is set but namespace is missing. Both namespace and tableId must be" + + " provided together."); + } else { + throw new IllegalArgumentException("Either uri or namespace+tableId must be provided."); + } + } + + Preconditions.checkNotNull(options, "options must be set"); + + // Create allocator if not provided + if (allocator == null) { + allocator = new RootAllocator(Long.MAX_VALUE); + selfManagedAllocator = true; + } + + // Handle namespace-based opening + if (hasNamespace) { + return buildFromNamespace(); + } + + // Handle URI-based opening + return Dataset.open(allocator, selfManagedAllocator, uri, options); + } + + private Dataset buildFromNamespace() { + // Call describe_table to get location and storage options + DescribeTableRequest request = new DescribeTableRequest(); + request.setId(tableId); + // Only set version if present + options.getVersion().ifPresent(v -> request.setVersion(Long.valueOf(v))); + + DescribeTableResponse response = namespace.describeTable(request); + + // Extract location + String location = response.getLocation(); + if (location == null || location.isEmpty()) { + throw new IllegalArgumentException("Namespace did not return a table location"); + } + + // Build new ReadOptions with initial storage options + ReadOptions.Builder optionsBuilder = + new ReadOptions.Builder() + .setIndexCacheSizeBytes(options.getIndexCacheSizeBytes()) + .setMetadataCacheSizeBytes(options.getMetadataCacheSizeBytes()); + + // Only set storage options provider if refresh is enabled + if (refreshStorageOptions) { + LanceNamespaceStorageOptionsProvider storageOptionsProvider = + new LanceNamespaceStorageOptionsProvider(namespace, tableId); + optionsBuilder.setStorageOptionsProvider(storageOptionsProvider); + } + + // Set optional fields only if present + options.getVersion().ifPresent(optionsBuilder::setVersion); + options.getBlockSize().ifPresent(optionsBuilder::setBlockSize); + options.getSerializedManifest().ifPresent(optionsBuilder::setSerializedManifest); + options + .getS3CredentialsRefreshOffsetSeconds() + .ifPresent(optionsBuilder::setS3CredentialsRefreshOffsetSeconds); + + // Add initial storage options from describe_table response if present + Map storageOptions = new HashMap<>(options.getStorageOptions()); + if (response.getStorageOptions() != null) { + storageOptions.putAll(response.getStorageOptions()); + } + optionsBuilder.setStorageOptions(storageOptions); + + // Open dataset with regular open method + return Dataset.open(allocator, selfManagedAllocator, location, optionsBuilder.build()); + } +} diff --git a/java/src/main/java/com/lancedb/lance/ReadOptions.java 
b/java/src/main/java/com/lancedb/lance/ReadOptions.java index 45ec5584825..03f34ee224c 100644 --- a/java/src/main/java/com/lancedb/lance/ReadOptions.java +++ b/java/src/main/java/com/lancedb/lance/ReadOptions.java @@ -13,6 +13,8 @@ */ package com.lancedb.lance; +import com.lancedb.lance.io.StorageOptionsProvider; + import com.google.common.base.MoreObjects; import java.nio.ByteBuffer; @@ -29,6 +31,8 @@ public class ReadOptions { private final long metadataCacheSizeBytes; private final Optional serializedManifest; private final Map storageOptions; + private final Optional storageOptionsProvider; + private final Optional s3CredentialsRefreshOffsetSeconds; private ReadOptions(Builder builder) { this.version = builder.version; @@ -37,6 +41,8 @@ private ReadOptions(Builder builder) { this.metadataCacheSizeBytes = builder.metadataCacheSizeBytes; this.storageOptions = builder.storageOptions; this.serializedManifest = builder.serializedManifest; + this.storageOptionsProvider = builder.storageOptionsProvider; + this.s3CredentialsRefreshOffsetSeconds = builder.s3CredentialsRefreshOffsetSeconds; } public Optional getVersion() { @@ -63,6 +69,14 @@ public Optional getSerializedManifest() { return serializedManifest; } + public Optional getStorageOptionsProvider() { + return storageOptionsProvider; + } + + public Optional getS3CredentialsRefreshOffsetSeconds() { + return s3CredentialsRefreshOffsetSeconds; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -85,6 +99,8 @@ public static class Builder { private long metadataCacheSizeBytes = 1024 * 1024 * 1024; // Default to 1 GiB like Rust private Map storageOptions = new HashMap<>(); private Optional serializedManifest = Optional.empty(); + private Optional storageOptionsProvider = Optional.empty(); + private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); /** * Set the version of the dataset to read. If not set, read from latest version. @@ -190,6 +206,37 @@ public Builder setSerializedManifest(ByteBuffer serializedManifest) { return this; } + /** + * Set a custom storage options provider for automatic storage options refresh. + * + *

The storage options provider will be called automatically before storage options expire, + * enabling long-running operations on cloud storage without interruption. This is currently + * only used for refreshing AWS temporary access credentials. + * + * @param storageOptionsProvider the storage options provider implementation + * @return this builder + */ + public Builder setStorageOptionsProvider(StorageOptionsProvider storageOptionsProvider) { + this.storageOptionsProvider = Optional.of(storageOptionsProvider); + return this; + } + + /** + * Set the number of seconds before credential expiration to trigger a refresh. + * + *

Default is 60 seconds. Only applicable when using AWS S3 with temporary credentials. For + * example, if set to 60, credentials will be refreshed when they have less than 60 seconds + * remaining before expiration. This should be set shorter than the credential lifetime to avoid + * using expired credentials. + * + * @param s3CredentialsRefreshOffsetSeconds the refresh offset in seconds + * @return this builder + */ + public Builder setS3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOffsetSeconds) { + this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds); + return this; + } + public ReadOptions build() { return new ReadOptions(this); } diff --git a/java/src/main/java/com/lancedb/lance/io/StorageOptionsProvider.java b/java/src/main/java/com/lancedb/lance/io/StorageOptionsProvider.java new file mode 100644 index 00000000000..6748e3afb29 --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/io/StorageOptionsProvider.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.io; + +import java.util.Map; + +/** + * Interface for providing cloud storage options to Lance datasets. + * + *

Storage options providers enable automatic refresh for long-running operations on cloud + * storage (S3, Azure, GCS). This is currently only used for refreshing AWS temporary access + * credentials. Implement this interface to integrate with custom credential management systems such + * as AWS STS, GCP STS, or proprietary credential services. + * + *

The provider is called automatically before storage options expire, ensuring uninterrupted + * access during long-running queries, training jobs, or data processing. + * + *

Example Implementation

+ * + *
{@code
+ * public class MyStorageOptionsProvider implements StorageOptionsProvider {
+ *   public Map fetchStorageOptions() {
+ *     // Fetch from your credential service
+ *     Map credentials = new HashMap<>();
+ *     credentials.put("aws_access_key_id", "ASIA...");
+ *     credentials.put("aws_secret_access_key", "secret");
+ *     credentials.put("aws_session_token", "token");
+ *
+ *     long expiresAtMillis = System.currentTimeMillis() + 3600000L;
+ *     credentials.put("expires_at_millis", String.valueOf(expiresAtMillis));
+ *
+ *     return credentials;
+ *   }
+ * }
+ *
+ * // Use with dataset
+ * StorageOptionsProvider vendor = new MyStorageOptionsProvider();
+ * Dataset dataset = Dataset.open(
+ *     "s3://bucket/table.lance",
+ *     new ReadOptions.Builder()
+ *         .setStorageOptionsProvider(vendor)
+ *         .build()
+ * );
+ * }</pre>
+ *
+ * <h3>Error Handling</h3>
+ *
+ * <p>
If fetchStorageOptions() throws an exception, operations requiring credentials will fail. + * Implementations should handle recoverable errors internally (e.g., retry token refresh) and only + * throw exceptions for unrecoverable errors. + */ +public interface StorageOptionsProvider { + + /** + * Fetch fresh storage credentials. + * + *

This method is called automatically before each request and before existing credentials + * expire. It must return credentials in the format described below. + * + * @return Map of string key-value pairs containing cloud storage credentials and expiration time. + * Required key: + *

    + *
  • "expires_at_millis" (String): Unix timestamp in milliseconds (as string) when + * credentials expire. Lance will automatically call fetchStorageOptions() again before + * this time. + *
+ * Plus provider-specific credential keys: + *
    + *
  • AWS S3: "aws_access_key_id", "aws_secret_access_key", "aws_session_token" (optional) + *
  • Azure Blob Storage: "account_name", "account_key" or "sas_token" + *
  • Google Cloud Storage: "service_account_key" or "token" + *
+ * + * @throws RuntimeException if unable to fetch credentials + */ + Map fetchStorageOptions(); + + /** + * Return a human-readable unique identifier for this provider instance. + * + *

This is used for equality comparison and hashing in the object store registry. Two providers + * with the same ID will be treated as equal and share the same cached ObjectStore instance. + * + *

The default implementation uses the class name and toString() representation. Override this + * method to provide semantic equality based on configuration. + * + * @return A human-readable unique identifier string. For example: "MyProvider { endpoint: + * 'https://api.example.com' }" + */ + default String providerId() { + return this.getClass().getSimpleName() + " { repr: \"" + this.toString() + "\" }"; + } +} diff --git a/java/src/main/java/com/lancedb/lance/namespace/LanceNamespaceStorageOptionsProvider.java b/java/src/main/java/com/lancedb/lance/namespace/LanceNamespaceStorageOptionsProvider.java new file mode 100644 index 00000000000..88d0964933c --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/namespace/LanceNamespaceStorageOptionsProvider.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.namespace; + +import com.lancedb.lance.io.StorageOptionsProvider; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; + +import java.util.List; +import java.util.Map; + +/** + * Storage options provider that fetches storage options from a LanceNamespace. + * + *

This provider automatically fetches fresh storage options by calling the namespace's + * describeTable() method, which returns both the table location and time-limited storage options. + * This is currently only used for refreshing AWS temporary access credentials. + * + *

This is the recommended approach for LanceDB Cloud and other namespace-based deployments, as + * it handles storage options refresh automatically. + * + *

Example Usage

+ * + *
{@code
+ * // Connect to a namespace (e.g., LanceDB Cloud)
+ * LanceNamespace namespace = LanceNamespaces.connect("rest", Map.of(
+ *     "url", "https://api.lancedb.com",
+ *     "api_key", "your-api-key"
+ * ));
+ *
+ * // Create storage options provider
+ * LanceNamespaceStorageOptionsProvider provider = new LanceNamespaceStorageOptionsProvider(
+ *     namespace,
+ *     Arrays.asList("workspace", "table_name")
+ * );
+ *
+ * // Use with dataset - storage options auto-refresh!
+ * Dataset dataset = Dataset.open(
+ *     "s3://bucket/table.lance",
+ *     new ReadOptions.Builder()
+ *         .setStorageOptionsProvider(provider)
+ *         .build()
+ * );
+ * }</pre>
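+ *
+ * <p>Alternatively, the {@code Dataset.open()} builder added in this change performs the same
+ * wiring in one step and creates this provider internally when refresh is enabled:
+ *
+ * <pre>{@code
+ * Dataset dataset = Dataset.open()
+ *     .namespace(namespace)
+ *     .tableId(Arrays.asList("workspace", "table_name"))
+ *     .refreshStorageOptions(true)
+ *     .build();
+ * }</pre>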
+ */ +public class LanceNamespaceStorageOptionsProvider implements StorageOptionsProvider { + + private final com.lancedb.lance.namespace.LanceNamespace namespace; + private final List tableId; + + /** + * Create a storage options provider that fetches storage options from a LanceNamespace. + * + * @param namespace The namespace instance to fetch storage options from + * @param tableId The table identifier (e.g., ["workspace", "table_name"]) + */ + public LanceNamespaceStorageOptionsProvider( + com.lancedb.lance.namespace.LanceNamespace namespace, List tableId) { + this.namespace = namespace; + this.tableId = tableId; + } + + /** + * Fetch credentials from the namespace. + * + *

This calls namespace.describeTable() to get the latest credentials and their expiration + * time. + * + * @return Flat map of string key-value pairs containing credentials and expires_at_millis + * @throws RuntimeException if the namespace doesn't return storage credentials or expiration time + */ + @Override + public Map fetchStorageOptions() { + // Create describe table request with table ID + DescribeTableRequest request = new DescribeTableRequest(); + request.setId(tableId); + + // Call namespace to describe the table and get credentials + DescribeTableResponse response = namespace.describeTable(request); + + // Extract storage options - should already be a flat Map + Map storageOptions = response.getStorageOptions(); + if (storageOptions == null || storageOptions.isEmpty()) { + throw new RuntimeException( + "Namespace did not return storage_options. " + + "Ensure the namespace supports credential vending."); + } + + // Verify expires_at_millis is present + if (!storageOptions.containsKey("expires_at_millis")) { + throw new RuntimeException( + "Namespace storage_options missing 'expires_at_millis'. " + + "Credential refresh will not work properly."); + } + + // Return storage_options directly - it's already a flat Map + return storageOptions; + } + + /** + * Return a human-readable unique identifier for this provider instance. + * + *

This creates a semantic ID based on the namespace's ID and the table ID, enabling proper + * equality comparison and caching. + * + * @return A human-readable unique identifier string combining namespace and table info + */ + @Override + public String providerId() { + // Call namespaceId() on the namespace (requires lance-namespace >= 0.0.20) + String namespaceId = namespace.namespaceId(); + return String.format( + "LanceNamespaceStorageOptionsProvider { namespace: %s, table_id: %s }", + namespaceId, tableId); + } +} diff --git a/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java new file mode 100644 index 00000000000..975d54583a0 --- /dev/null +++ b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java @@ -0,0 +1,405 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance; + +import com.lancedb.lance.namespace.LanceNamespace; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.net.URI; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Integration tests for Lance with S3 and credential refresh using StorageOptionsProvider. + * + *

This test uses a mock namespace that vends short-lived credentials and counts describeTable + * calls, verifying that the credential refresh mechanism works correctly. + * + *

These tests require LocalStack to be running. Run with: docker compose up -d + * + *

Set LANCE_INTEGRATION_TEST=1 environment variable to enable these tests. + */ +@EnabledIfEnvironmentVariable(named = "LANCE_INTEGRATION_TEST", matches = "1") +public class NamespaceIntegrationTest { + + private static final String ENDPOINT_URL = "http://localhost:4566"; + private static final String REGION = "us-east-1"; + private static final String ACCESS_KEY = "ACCESS_KEY"; + private static final String SECRET_KEY = "SECRET_KEY"; + private static final String BUCKET_NAME = "lance-namespace-integtest-java"; + + private static S3Client s3Client; + + @BeforeAll + static void setup() { + s3Client = + S3Client.builder() + .endpointOverride(URI.create(ENDPOINT_URL)) + .region(Region.of(REGION)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY))) + .forcePathStyle(true) // Required for LocalStack + .build(); + + // Delete bucket if it exists from previous run + try { + deleteBucket(); + } catch (Exception e) { + // Ignore if bucket doesn't exist + } + + // Create test bucket + s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build()); + } + + @AfterAll + static void tearDown() { + if (s3Client != null) { + try { + deleteBucket(); + } catch (Exception e) { + // Ignore cleanup errors + } + s3Client.close(); + } + } + + private static void deleteBucket() { + // Delete all objects first + List objects = + s3Client + .listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET_NAME).build()) + .contents(); + for (S3Object obj : objects) { + s3Client.deleteObject( + DeleteObjectRequest.builder().bucket(BUCKET_NAME).key(obj.key()).build()); + } + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(BUCKET_NAME).build()); + } + + /** + * Mock LanceNamespace implementation for testing. + * + *

This implementation: - Returns table location and storage options via describeTable() - + * Tracks the number of times describeTable has been called - Returns credentials with short + * expiration times for testing refresh + */ + static class MockLanceNamespace implements LanceNamespace { + private final Map tableLocations = new HashMap<>(); + private final Map baseStorageOptions; + private final int credentialExpiresInSeconds; + private final AtomicInteger callCount = new AtomicInteger(0); + + public MockLanceNamespace(Map storageOptions, int credentialExpiresInSeconds) { + this.baseStorageOptions = new HashMap<>(storageOptions); + this.credentialExpiresInSeconds = credentialExpiresInSeconds; + } + + @Override + public void initialize(Map configProperties, BufferAllocator allocator) { + // Not needed for test + } + + public void registerTable(String tableName, String location) { + tableLocations.put(tableName, location); + } + + public int getCallCount() { + return callCount.get(); + } + + @Override + public String namespaceId() { + return "MockLanceNamespace { }"; + } + + @Override + public DescribeTableResponse describeTable(DescribeTableRequest request) { + int count = callCount.incrementAndGet(); + + String tableName = String.join("/", request.getId()); + String location = tableLocations.get(tableName); + if (location == null) { + throw new IllegalArgumentException("Table not found: " + tableName); + } + + // Create storage options with expiration + Map storageOptions = new HashMap<>(baseStorageOptions); + long expiresAtMillis = System.currentTimeMillis() + (credentialExpiresInSeconds * 1000L); + storageOptions.put("expires_at_millis", String.valueOf(expiresAtMillis)); + + DescribeTableResponse response = new DescribeTableResponse(); + response.setLocation(location); + response.setStorageOptions(storageOptions); + if (request.getVersion() != null) { + response.setVersion(request.getVersion()); + } + + return response; + } + } + + @Test + void testOpenDatasetWithoutRefresh() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Create test dataset directly on S3 + String tableName = UUID.randomUUID().toString(); + String tableUri = "s3://" + BUCKET_NAME + "/" + tableName + ".lance"; + + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create schema and write dataset + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 10); + bVector.set(1, 20); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + WriteParams writeParams = + new WriteParams.Builder().withStorageOptions(storageOptions).build(); + + // Create dataset using Dataset.create + try (Dataset dataset = Dataset.create(allocator, tableUri, schema, writeParams)) { + // Add data via fragments + List fragments = + Fragment.create(tableUri, allocator, root, writeParams); + 
FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); + try (Dataset updatedDataset = + Dataset.commit(allocator, tableUri, appendOp, Optional.of(1L), storageOptions)) { + assertEquals(2, updatedDataset.version()); + assertEquals(2, updatedDataset.countRows()); + } + } + } + + // Create mock namespace with 60-second expiration (long enough to not expire during test) + MockLanceNamespace namespace = new MockLanceNamespace(storageOptions, 60); + namespace.registerTable(tableName, tableUri); + + // Open dataset through namespace WITH refresh enabled + // Use 10-second refresh offset, so credentials effectively expire at T+50s + ReadOptions readOptions = + new ReadOptions.Builder() + .setS3CredentialsRefreshOffsetSeconds(10) // Refresh 10s before expiration + .build(); + + int callCountBeforeOpen = namespace.getCallCount(); + try (Dataset dsFromNamespace = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .refreshStorageOptions(true) // Refresh enabled + .readOptions(readOptions) + .build()) { + // With the fix, describeTable should only be called once during open + // to get the table location and initial storage options + int callCountAfterOpen = namespace.getCallCount(); + assertEquals( + 1, + callCountAfterOpen - callCountBeforeOpen, + "describeTable should be called exactly once during open, got: " + + (callCountAfterOpen - callCountBeforeOpen)); + + // Verify we can read the data multiple times + assertEquals(2, dsFromNamespace.countRows()); + assertEquals(2, dsFromNamespace.countRows()); + assertEquals(2, dsFromNamespace.countRows()); + + // Perform operations that access S3 + List fragments = dsFromNamespace.getFragments(); + assertEquals(1, fragments.size()); + List versions = dsFromNamespace.listVersions(); + assertEquals(2, versions.size()); + + // With the fix, credentials are cached so no additional calls are made + int finalCallCount = namespace.getCallCount(); + int totalCalls = finalCallCount - callCountBeforeOpen; + assertEquals( + 1, + totalCalls, + "describeTable should only be called once total (credentials are cached), got: " + + totalCalls); + } + } + } + + @Test + void testStorageOptionsProviderWithRefresh() throws Exception { + try (BufferAllocator allocator = new RootAllocator()) { + // Create test dataset + String tableName = UUID.randomUUID().toString(); + String tableUri = "s3://" + BUCKET_NAME + "/" + tableName + ".lance"; + + Map storageOptions = new HashMap<>(); + storageOptions.put("allow_http", "true"); + storageOptions.put("aws_access_key_id", ACCESS_KEY); + storageOptions.put("aws_secret_access_key", SECRET_KEY); + storageOptions.put("aws_endpoint", ENDPOINT_URL); + storageOptions.put("aws_region", REGION); + + // Create schema and write dataset + Schema schema = + new Schema( + Arrays.asList( + new Field("a", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("b", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector aVector = (IntVector) root.getVector("a"); + IntVector bVector = (IntVector) root.getVector("b"); + + aVector.allocateNew(2); + bVector.allocateNew(2); + + aVector.set(0, 1); + bVector.set(0, 2); + aVector.set(1, 10); + bVector.set(1, 20); + + aVector.setValueCount(2); + bVector.setValueCount(2); + root.setRowCount(2); + + WriteParams writeParams = + new WriteParams.Builder().withStorageOptions(storageOptions).build(); + + // Create dataset using 
Dataset.create + try (Dataset dataset = Dataset.create(allocator, tableUri, schema, writeParams)) { + // Add data via fragments + List fragments = + Fragment.create(tableUri, allocator, root, writeParams); + FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); + try (Dataset updatedDataset = + Dataset.commit(allocator, tableUri, appendOp, Optional.of(1L), storageOptions)) { + assertEquals(2, updatedDataset.countRows()); + } + } + } + + // Create mock namespace with 5-second expiration for faster testing + MockLanceNamespace namespace = new MockLanceNamespace(storageOptions, 5); + namespace.registerTable(tableName, tableUri); + + // Open dataset through namespace with refresh enabled + // Use 2-second refresh offset so credentials effectively expire at T+3s (5s - 2s) + ReadOptions readOptions = + new ReadOptions.Builder() + .setS3CredentialsRefreshOffsetSeconds(2) // Refresh 2s before expiration + .build(); + + int callCountBeforeOpen = namespace.getCallCount(); + try (Dataset dsFromNamespace = + Dataset.open() + .allocator(allocator) + .namespace(namespace) + .tableId(Arrays.asList(tableName)) + .refreshStorageOptions(true) // Enable automatic refresh + .readOptions(readOptions) + .build()) { + // With the fix, describeTable should only be called once during open + int callCountAfterOpen = namespace.getCallCount(); + assertEquals( + 1, + callCountAfterOpen - callCountBeforeOpen, + "describeTable should be called exactly once during open, got: " + + (callCountAfterOpen - callCountBeforeOpen)); + + // Verify we can read the data + assertEquals(2, dsFromNamespace.countRows()); + + // Record call count after initial reads + int callCountAfterInitialReads = namespace.getCallCount(); + int callsAfterFirstRead = callCountAfterInitialReads - callCountBeforeOpen; + assertEquals( + 1, + callsAfterFirstRead, + "describeTable should still be 1 (credentials are cached), got: " + + callsAfterFirstRead); + + // Wait for credentials to be close to expiring (4 seconds - past the 3s refresh threshold) + Thread.sleep(4000); + + // Perform read operations after expiration + // Access fragments and versions which require S3 access and trigger credential refresh + assertEquals(2, dsFromNamespace.countRows()); + List fragments = dsFromNamespace.getFragments(); + assertEquals(1, fragments.size()); + List versions = dsFromNamespace.listVersions(); + assertEquals(2, versions.size()); + + int finalCallCount = namespace.getCallCount(); + int totalCallsAfterExpiration = finalCallCount - callCountBeforeOpen; + assertEquals( + 2, + totalCallsAfterExpiration, + "Credentials should be refreshed once after expiration. 
" + + "Expected 2 total calls (1 initial + 1 refresh), got: " + + totalCallsAfterExpiration); + } + } + } +} diff --git a/python/Cargo.lock b/python/Cargo.lock index b7fb9183e52..bce7a2ba5d6 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4191,6 +4191,23 @@ dependencies = [ "snafu", ] +[[package]] +name = "lance-namespace-impls" +version = "0.38.3" +dependencies = [ + "arrow", + "arrow-ipc", + "arrow-schema", + "async-trait", + "bytes", + "lance", + "lance-core", + "lance-io", + "lance-namespace", + "object_store", + "snafu", +] + [[package]] name = "lance-namespace-reqwest-client" version = "0.0.18" @@ -5743,6 +5760,8 @@ dependencies = [ "lance-index", "lance-io", "lance-linalg", + "lance-namespace", + "lance-namespace-impls", "lance-table", "libc", "log", diff --git a/python/Cargo.toml b/python/Cargo.toml index 4044655d916..d3d2777d82e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -44,6 +44,8 @@ lance-index = { path = "../rust/lance-index", features = [ ] } lance-io = { path = "../rust/lance-io" } lance-linalg = { path = "../rust/lance-linalg" } +lance-namespace = { path = "../rust/lance-namespace" } +lance-namespace-impls = { path = "../rust/lance-namespace-impls" } lance-table = { path = "../rust/lance-table" } lance-datafusion = { path = "../rust/lance-datafusion" } libc = "0.2.176" diff --git a/python/pyproject.toml b/python/pyproject.toml index 32168d32e5c..180f7bdaf46 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "pylance" dynamic = ["version"] -dependencies = ["pyarrow>=14", "numpy>=1.22"] +dependencies = ["pyarrow>=14", "numpy>=1.22", "lance_namespace>=0.0.20"] description = "python wrapper for Lance columnar format" authors = [{ name = "Lance Devs", email = "dev@lancedb.com" }] license = { file = "LICENSE" } diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index ca94f80bf52..f77692399be 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -8,7 +8,7 @@ import warnings from typing import TYPE_CHECKING, Dict, Optional, Union -from . import log +from . import io, log from .blob import BlobColumn, BlobFile from .dataset import ( DataStatistics, @@ -32,6 +32,7 @@ bytes_read_counter, iops_counter, ) +from .namespace import LanceNamespaceStorageOptionsProvider from .schema import json_to_schema, schema_to_json from .util import sanitize_ts @@ -57,24 +58,26 @@ "LanceFragment", "LanceOperation", "LanceScanner", + "LanceNamespaceStorageOptionsProvider", "MergeInsertBuilder", "ScanStatistics", "Transaction", "__version__", + "batch_udf", "bytes_read_counter", + "dataset", + "io", "iops_counter", - "write_dataset", - "schema_to_json", "json_to_schema", - "dataset", - "batch_udf", + "schema_to_json", "set_logger", + "write_dataset", "FFILanceTableProvider", ] def dataset( - uri: Union[str, Path], + uri: Optional[Union[str, Path]] = None, version: Optional[int | str] = None, asof: Optional[ts_types] = None, block_size: Optional[int] = None, @@ -86,15 +89,20 @@ def dataset( index_cache_size_bytes: Optional[int] = None, read_params: Optional[Dict[str, any]] = None, session: Optional[Session] = None, + namespace: Optional[any] = None, + table_id: Optional[list] = None, + refresh_storage_options: bool = False, + s3_credentials_refresh_offset_seconds: Optional[int] = None, ) -> LanceDataset: """ Opens the Lance dataset from the address specified. Parameters ---------- - uri : str + uri : str, optional Address to the Lance dataset. 
It can be a local file path `/tmp/data.lance`, or a cloud object store URI, i.e., `s3://bucket/data.lance`. + Either `uri` or (`namespace` + `table_id`) must be provided, but not both. version : optional, int | str If specified, load a specific version of the Lance dataset. Else, loads the latest version. A version number (`int`) or a tag (`str`) can be provided. @@ -143,7 +151,100 @@ def dataset( session : optional, lance.Session A session to use for this dataset. This contains the caches used by the across multiple datasets. + namespace : optional + A namespace instance from which to fetch table location and storage options. + This can be any object with a describe_table(table_id, version) method + that returns a dict with 'location' and 'storage_options' keys. + For example, use lance_namespace.connect() from the lance_namespace package. + Must be provided together with `table_id`. Cannot be used with `uri`. + When provided, the table location will be fetched automatically from the + namespace via describe_table(). + table_id : optional, list of str + The table identifier when using a namespace (e.g., ["my_table"]). + Must be provided together with `namespace`. Cannot be used with `uri`. + refresh_storage_options : bool, default False + Only applicable when using `namespace` and `table_id`. If True, storage + options will be automatically refreshed from the namespace before they + expire. This is currently only used for refreshing AWS temporary access + credentials. When enabled, the namespace will be queried periodically to + fetch new temporary credentials before the current ones expire. The new + storage options will contain updated AWS access credentials with a new + expiration time. If False (default), only the initial storage options + from describe_table() will be used. + s3_credentials_refresh_offset_seconds : optional, int + The number of seconds before credential expiration to trigger a refresh. + Default is 60 seconds. Only applicable when using AWS S3 with temporary + credentials. For example, if set to 60, credentials will be refreshed + when they have less than 60 seconds remaining before expiration. This + should be set shorter than the credential lifetime to avoid using + expired credentials. + + Notes + ----- + When using `namespace` and `table_id`: + - The `uri` parameter is optional and will be fetched from the namespace + - A `LanceNamespaceStorageOptionsProvider` will be created automatically only + if `refresh_storage_options=True` + - Initial storage options from describe_table() will be merged with + any provided `storage_options` """ + # Validate that user provides either uri OR (namespace + table_id), not both + has_uri = uri is not None + has_namespace = namespace is not None or table_id is not None + + if has_uri and has_namespace: + raise ValueError( + "Cannot specify both 'uri' and 'namespace/table_id'. " + "Please provide either 'uri' or both 'namespace' and 'table_id'." + ) + elif not has_uri and not has_namespace: + raise ValueError( + "Must specify either 'uri' or both 'namespace' and 'table_id'." + ) + + # Handle namespace resolution in Python + storage_options_provider = None + if namespace is not None: + if table_id is None: + raise ValueError( + "Both 'namespace' and 'table_id' must be provided together." 
+ ) + + table_info = namespace.describe_table(table_id=table_id, version=version) + uri = table_info.get("location") + if uri is None: + raise ValueError("Namespace did not return a 'location' for the table") + + # Extract storage options from namespace + namespace_storage_options = table_info.get("storage_options", {}) + + # Create storage options provider if refresh is enabled + if refresh_storage_options: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace=namespace, table_id=table_id + ) + # Merge storage options (namespace takes precedence) + # Pass initial credentials to Rust so it can cache them and avoid + # immediately calling fetch_storage_options() during object store creation + if storage_options is None: + storage_options = namespace_storage_options + else: + # Merge: user options first, then override with namespace options + merged_options = dict(storage_options) + merged_options.update(namespace_storage_options) + storage_options = merged_options + else: + # Without refresh, merge storage options (namespace takes precedence) + if storage_options is None: + storage_options = namespace_storage_options + else: + # Merge: user options first, then override with namespace options + merged_options = dict(storage_options) + merged_options.update(namespace_storage_options) + storage_options = merged_options + elif table_id is not None: + raise ValueError("Both 'namespace' and 'table_id' must be provided together.") + ds = LanceDataset( uri, version, @@ -156,6 +257,8 @@ def dataset( index_cache_size_bytes=index_cache_size_bytes, read_params=read_params, session=session, + storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, ) if version is None and asof is not None: ts_cutoff = sanitize_ts(asof) @@ -179,6 +282,8 @@ def dataset( index_cache_size_bytes=index_cache_size_bytes, read_params=read_params, session=session, + storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, ) else: return ds diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index c1dc97241c1..a9e8bca873b 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -417,6 +417,8 @@ def __init__( index_cache_size_bytes: Optional[int] = None, read_params: Optional[Dict[str, Any]] = None, session: Optional[Session] = None, + storage_options_provider: Optional[Any] = None, + s3_credentials_refresh_offset_seconds: Optional[int] = None, ): uri = os.fspath(uri) if isinstance(uri, Path) else uri self._uri = uri @@ -447,6 +449,8 @@ def __init__( index_cache_size_bytes=index_cache_size_bytes, read_params=read_params, session=session, + storage_options_provider=storage_options_provider, + s3_credentials_refresh_offset_seconds=s3_credentials_refresh_offset_seconds, ) self._default_scan_options = default_scan_options self._read_params = read_params @@ -5074,7 +5078,7 @@ def data_stats(self) -> DataStatistics: def write_dataset( data_obj: ReaderLike, - uri: Union[str, Path, LanceDataset], + uri: Optional[Union[str, Path, LanceDataset]] = None, schema: Optional[pa.Schema] = None, mode: str = "create", *, @@ -5095,6 +5099,8 @@ def write_dataset( transaction_properties: Optional[Dict[str, str]] = None, initial_bases: Optional[List[DatasetBasePath]] = None, target_bases: Optional[List[str]] = None, + namespace: Optional[any] = None, + table_id: Optional[list] = None, ) -> LanceDataset: """Write a given data_obj to 
the given uri @@ -5104,9 +5110,10 @@ def write_dataset( The data to be written. Acceptable types are: - Pandas DataFrame, Pyarrow Table, Dataset, Scanner, or RecordBatchReader - Huggingface dataset - uri: str, Path, or LanceDataset + uri: str, Path, LanceDataset, or None Where to write the dataset to (directory). If a LanceDataset is passed, the session will be reused. + Either `uri` or (`namespace` + `table_id`) must be provided, but not both. schema: Schema, optional If specified and the input is a pandas DataFrame, use this schema instead of the default pandas to arrow table conversion. @@ -5186,7 +5193,65 @@ def write_dataset( **CREATE mode**: References must match bases in `initial_bases` **APPEND/OVERWRITE modes**: References must match bases in the existing manifest + namespace : optional, any + A namespace instance from which to fetch table location and storage options. + Must be provided together with `table_id`. Cannot be used with `uri`. + When provided, the table location will be fetched automatically from the + namespace via describe_table(). Storage options will be automatically refreshed + before they expire. + table_id : optional, list of str + The table identifier when using a namespace (e.g., ["my_table"]). + Must be provided together with `namespace`. Cannot be used with `uri`. + + Notes + ----- + When using `namespace` and `table_id`: + - The `uri` parameter is optional and will be fetched from the namespace + - A `LanceNamespaceStorageOptionsProvider` will be created automatically for + storage options refresh + - Initial storage options from describe_table() will be merged with + any provided `storage_options` """ + # Validate that user provides either uri OR (namespace + table_id), not both + has_uri = uri is not None + has_namespace = namespace is not None or table_id is not None + + if has_uri and has_namespace: + raise ValueError( + "Cannot specify both 'uri' and 'namespace/table_id'. " + "Please provide either 'uri' or both 'namespace' and 'table_id'." + ) + elif not has_uri and not has_namespace: + raise ValueError( + "Must specify either 'uri' or both 'namespace' and 'table_id'." + ) + + # Handle namespace-based dataset writing + if namespace is not None: + if table_id is None: + raise ValueError( + "Both 'namespace' and 'table_id' must be provided together." 
+ ) + + # Call describe_table to get location and storage options + table_info = namespace.describe_table(table_id=table_id, version=None) + + # Extract location from namespace response + uri = table_info.get("location") + if not uri: + raise ValueError("Namespace did not return a table location") + + # Merge initial storage options from describe_table with user-provided options + namespace_storage_options = table_info.get("storage_options", {}) + if storage_options: + # User-provided options take precedence + merged_storage_options = {**namespace_storage_options, **storage_options} + else: + merged_storage_options = namespace_storage_options + storage_options = merged_storage_options + elif table_id is not None: + raise ValueError("Both 'namespace' and 'table_id' must be provided together.") + if use_legacy_format is not None: warnings.warn( "use_legacy_format is deprecated, use data_storage_version instead", diff --git a/python/python/lance/io.py b/python/python/lance/io.py new file mode 100644 index 00000000000..b12d6dc106f --- /dev/null +++ b/python/python/lance/io.py @@ -0,0 +1,140 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +"""I/O utilities for Lance datasets. + +This module provides utilities for customizing how Lance datasets interact with +cloud storage, including credential management for long-running operations. +""" + +from abc import ABC, abstractmethod +from typing import Dict + + +class StorageOptionsProvider(ABC): + """Abstract base class for providing storage options to Lance datasets. + + Storage options providers enable automatic refresh for long-running operations + on cloud storage (S3, Azure, GCS). This is currently only used for refreshing + AWS temporary access credentials. Implement this interface to integrate with + custom credential management systems such as AWS STS, GCP STS, or + proprietary credential services. + + The provider is called automatically before storage options expire, ensuring + uninterrupted access during long-running queries, training jobs, or data processing. + + Example + ------- + >>> import lance + >>> class MyStorageOptionsProvider(StorageOptionsProvider): + ... def fetch_storage_options(self): + ... # Fetch from your credential service + ... return { + ... "aws_access_key_id": "ASIA...", + ... "aws_secret_access_key": "secret", + ... "aws_session_token": "token", + ... "expires_at_millis": "1234567890000", + ... } + ... + >>> provider = MyStorageOptionsProvider() + >>> dataset = lance.dataset( # doctest: +SKIP + ... "s3://bucket/table.lance", storage_options_provider=provider + ... ) + + Error Handling + -------------- + If fetch_storage_options() raises an exception, operations requiring + credentials will fail. Implementations should handle recoverable errors + internally (e.g., retry token refresh) and only raise exceptions for + unrecoverable errors. + """ + + @abstractmethod + def fetch_storage_options(self) -> Dict[str, str]: + """Get fresh storage credentials. + + This method is called automatically before each request and before existing + credentials expire. It must return credentials in the format below. + + Returns + ------- + Dict[str, str] + Dictionary of string key-value pairs containing cloud storage credentials + and expiration time. Required keys: + + - "expires_at_millis" (str): Unix timestamp in milliseconds (as string) + when credentials expire. Lance will automatically call + fetch_storage_options() again before this time. 
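              For illustration only, a provider issuing credentials valid for
              one hour could compute this value using just the standard library:

                  import time
                  expires_at_millis = str(int((time.time() + 3600) * 1000))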
+ + Plus provider-specific credential keys: + + AWS S3: + - "aws_access_key_id" (str): AWS access key + - "aws_secret_access_key" (str): AWS secret key + - "aws_session_token" (str, optional): Session token for temporary + credentials + + Azure Blob Storage: + - "account_name" (str): Storage account name + - "account_key" (str): Storage account key + - Or "sas_token" (str): SAS token + + Google Cloud Storage: + - "service_account_key" (str): Service account JSON key + - Or "token" (str): OAuth token + + Raises + ------ + Exception + If unable to fetch credentials, the exception will be propagated + and operations requiring credentials will fail. + + Example + ------- + >>> def fetch_storage_options(self): + ... # Example: AWS temporary credentials + ... response = sts_client.assume_role( + ... RoleArn='arn:aws:iam::123456789012:role/DataReader', + ... RoleSessionName='lance-session' + ... ) + ... creds = response['Credentials'] + ... expires_at_millis = int(creds['Expiration'].timestamp() * 1000) + ... return { + ... "aws_access_key_id": creds['AccessKeyId'], + ... "aws_secret_access_key": creds['SecretAccessKey'], + ... "aws_session_token": creds['SessionToken'], + ... "expires_at_millis": str(expires_at_millis), + ... } + """ + pass + + def provider_id(self) -> str: + """Return a human-readable unique identifier for this provider instance. + + This is used for equality comparison and hashing in the object store + registry. Two providers with the same ID will be treated as equal and + share the same cached ObjectStore instance. + + The default implementation uses the class name and object's string + representation. Override this method to provide semantic equality based + on configuration. + + Returns + ------- + str + A human-readable unique identifier string. + For example: "MyProvider { endpoint: 'https://api.example.com' }" + + Example + ------- + >>> class MyProvider(StorageOptionsProvider): + ... def __init__(self, endpoint): + ... self.endpoint = endpoint + ... + ... def fetch_storage_options(self): + ... return {"expires_at_millis": "1234567890000"} + ... + ... def provider_id(self): + ... return f"MyProvider {{ endpoint: {self.endpoint!r} }}" + """ + return f"{self.__class__.__name__} {{ repr: {str(self)!r} }}" diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py new file mode 100644 index 00000000000..2f8890f6aa2 --- /dev/null +++ b/python/python/lance/namespace.py @@ -0,0 +1,138 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +"""LanceNamespace storage options integration. + +This module provides storage options integration with LanceNamespace, +enabling automatic storage options refresh for namespace-managed tables. +""" + +from typing import Dict + +from .io import StorageOptionsProvider + + +class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): + """Storage options provider that fetches storage options from a LanceNamespace. + + This provider automatically fetches fresh storage options by calling the + namespace's describe_table() method, which returns both the table location + and time-limited storage options. This is currently only used for refreshing + AWS temporary access credentials. + + This is the recommended approach for LanceDB Cloud and other namespace-based + deployments, as it handles storage options refresh automatically. + + Parameters + ---------- + namespace : any + The namespace instance to fetch storage options from. 
This can be any + object with a describe_table(table_id, version) method that returns a + dict with 'location' and 'storage_options' keys. For example, use + lance_namespace.connect() from the lance_namespace PyPI package. + table_id : List[str] + The table identifier (e.g., ["workspace", "table_name"]) + + Example + ------- + This example shows how to use the storage options provider with a namespace. + + .. code-block:: python + + import lance + import lance_namespace + + # Connect to a namespace (e.g., LanceDB Cloud) + namespace = lance_namespace.connect("rest", { + "url": "https://api.lancedb.com", + "api_key": "your-api-key" + }) + + # Create storage options provider + provider = lance.LanceNamespaceStorageOptionsProvider( + namespace=namespace, + table_id=["workspace", "table_name"] + ) + + # Use with dataset - storage options auto-refresh! + dataset = lance.dataset( + "s3://bucket/table.lance", + storage_options_provider=provider + ) + """ + + def __init__(self, namespace, table_id: list): + """Initialize with namespace and table ID. + + Parameters + ---------- + namespace : any + The namespace instance with a describe_table() method + table_id : List[str] + The table identifier + """ + self._namespace = namespace + self._table_id = table_id + + def fetch_storage_options(self) -> Dict[str, str]: + """Fetch storage options from the namespace. + + This calls namespace.describe_table() to get the latest storage options + and their expiration time. + + Returns + ------- + Dict[str, str] + Flat dictionary of string key-value pairs containing storage options + and expires_at_millis + + Raises + ------ + RuntimeError + If the namespace doesn't return storage options or expiration time + """ + # Call namespace to describe the table and get storage options + table_info = self._namespace.describe_table( + table_id=self._table_id, version=None + ) + + # Extract storage options - should already be a flat dict of strings + storage_options = table_info.get("storage_options") + if storage_options is None: + raise RuntimeError( + "Namespace did not return storage_options. " + "Ensure the namespace supports storage options providing." + ) + + # Verify expires_at_millis is present + if "expires_at_millis" not in storage_options: + raise RuntimeError( + "Namespace storage_options missing 'expires_at_millis'. " + "Storage options refresh will not work properly." + ) + + # Return the storage_options directly - it's already a flat Map + return storage_options + + def provider_id(self) -> str: + """Return a human-readable unique identifier for this provider instance. + + This creates a semantic ID based on the namespace's ID and the table ID, + enabling proper equality comparison and caching. 
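        For example, two providers built from the same namespace and table
        produce the same identifier and therefore share the same cached
        ObjectStore. A minimal sketch, using a hypothetical in-memory stub that
        exposes only the namespace_id() hook used by this method:

        .. code-block:: python

            from lance.namespace import LanceNamespaceStorageOptionsProvider

            class StubNamespace:
                def namespace_id(self):
                    return "StubNamespace { url: 'https://example.com' }"

            ns = StubNamespace()
            a = LanceNamespaceStorageOptionsProvider(namespace=ns, table_id=["t"])
            b = LanceNamespaceStorageOptionsProvider(namespace=ns, table_id=["t"])
            assert a.provider_id() == b.provider_id()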
+ + Returns + ------- + str + A human-readable unique identifier string combining namespace and table info + """ + # Try to call namespace_id() if available (lance-namespace >= 0.0.20) + if hasattr(self._namespace, "namespace_id"): + namespace_id = self._namespace.namespace_id() + else: + # Fallback for older namespace versions + namespace_id = str(self._namespace) + + return ( + f"LanceNamespaceStorageOptionsProvider {{ " + f"namespace: {namespace_id}, table_id: {self._table_id!r} }}" + ) diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py new file mode 100644 index 00000000000..8b90e6667f6 --- /dev/null +++ b/python/python/tests/test_namespace_integration.py @@ -0,0 +1,334 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +""" +Integration tests for Lance Namespace with S3 and credential refresh. + +This test simulates a namespace server that returns incrementing credentials +and verifies that the credential refresh mechanism works correctly. + +See DEVELOPMENT.md under heading "Integration Tests" for more information. +""" + +import copy +import time +import uuid +from threading import Lock +from typing import Dict, Optional + +import lance +import pyarrow as pa +import pytest + +# These are all keys that are accepted by storage_options +CONFIG = { + "allow_http": "true", + "aws_access_key_id": "ACCESS_KEY", + "aws_secret_access_key": "SECRET_KEY", + "aws_endpoint": "http://localhost:4566", + "aws_region": "us-east-1", +} + + +def get_boto3_client(*args, **kwargs): + import boto3 + + return boto3.client( + *args, + region_name=CONFIG["aws_region"], + aws_access_key_id=CONFIG["aws_access_key_id"], + aws_secret_access_key=CONFIG["aws_secret_access_key"], + **kwargs, + ) + + +@pytest.fixture(scope="module") +def s3_bucket(): + s3 = get_boto3_client("s3", endpoint_url=CONFIG["aws_endpoint"]) + bucket_name = "lance-namespace-integtest" + # if bucket exists, delete it + try: + delete_bucket(s3, bucket_name) + except s3.exceptions.NoSuchBucket: + pass + s3.create_bucket(Bucket=bucket_name) + yield bucket_name + + delete_bucket(s3, bucket_name) + + +def delete_bucket(s3, bucket_name): + # Delete all objects first + try: + for obj in s3.list_objects(Bucket=bucket_name).get("Contents", []): + s3.delete_object(Bucket=bucket_name, Key=obj["Key"]) + s3.delete_bucket(Bucket=bucket_name) + except Exception: + pass + + +class MockLanceNamespace: + """ + Mock namespace implementation that tracks credential refresh calls. + + Similar to the Rust MockStorageOptionsProvider, this implementation: + - Returns incrementing credentials on each describe_table call + - Tracks the number of times describe_table has been called + - Returns credentials with short expiration times for testing refresh + """ + + def __init__( + self, + bucket_name: str, + storage_options: Dict[str, str], + credential_expires_in_seconds: int = 60, + ): + """ + Initialize the mock namespace. + + Parameters + ---------- + bucket_name : str + The S3 bucket name where tables are stored + storage_options : Dict[str, str] + Base storage options (aws_endpoint, aws_region, etc.) 
+ credential_expires_in_seconds : int + How long credentials should be valid (for testing refresh) + """ + self.bucket_name = bucket_name + self.base_storage_options = storage_options + self.credential_expires_in_seconds = credential_expires_in_seconds + self.call_count = 0 + self.lock = Lock() + self.tables: Dict[str, str] = {} # table_id -> location mapping + + def register_table(self, table_id: list, location: str): + """Register a table in the mock namespace.""" + table_key = "/".join(table_id) + self.tables[table_key] = location + + def get_call_count(self) -> int: + """Get the number of times describe_table has been called.""" + with self.lock: + return self.call_count + + def namespace_id(self) -> str: + """Return a unique identifier for this namespace instance.""" + return "MockLanceNamespace { }" + + def describe_table( + self, table_id: list, version: Optional[int] = None + ) -> Dict[str, any]: + """ + Describe a table and return storage options with incrementing credentials. + + This simulates a namespace server that returns temporary AWS credentials + that expire after a short time. Each call increments the credential counter. + + Parameters + ---------- + table_id : list + The table identifier (e.g., ["my_table"]) + version : Optional[int] + The table version (not used in this mock) + + Returns + ------- + Dict[str, any] + A dictionary with: + - location: The S3 URI of the table + - storage_options: Dict with AWS credentials and expires_at_millis + """ + with self.lock: + self.call_count += 1 + count = self.call_count + + table_key = "/".join(table_id) + if table_key not in self.tables: + raise ValueError(f"Table not found: {table_key}") + + location = self.tables[table_key] + + # Create storage options with incrementing credentials + storage_options = copy.deepcopy(self.base_storage_options) + + # Add incrementing credentials (similar to Rust MockStorageOptionsProvider) + storage_options["aws_access_key_id"] = f"AKID_{count}" + storage_options["aws_secret_access_key"] = f"SECRET_{count}" + storage_options["aws_session_token"] = f"TOKEN_{count}" + + # Add expiration timestamp (current time + expires_in_seconds) + expires_at_millis = int( + (time.time() + self.credential_expires_in_seconds) * 1000 + ) + storage_options["expires_at_millis"] = str(expires_at_millis) + + return { + "location": location, + "storage_options": storage_options, + } + + +@pytest.mark.integration +def test_namespace_open_dataset(s3_bucket: str): + """ + Test opening a dataset through a namespace with credential tracking. + + This test verifies that: + 1. We can create a dataset and register it with a namespace + 2. We can open the dataset through the namespace + 3. 
The namespace's describe_table method is called to fetch credentials + """ + storage_options = copy.deepcopy(CONFIG) + + # Create a test dataset directly on S3 + table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}]) + table_name = uuid.uuid4().hex + table_uri = f"s3://{s3_bucket}/{table_name}.lance" + + # Write dataset directly to S3 + ds = lance.write_dataset(table1, table_uri, storage_options=storage_options) + assert len(ds.versions()) == 1 + assert ds.count_rows() == 2 + + # Create mock namespace and register the table + namespace = MockLanceNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=60, + ) + namespace.register_table([table_name], table_uri) + + # Open dataset through namespace (without refresh) + # This should call describe_table once + assert namespace.get_call_count() == 0 + + ds_from_namespace = lance.dataset( + namespace=namespace, + table_id=[table_name], + refresh_storage_options=False, + ) + + # Verify describe_table was called once during open + assert namespace.get_call_count() == 1 + + # Verify we can read the data + assert ds_from_namespace.count_rows() == 2 + result = ds_from_namespace.to_table() + assert result == table1 + + +@pytest.mark.integration +def test_namespace_with_refresh(s3_bucket: str): + storage_options = copy.deepcopy(CONFIG) + + # Create a test dataset + table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}]) + table_name = uuid.uuid4().hex + table_uri = f"s3://{s3_bucket}/{table_name}.lance" + + ds = lance.write_dataset(table1, table_uri, storage_options=storage_options) + assert ds.count_rows() == 2 + + # Create mock namespace with very short expiration (2 seconds) + # to simulate credentials that need frequent refresh + namespace = MockLanceNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=2, # Short expiration for testing + ) + namespace.register_table([table_name], table_uri) + + assert namespace.get_call_count() == 0 + + # Open dataset with refresh enabled and short refresh offset + # Set refresh offset to 1 second (shorter than 2s credential lifetime) + ds_from_namespace = lance.dataset( + namespace=namespace, + table_id=[table_name], + refresh_storage_options=True, # Enable automatic refresh + s3_credentials_refresh_offset_seconds=1, # Refresh 1s before expiration + ) + + initial_call_count = namespace.get_call_count() + assert initial_call_count == 1 + + # Verify we can read the data + assert ds_from_namespace.count_rows() == 2 + result = ds_from_namespace.to_table() + assert result == table1 + + # Record call count after initial reads + call_count_after_initial_reads = namespace.get_call_count() + + # Wait for credentials to expire + time.sleep(3) + + # Perform another read operation after expiration + # This should trigger a credential refresh since credentials have expired + assert ds_from_namespace.count_rows() == 2 + result2 = ds_from_namespace.to_table() + assert result2 == table1 + + final_call_count = namespace.get_call_count() + assert final_call_count == call_count_after_initial_reads + 1 + + +@pytest.mark.integration +def test_namespace_append_through_namespace(s3_bucket: str): + """ + Test appending to a dataset opened through a namespace. + + This verifies that write operations work correctly with namespace-managed + credentials. 
+ """ + storage_options = copy.deepcopy(CONFIG) + + # Create initial dataset + table1 = pa.Table.from_pylist([{"a": 1, "b": 2}]) + table_name = uuid.uuid4().hex + table_uri = f"s3://{s3_bucket}/{table_name}.lance" + + ds = lance.write_dataset(table1, table_uri, storage_options=storage_options) + assert ds.count_rows() == 1 + assert len(ds.versions()) == 1 + + # Create namespace and open dataset through it + namespace = MockLanceNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=60, + ) + namespace.register_table([table_name], table_uri) + + # Open through namespace + ds_from_namespace = lance.dataset( + namespace=namespace, + table_id=[table_name], + refresh_storage_options=False, + ) + + assert ds_from_namespace.count_rows() == 1 + initial_call_count = namespace.get_call_count() + assert initial_call_count == 1 + + # Append more data using the URI directly (not through namespace) + table2 = pa.Table.from_pylist([{"a": 10, "b": 20}]) + ds = lance.write_dataset( + table2, table_uri, mode="append", storage_options=storage_options + ) + assert ds.count_rows() == 2 + assert len(ds.versions()) == 2 + + # Re-open through namespace to see updated data + ds_from_namespace = lance.dataset( + namespace=namespace, + table_id=[table_name], + refresh_storage_options=False, + ) + + assert ds_from_namespace.count_rows() == 2 + assert len(ds_from_namespace.versions()) == 2 + + # Describe_table should have been called again + assert namespace.get_call_count() == initial_call_count + 1 diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 72a3b58d459..a0bd8eb0767 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -455,7 +455,7 @@ pub struct Dataset { impl Dataset { #[allow(clippy::too_many_arguments)] #[new] - #[pyo3(signature=(uri, version=None, block_size=None, index_cache_size=None, metadata_cache_size=None, commit_handler=None, storage_options=None, manifest=None, metadata_cache_size_bytes=None, index_cache_size_bytes=None, read_params=None, session=None))] + #[pyo3(signature=(uri, version=None, block_size=None, index_cache_size=None, metadata_cache_size=None, commit_handler=None, storage_options=None, manifest=None, metadata_cache_size_bytes=None, index_cache_size_bytes=None, read_params=None, session=None, storage_options_provider=None, s3_credentials_refresh_offset_seconds=None))] fn new( py: Python, uri: String, @@ -470,6 +470,8 @@ impl Dataset { index_cache_size_bytes: Option, read_params: Option<&Bound>, session: Option, + storage_options_provider: Option, + s3_credentials_refresh_offset_seconds: Option, ) -> PyResult { let mut params = ReadParams::default(); if let Some(metadata_cache_size_bytes) = metadata_cache_size_bytes { @@ -486,12 +488,16 @@ impl Dataset { let index_cache_size_bytes = index_cache_size * 20 * 1024 * 1024; params.index_cache_size_bytes(index_cache_size_bytes); } + // Set up store options (block size and S3 credentials refresh offset) + let mut store_params = params.store_options.take().unwrap_or_default(); if let Some(block_size) = block_size { - params.store_options = Some(ObjectStoreParams { - block_size: Some(block_size), - ..Default::default() - }); - }; + store_params.block_size = Some(block_size); + } + if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds { + store_params.s3_credentials_refresh_offset = + std::time::Duration::from_secs(offset_seconds); + } + params.store_options = Some(store_params); if let Some(commit_handler) = commit_handler { let py_commit_lock = 
PyCommitLock::new(commit_handler); params.set_commit_lock(Arc::new(py_commit_lock)); @@ -523,6 +529,7 @@ impl Dataset { } let mut builder = DatasetBuilder::from_uri(&uri).with_read_params(params); + if let Some(ver) = version { if let Ok(i) = ver.downcast_bound::(py) { let v: u64 = i.extract()?; @@ -556,6 +563,13 @@ impl Dataset { builder = builder.with_session(session.inner.clone()); } + // Add storage options provider if provided + if let Some(provider_obj) = storage_options_provider { + use crate::storage_options::py_object_to_storage_options_provider; + let provider = py_object_to_storage_options_provider(provider_obj)?; + builder = builder.with_storage_options_provider(provider); + } + let dataset = rt().block_on(Some(py), builder.load())?; match dataset { diff --git a/python/src/lib.rs b/python/src/lib.rs index cd18a52ee48..8d8196bd75d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -76,6 +76,7 @@ pub(crate) mod reader; pub(crate) mod scanner; pub(crate) mod schema; pub(crate) mod session; +pub(crate) mod storage_options; pub(crate) mod tracing; pub(crate) mod transaction; pub(crate) mod utils; diff --git a/python/src/storage_options.rs b/python/src/storage_options.rs new file mode 100644 index 00000000000..3defd74f267 --- /dev/null +++ b/python/src/storage_options.rs @@ -0,0 +1,169 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use lance_io::object_store::StorageOptionsProvider; +use pyo3::prelude::*; +use pyo3::types::PyDict; + +use crate::rt; + +/// Internal wrapper for Python storage options providers +/// +/// This is not exposed to Python. Users pass their Python objects directly +/// to dataset functions, and we wrap them internally with this struct. +pub struct PyStorageOptionsProvider { + /// The Python object implementing get_storage_options() + inner: PyObject, +} + +impl std::fmt::Debug for PyStorageOptionsProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Note: We can't call provider_id() here because this is PyStorageOptionsProvider, + // not PyStorageOptionsProviderWrapper. Just use a simple format. + write!(f, "PyStorageOptionsProvider") + } +} + +impl Clone for PyStorageOptionsProvider { + fn clone(&self) -> Self { + Python::with_gil(|py| Self { + inner: self.inner.clone_ref(py), + }) + } +} + +impl PyStorageOptionsProvider { + pub fn new(obj: PyObject) -> PyResult { + Python::with_gil(|py| { + // Verify the object has a fetch_storage_options method + if !obj.bind(py).hasattr("fetch_storage_options")? 
{ + return Err(pyo3::exceptions::PyTypeError::new_err( + "StorageOptionsProvider must implement fetch_storage_options() method", + )); + } + Ok(Self { inner: obj }) + }) + } +} + +/// Rust wrapper that implements StorageOptionsProvider trait for Python objects +pub struct PyStorageOptionsProviderWrapper { + py_provider: PyStorageOptionsProvider, +} + +impl std::fmt::Debug for PyStorageOptionsProviderWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.provider_id()) + } +} + +impl std::fmt::Display for PyStorageOptionsProviderWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.provider_id()) + } +} + +impl PyStorageOptionsProviderWrapper { + pub fn new(py_provider: PyStorageOptionsProvider) -> Self { + Self { py_provider } + } +} + +#[async_trait] +impl StorageOptionsProvider for PyStorageOptionsProviderWrapper { + async fn fetch_storage_options(&self) -> lance_core::Result>> { + // Call Python method from async context + let py_provider = self.py_provider.clone(); + + rt().runtime + .spawn_blocking(move || { + Python::with_gil(|py| { + // Call the Python fetch_storage_options method + let result = py_provider + .inner + .bind(py) + .call_method0("fetch_storage_options") + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to call fetch_storage_options: {}", + e + ))), + location: snafu::location!(), + })?; + + // If result is None, return None + if result.is_none() { + return Ok(None); + } + + // Extract the result dict - should be a flat Map + let result_dict = result.downcast::().map_err(|_| { + lance_core::Error::InvalidInput { + source: + "fetch_storage_options() must return None or a dict of string key-value pairs" + .into(), + location: snafu::location!(), + } + })?; + + // Convert all entries to HashMap + let mut storage_options = HashMap::new(); + for (key, value) in result_dict.iter() { + let key_str: String = + key.extract().map_err(|e| lance_core::Error::InvalidInput { + source: format!("storage option keys must be strings: {}", e).into(), + location: snafu::location!(), + })?; + let value_str: String = + value + .extract() + .map_err(|e| lance_core::Error::InvalidInput { + source: format!("storage option values must be strings: {}", e) + .into(), + location: snafu::location!(), + })?; + storage_options.insert(key_str, value_str); + } + + Ok(Some(storage_options)) + }) + }) + .await + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to call Python fetch_storage_options: {}", + e + ))), + location: snafu::location!(), + })? 
+ } + + fn provider_id(&self) -> String { + Python::with_gil(|py| { + // Call provider_id() method on the Python object + // This should always succeed since StorageOptionsProvider.provider_id() has a default implementation + let obj = self.py_provider.inner.bind(py); + obj.call_method0("provider_id") + .and_then(|result| result.extract::()) + .unwrap_or_else(|e| { + panic!( + "Failed to call provider_id() on Python StorageOptionsProvider: {}", + e + ) + }) + }) + } +} + +/// Convert a Python object to an Arc +/// This is the main entry point for converting Python storage options providers to Rust +pub fn py_object_to_storage_options_provider( + py_obj: PyObject, +) -> PyResult> { + let py_provider = PyStorageOptionsProvider::new(py_obj)?; + Ok(Arc::new(PyStorageOptionsProviderWrapper::new(py_provider))) +} diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 8cd1562fe3f..d9fcbbc9290 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -34,6 +34,7 @@ use url::Url; use super::local::LocalObjectReader; mod list_retry; pub mod providers; +pub mod storage_options; mod tracing; use crate::object_reader::SmallReader; use crate::object_writer::WriteResult; @@ -61,6 +62,9 @@ pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock = std::sync::LazyLock: pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3; pub use providers::{ObjectStoreProvider, ObjectStoreRegistry}; +pub use storage_options::{ + LanceNamespaceStorageOptionsProvider, StorageOptionsProvider, EXPIRES_AT_MILLIS_KEY, +}; #[async_trait] pub trait ObjectStoreExt { @@ -189,6 +193,8 @@ pub struct ObjectStoreParams { pub aws_credentials: Option, pub object_store_wrapper: Option>, pub storage_options: Option>, + /// Dynamic storage options provider for automatic credential refresh + pub storage_options_provider: Option>, /// Use constant size upload parts for multipart uploads. Only necessary /// for Cloudflare R2, which doesn't support variable size parts. When this /// is false, max upload size is 2.5TB. 
When this is true, the max size is @@ -208,6 +214,7 @@ impl Default for ObjectStoreParams { aws_credentials: None, object_store_wrapper: None, storage_options: None, + storage_options_provider: None, use_constant_size_upload_parts: false, list_is_lexically_ordered: None, } @@ -218,7 +225,7 @@ impl Default for ObjectStoreParams { impl std::hash::Hash for ObjectStoreParams { #[allow(deprecated)] fn hash(&self, state: &mut H) { - // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper + // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper, and storage options provider self.block_size.hash(state); if let Some((store, url)) = &self.object_store { Arc::as_ptr(store).hash(state); @@ -238,6 +245,9 @@ impl std::hash::Hash for ObjectStoreParams { value.hash(state); } } + if let Some(provider) = &self.storage_options_provider { + provider.provider_id().hash(state); + } self.use_constant_size_upload_parts.hash(state); self.list_is_lexically_ordered.hash(state); } @@ -253,7 +263,8 @@ impl PartialEq for ObjectStoreParams { return false; } - // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper + // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper + // For storage_options_provider, we use provider_id() for semantic equality self.block_size == other.block_size && self .object_store @@ -267,6 +278,14 @@ impl PartialEq for ObjectStoreParams { && self.object_store_wrapper.as_ref().map(Arc::as_ptr) == other.object_store_wrapper.as_ref().map(Arc::as_ptr) && self.storage_options == other.storage_options + && self + .storage_options_provider + .as_ref() + .map(|p| p.provider_id()) + == other + .storage_options_provider + .as_ref() + .map(|p| p.provider_id()) && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts && self.list_is_lexically_ordered == other.list_is_lexically_ordered } @@ -700,6 +719,13 @@ impl StorageOptions { pub fn get(&self, key: &str) -> Option<&String> { self.0.get(key) } + + /// Get the expiration time in milliseconds since epoch, if present + pub fn expires_at_millis(&self) -> Option { + self.0 + .get(EXPIRES_AT_MILLIS_KEY) + .and_then(|s| s.parse::().ok()) + } } impl From> for StorageOptions { diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 87dc0dbe982..f39d18f3128 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -1,9 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, fmt, str::FromStr, sync::Arc, time::Duration}; #[cfg(test)] use mock_instant::thread_local::{SystemTime, UNIX_EPOCH}; @@ -30,8 +28,8 @@ use tokio::sync::RwLock; use url::Url; use crate::object_store::{ - ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, - DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsProvider, + DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, }; use lance_core::error::{Error, Result}; @@ -61,6 +59,8 @@ impl AwsStoreProvider { params.aws_credentials.clone(), Some(&s3_storage_options), region, + 
params.storage_options_provider.clone(), + storage_options.expires_at_millis(), ) .await?; @@ -225,10 +225,11 @@ async fn resolve_s3_region( /// Build AWS credentials /// /// This resolves credentials from the following sources in order: -/// 1. An explicit `credentials` provider -/// 2. Explicit credentials in storage_options (as in `aws_access_key_id`, +/// 1. An explicit `storage_options_provider` +/// 2. An explicit `credentials` provider +/// 3. Explicit credentials in storage_options (as in `aws_access_key_id`, /// `aws_secret_access_key`, `aws_session_token`) -/// 3. The default credential provider chain from AWS SDK. +/// 4. The default credential provider chain from AWS SDK. /// /// `credentials_refresh_offset` is the amount of time before expiry to refresh credentials. pub async fn build_aws_credential( @@ -236,6 +237,8 @@ pub async fn build_aws_credential( credentials: Option, storage_options: Option<&HashMap>, region: Option, + storage_options_provider: Option>, + expires_at_millis: Option, ) -> Result<(AwsCredentialProvider, String)> { // TODO: make this return no credential provider not using AWS use aws_config::meta::region::RegionProviderChain; @@ -252,9 +255,20 @@ pub async fn build_aws_credential( .unwrap_or(DEFAULT_REGION.to_string()) }; - if let Some(creds) = credentials { + let storage_options_credentials = storage_options.and_then(extract_static_s3_credentials); + if let Some(storage_options_provider) = storage_options_provider { + let creds = build_aws_credential_with_storage_options_provider( + storage_options_provider, + credentials_refresh_offset, + credentials, + storage_options_credentials, + expires_at_millis, + ) + .await?; + Ok((creds, region)) + } else if let Some(creds) = credentials { Ok((creds, region)) - } else if let Some(creds) = storage_options.and_then(extract_static_s3_credentials) { + } else if let Some(creds) = storage_options_credentials { Ok((Arc::new(creds), region)) } else { let credentials_provider = DefaultCredentialsChain::builder().build().await; @@ -269,6 +283,58 @@ pub async fn build_aws_credential( } } +async fn build_aws_credential_with_storage_options_provider( + storage_options_provider: Arc, + credentials_refresh_offset: Duration, + credentials: Option, + storage_options_credentials: Option>, + expires_at_millis: Option, +) -> Result { + match (expires_at_millis, credentials, storage_options_credentials) { + // Case 1: provider + credentials + expiration time + (Some(expires_at), Some(cred), _) => { + Ok(Arc::new( + DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + storage_options_provider, + credentials_refresh_offset, + cred.get_credential().await?, + expires_at, + ), + )) + } + // Case 2: provider + storage_options (with valid credentials) + expiration time + (Some(expires_at), None, Some(cred)) => { + Ok(Arc::new( + DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + storage_options_provider, + credentials_refresh_offset, + cred.get_credential().await?, + expires_at, + ), + )) + } + // Case 3: provider + storage_options without expiration - FAIL + (None, None, Some(_)) => Err(Error::IO { + source: Box::new(std::io::Error::other( + "expires_at_millis is required when using storage_options_provider with storage_options", + )), + location: location!(), + }), + // Case 4: provider + credentials without expiration - FAIL + (None, Some(_), _) => Err(Error::IO { + source: Box::new(std::io::Error::other( + "expires_at_millis is required when using storage_options_provider with credentials", + 
)), + location: location!(), + }), + // Case 5: provider without credentials/storage_options, or with expiration but no creds/opts + (_, None, None) => Ok(Arc::new(DynamicStorageOptionsCredentialProvider::new( + storage_options_provider, + credentials_refresh_offset, + ))), + } +} + fn extract_static_s3_credentials( options: &HashMap, ) -> Option> { @@ -420,13 +486,180 @@ impl ObjectStoreParams { } } -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicBool, Ordering}; +/// AWS Credential Provider that uses StorageOptionsProvider +/// +/// This adapter converts our generic StorageOptionsProvider trait into +/// AWS-specific credentials that can be used with S3. It caches credentials +/// and automatically refreshes them before they expire. +pub struct DynamicStorageOptionsCredentialProvider { + provider: Arc, + cache: Arc>>, + refresh_offset: Duration, +} - use object_store::path::Path; +impl fmt::Debug for DynamicStorageOptionsCredentialProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DynamicStorageOptionsCredentialProvider") + .field("provider", &self.provider) + .field("refresh_offset", &self.refresh_offset) + .finish() + } +} + +#[derive(Debug, Clone)] +struct CachedCredential { + credential: Arc, + expires_at_millis: Option, +} + +impl DynamicStorageOptionsCredentialProvider { + /// Create a new credential provider without initial credentials + /// + /// # Arguments + /// * `provider` - The storage options provider + /// * `refresh_offset` - Duration before expiry to refresh credentials + pub fn new(provider: Arc, refresh_offset: Duration) -> Self { + Self { + provider, + cache: Arc::new(RwLock::new(None)), + refresh_offset, + } + } + + /// Create a new credential provider with initial credentials from an explicit credential + /// + /// # Arguments + /// * `provider` - The storage options provider + /// * `refresh_offset` - Duration before expiry to refresh credentials + /// * `credential` - Initial credential to cache + /// * `expires_at_millis` - Expiration time in milliseconds since epoch (required for refresh) + pub fn new_with_initial_credential( + provider: Arc, + refresh_offset: Duration, + credential: Arc, + expires_at_millis: u64, + ) -> Self { + Self { + provider, + cache: Arc::new(RwLock::new(Some(CachedCredential { + credential, + expires_at_millis: Some(expires_at_millis), + }))), + refresh_offset, + } + } + + fn needs_refresh(&self, cached: &Option) -> bool { + match cached { + None => true, + Some(cached_cred) => { + if let Some(expires_at_millis) = cached_cred.expires_at_millis { + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_millis() as u64; + + // Refresh if we're within the refresh offset of expiration + let refresh_offset_millis = self.refresh_offset.as_millis() as u64; + now_ms + refresh_offset_millis >= expires_at_millis + } else { + // No expiration means credentials never expire + false + } + } + } + } + + async fn do_get_credential(&self) -> ObjectStoreResult>> { + // Check if we have valid cached credentials with read lock + { + let cached = self.cache.read().await; + if !self.needs_refresh(&cached) { + if let Some(cached_cred) = &*cached { + return Ok(Some(cached_cred.credential.clone())); + } + } + } + + // Try to acquire write lock - if it fails, return None and let caller retry + let Ok(mut cache) = self.cache.try_write() else { + return Ok(None); + }; + + // Double-check if credentials are still stale after acquiring write lock + // (another thread 
might have refreshed them) + if !self.needs_refresh(&cache) { + if let Some(cached_cred) = &*cache { + return Ok(Some(cached_cred.credential.clone())); + } + } + + let storage_options_map = self + .provider + .fetch_storage_options() + .await + .map_err(|e| object_store::Error::Generic { + store: "DynamicStorageOptionsCredentialProvider", + source: Box::new(e), + })? + .ok_or_else(|| object_store::Error::Generic { + store: "DynamicStorageOptionsCredentialProvider", + source: "No storage options available".into(), + })?; + + let storage_options = StorageOptions(storage_options_map); + let expires_at_millis = storage_options.expires_at_millis(); + let s3_options = storage_options.as_s3_options(); + let static_creds = extract_static_s3_credentials(&s3_options).ok_or_else(|| { + object_store::Error::Generic { + store: "DynamicStorageOptionsCredentialProvider", + source: "Missing required credentials in storage options".into(), + } + })?; + + let credential = + static_creds + .get_credential() + .await + .map_err(|e| object_store::Error::Generic { + store: "DynamicStorageOptionsCredentialProvider", + source: Box::new(e), + })?; + + *cache = Some(CachedCredential { + credential: credential.clone(), + expires_at_millis, + }); + + Ok(Some(credential)) + } +} +#[async_trait::async_trait] +impl CredentialProvider for DynamicStorageOptionsCredentialProvider { + type Credential = ObjectStoreAwsCredential; + + async fn get_credential(&self) -> ObjectStoreResult> { + // Retry loop - if do_get_credential returns None (lock busy), retry from the beginning + loop { + match self.do_get_credential().await? { + Some(cred) => return Ok(cred), + None => { + // Lock was busy, wait 10ms before retrying + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + } + } + } + } +} + +#[cfg(test)] +mod tests { use crate::object_store::ObjectStoreRegistry; + use mock_instant::thread_local::MockClock; + use object_store::path::Path; + use std::sync::atomic::{AtomicBool, Ordering}; use super::*; @@ -555,4 +788,392 @@ mod tests { .unwrap(); assert_eq!(store.scheme, "s3"); } + + #[derive(Debug)] + struct MockStorageOptionsProvider { + call_count: Arc>, + expires_in_millis: Option, + } + + impl MockStorageOptionsProvider { + fn new(expires_in_millis: Option) -> Self { + Self { + call_count: Arc::new(RwLock::new(0)), + expires_in_millis, + } + } + + async fn get_call_count(&self) -> usize { + *self.call_count.read().await + } + } + + #[async_trait::async_trait] + impl StorageOptionsProvider for MockStorageOptionsProvider { + async fn fetch_storage_options(&self) -> Result>> { + let count = { + let mut c = self.call_count.write().await; + *c += 1; + *c + }; + + let mut options = HashMap::from([ + ("aws_access_key_id".to_string(), format!("AKID_{}", count)), + ( + "aws_secret_access_key".to_string(), + format!("SECRET_{}", count), + ), + ("aws_session_token".to_string(), format!("TOKEN_{}", count)), + ]); + + if let Some(expires_in) = self.expires_in_millis { + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let expires_at = now_ms + expires_in; + options.insert("expires_at_millis".to_string(), expires_at.to_string()); + } + + Ok(Some(options)) + } + + fn provider_id(&self) -> String { + let ptr = Arc::as_ptr(&self.call_count) as usize; + format!("MockStorageOptionsProvider {{ id: {} }}", ptr) + } + } + + #[tokio::test] + async fn test_dynamic_credential_provider_with_initial_cache() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + let now_ms = 
MockClock::system_time().as_millis() as u64; + + // Create a mock provider that returns credentials expiring in 10 minutes + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 600_000, // Expires in 10 minutes + ))); + + // Create credential provider with initial cached credentials that expire in 10 minutes + let expires_at = now_ms + 600_000; // 10 minutes from now + let initial_cred = Arc::new(ObjectStoreAwsCredential { + key_id: "AKID_CACHED".to_string(), + secret_key: "SECRET_CACHED".to_string(), + token: Some("TOKEN_CACHED".to_string()), + }); + + let provider = DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + mock.clone(), + Duration::from_secs(300), // 5 minute refresh offset + initial_cred, + expires_at, + ); + + // First call should use cached credentials (not expired yet) + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_CACHED"); + assert_eq!(cred.secret_key, "SECRET_CACHED"); + assert_eq!(cred.token, Some("TOKEN_CACHED".to_string())); + + // Should not have called the provider yet + assert_eq!(mock.get_call_count().await, 0); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_with_expired_cache() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + let now_ms = MockClock::system_time().as_millis() as u64; + + // Create a mock provider that returns credentials expiring in 10 minutes + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 600_000, // Expires in 10 minutes + ))); + + // Create credential provider with initial cached credentials that expired 1 second ago + let expired_time = now_ms - 1_000; // 1 second ago + let initial_cred = Arc::new(ObjectStoreAwsCredential { + key_id: "AKID_EXPIRED".to_string(), + secret_key: "SECRET_EXPIRED".to_string(), + token: None, + }); + + let provider = DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + mock.clone(), + Duration::from_secs(300), // 5 minute refresh offset + initial_cred, + expired_time, + ); + + // First call should fetch new credentials because cached ones are expired + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + assert_eq!(cred.token, Some("TOKEN_1".to_string())); + + // Should have called the provider once + assert_eq!(mock.get_call_count().await, 1); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_refresh_lead_time() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + // Create a mock provider that returns credentials expiring in 4 minutes + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 240_000, // Expires in 4 minutes + ))); + + // Create credential provider with 5 minute refresh offset + // This means credentials should be refreshed when they have less than 5 minutes left + let provider = DynamicStorageOptionsCredentialProvider::new( + mock.clone(), + Duration::from_secs(300), // 5 minute refresh offset + ); + + // First call should fetch credentials from provider (no initial cache) + // Credentials expire in 4 minutes, which is less than our 5 minute refresh offset, + // so they should be considered "needs refresh" immediately + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(mock.get_call_count().await, 1); + + // Second call should trigger refresh because credentials expire in 4 minutes + // but our refresh lead time is 5 minutes (now + 5min > expires_at) + // The mock will return new credentials 
(AKID_2) with the same expiration + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_2"); + assert_eq!(mock.get_call_count().await, 2); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_no_initial_cache() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + // Create a mock provider that returns credentials expiring in 10 minutes + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 600_000, // Expires in 10 minutes + ))); + + // Create credential provider without initial cache + let provider = DynamicStorageOptionsCredentialProvider::new( + mock.clone(), + Duration::from_secs(300), // 5 minute refresh offset + ); + + // First call should fetch from provider (call count = 1) + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + assert_eq!(cred.token, Some("TOKEN_1".to_string())); + assert_eq!(mock.get_call_count().await, 1); + + // Second call should use cached credentials (not expired yet) + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(mock.get_call_count().await, 1); // Still 1, didn't fetch again + + // Advance time to 6 minutes - should trigger refresh (within 5 min refresh offset) + MockClock::set_system_time(Duration::from_secs(100_000 + 360)); + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_2"); + assert_eq!(cred.secret_key, "SECRET_2"); + assert_eq!(cred.token, Some("TOKEN_2".to_string())); + assert_eq!(mock.get_call_count().await, 2); + + // Advance time to 11 minutes total - should trigger another refresh + MockClock::set_system_time(Duration::from_secs(100_000 + 660)); + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_3"); + assert_eq!(cred.secret_key, "SECRET_3"); + assert_eq!(mock.get_call_count().await, 3); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_with_initial_credential() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + let now_ms = MockClock::system_time().as_millis() as u64; + + // Create a mock provider that returns credentials expiring in 10 minutes + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 600_000, // Expires in 10 minutes + ))); + + // Create an initial credential with expiration in 10 minutes + let expires_at = now_ms + 600_000; // 10 minutes from now + let initial_cred = Arc::new(ObjectStoreAwsCredential { + key_id: "AKID_INITIAL".to_string(), + secret_key: "SECRET_INITIAL".to_string(), + token: Some("TOKEN_INITIAL".to_string()), + }); + + // Create credential provider with initial credential and expiration + let provider = DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + mock.clone(), + Duration::from_secs(300), // 5 minute refresh offset + initial_cred, + expires_at, + ); + + // First call should use the initial credential (not expired yet) + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_INITIAL"); + assert_eq!(cred.secret_key, "SECRET_INITIAL"); + assert_eq!(cred.token, Some("TOKEN_INITIAL".to_string())); + + // Should not have called the provider yet + assert_eq!(mock.get_call_count().await, 0); + + // Advance time to 6 minutes - this should trigger a refresh + // (5 minute refresh offset means we refresh 5 minutes before expiration) + MockClock::set_system_time(Duration::from_secs(100_000 + 360)); + let cred = provider.get_credential().await.unwrap(); + 
assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + assert_eq!(cred.token, Some("TOKEN_1".to_string())); + + // Should have called the provider once + assert_eq!(mock.get_call_count().await, 1); + + // Advance time to 11 minutes total - this should trigger another refresh + MockClock::set_system_time(Duration::from_secs(100_000 + 660)); + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_2"); + assert_eq!(cred.secret_key, "SECRET_2"); + assert_eq!(cred.token, Some("TOKEN_2".to_string())); + + // Should have called the provider twice + assert_eq!(mock.get_call_count().await, 2); + + // Advance time to 16 minutes total - this should trigger yet another refresh + MockClock::set_system_time(Duration::from_secs(100_000 + 960)); + let cred = provider.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_3"); + assert_eq!(cred.secret_key, "SECRET_3"); + assert_eq!(cred.token, Some("TOKEN_3".to_string())); + + // Should have called the provider three times + assert_eq!(mock.get_call_count().await, 3); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_concurrent_access() { + // Create a mock provider with far future expiration + let mock = Arc::new(MockStorageOptionsProvider::new(Some(9999999999999))); + + let provider = Arc::new(DynamicStorageOptionsCredentialProvider::new( + mock.clone(), + Duration::from_secs(300), + )); + + // Spawn 10 concurrent tasks that all try to get credentials at the same time + let mut handles = vec![]; + for i in 0..10 { + let provider = provider.clone(); + let handle = tokio::spawn(async move { + let cred = provider.get_credential().await.unwrap(); + // Verify we got the correct credentials (should all be AKID_1 from first fetch) + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + assert_eq!(cred.token, Some("TOKEN_1".to_string())); + i // Return task number for verification + }); + handles.push(handle); + } + + // Wait for all tasks to complete + let results: Vec<_> = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.unwrap()) + .collect(); + + // Verify all 10 tasks completed successfully + assert_eq!(results.len(), 10); + for i in 0..10 { + assert!(results.contains(&i)); + } + + // The provider should have been called exactly once (first request triggers fetch, + // subsequent requests use cache) + let call_count = mock.get_call_count().await; + assert_eq!( + call_count, 1, + "Provider should be called exactly once despite concurrent access" + ); + } + + #[tokio::test] + async fn test_dynamic_credential_provider_concurrent_refresh() { + MockClock::set_system_time(Duration::from_secs(100_000)); + + let now_ms = MockClock::system_time().as_millis() as u64; + + // Create initial credentials that expired in the past (1000 seconds ago) + let expires_at = now_ms - 1_000_000; + + let initial_cred = Arc::new(ObjectStoreAwsCredential { + key_id: "AKID_OLD".to_string(), + secret_key: "SECRET_OLD".to_string(), + token: Some("TOKEN_OLD".to_string()), + }); + + // Mock will return credentials expiring in 1 hour + let mock = Arc::new(MockStorageOptionsProvider::new(Some( + 3_600_000, // Expires in 1 hour + ))); + + let provider = Arc::new( + DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + mock.clone(), + Duration::from_secs(300), + initial_cred, + expires_at, + ), + ); + + // Spawn 20 concurrent tasks that all try to get credentials at the same time + // Since the initial credential is expired, they'll all try to 
refresh + let mut handles = vec![]; + for i in 0..20 { + let provider = provider.clone(); + let handle = tokio::spawn(async move { + let cred = provider.get_credential().await.unwrap(); + // All should get the new credentials (AKID_1 from first fetch) + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + assert_eq!(cred.token, Some("TOKEN_1".to_string())); + i + }); + handles.push(handle); + } + + // Wait for all tasks to complete + let results: Vec<_> = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.unwrap()) + .collect(); + + // Verify all 20 tasks completed successfully + assert_eq!(results.len(), 20); + + // The provider should have been called at least once, but possibly more times + // due to the try_write mechanism and race conditions + let call_count = mock.get_call_count().await; + assert!( + call_count >= 1, + "Provider should be called at least once, was called {} times", + call_count + ); + + // It shouldn't be called 20 times though - the lock should prevent most concurrent fetches + assert!( + call_count < 10, + "Provider should not be called too many times due to lock contention, was called {} times", + call_count + ); + } } diff --git a/rust/lance-io/src/object_store/storage_options.rs b/rust/lance-io/src/object_store/storage_options.rs new file mode 100644 index 00000000000..9b1acba2b60 --- /dev/null +++ b/rust/lance-io/src/object_store/storage_options.rs @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Storage options provider for dynamic credential fetching +//! +//! This module provides a trait for fetching storage options from various sources +//! (namespace servers, secret managers, etc.) with support for expiration tracking +//! and automatic refresh. + +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +use crate::{Error, Result}; +use async_trait::async_trait; +use lance_namespace::models::DescribeTableRequest; +use lance_namespace::LanceNamespace; +use snafu::location; + +/// Key for the expiration timestamp in storage options HashMap +pub const EXPIRES_AT_MILLIS_KEY: &str = "expires_at_millis"; + +/// Trait for providing storage options with expiration tracking +/// +/// Implementations can fetch storage options from various sources (namespace servers, +/// secret managers, etc.) and are usable from Python/Java. +/// +/// # Equality and Hashing +/// +/// Implementations must provide `provider_id()` which returns a unique identifier for +/// equality and hashing purposes. Two providers with the same ID are considered equal +/// and will share the same cached ObjectStore in the registry. +#[async_trait] +pub trait StorageOptionsProvider: Send + Sync + fmt::Debug { + /// Fetch fresh storage options + /// + /// Returns None if no storage options are available, or Some(HashMap) with the options. + /// If the [`EXPIRES_AT_MILLIS_KEY`] key is present in the HashMap, it should contain the + /// epoch time in milliseconds when the options expire, and credentials will automatically + /// refresh before expiration. + /// If [`EXPIRES_AT_MILLIS_KEY`] is not provided, the options are considered to never expire. + async fn fetch_storage_options(&self) -> Result>>; + + /// Return a human-readable unique identifier for this provider instance + /// + /// This is used for equality comparison and hashing in the object store registry. + /// Two providers with the same ID will be treated as equal and share the same cached + /// ObjectStore. 
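// ---------------------------------------------------------------------------
// Editor's aside (illustrative sketch, not part of this patch): the refresh
// decision implied by the `expires_at_millis` contract described above,
// written out as a standalone predicate. The name `should_refresh` and its
// parameters are invented for this sketch; the real logic lives in
// DynamicStorageOptionsCredentialProvider::needs_refresh in providers/aws.rs.
// ---------------------------------------------------------------------------
fn should_refresh(now_ms: u64, expires_at_millis: Option<u64>, refresh_offset_ms: u64) -> bool {
    match expires_at_millis {
        // e.g. options expiring in 4 minutes under a 5 minute refresh offset
        // are refreshed immediately, matching the refresh-offset tests above.
        Some(expires_at) => now_ms + refresh_offset_ms >= expires_at,
        // No expiry advertised: the options are treated as never expiring.
        None => false,
    }
}
// ---------------------------------------------------------------------------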
+    ///
+    /// The ID should be human-readable for debugging and logging purposes.
+    /// For example: `"namespace[dir(root=/data)],table[db$schema$table1]"`
+    ///
+    /// The ID should uniquely identify the provider's configuration.
+    fn provider_id(&self) -> String;
+}
+
+/// StorageOptionsProvider implementation that fetches options from a LanceNamespace
+pub struct LanceNamespaceStorageOptionsProvider {
+    namespace: Arc<dyn LanceNamespace>,
+    table_id: Vec<String>,
+}
+
+impl fmt::Debug for LanceNamespaceStorageOptionsProvider {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.provider_id())
+    }
+}
+
+impl fmt::Display for LanceNamespaceStorageOptionsProvider {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.provider_id())
+    }
+}
+
+impl LanceNamespaceStorageOptionsProvider {
+    /// Create a new LanceNamespaceStorageOptionsProvider
+    ///
+    /// # Arguments
+    /// * `namespace` - The namespace implementation to fetch storage options from
+    /// * `table_id` - The table identifier
+    pub fn new(namespace: Arc<dyn LanceNamespace>, table_id: Vec<String>) -> Self {
+        Self {
+            namespace,
+            table_id,
+        }
+    }
+}
+
+#[async_trait]
+impl StorageOptionsProvider for LanceNamespaceStorageOptionsProvider {
+    async fn fetch_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
+        let request = DescribeTableRequest {
+            id: Some(self.table_id.clone()),
+            version: None,
+        };
+
+        let response = self
+            .namespace
+            .describe_table(request)
+            .await
+            .map_err(|e| Error::IO {
+                source: Box::new(std::io::Error::other(format!(
+                    "Failed to fetch storage options: {}",
+                    e
+                ))),
+                location: location!(),
+            })?;
+
+        Ok(response.storage_options)
+    }
+
+    fn provider_id(&self) -> String {
+        format!(
+            "LanceNamespaceStorageOptionsProvider {{ namespace: {}, table_id: {:?} }}",
+            self.namespace.namespace_id(),
+            self.table_id
+        )
+    }
+}
diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs
index 19ced63f482..60d99e84977 100644
--- a/rust/lance-namespace-impls/src/dir.rs
+++ b/rust/lance-namespace-impls/src/dir.rs
@@ -262,6 +262,18 @@ pub struct DirectoryNamespace {
     base_path: Path,
 }
 
+impl std::fmt::Debug for DirectoryNamespace {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.namespace_id())
+    }
+}
+
+impl std::fmt::Display for DirectoryNamespace {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.namespace_id())
+    }
+}
+
 impl DirectoryNamespace {
     /// Validate that the namespace ID represents the root namespace
     fn validate_root_namespace_id(id: &Option<Vec<String>>) -> Result<()> {
@@ -716,6 +728,10 @@ impl LanceNamespace for DirectoryNamespace {
             storage_options: self.storage_options.clone(),
         })
     }
+
+    fn namespace_id(&self) -> String {
+        format!("DirectoryNamespace {{ root: {:?} }}", self.root)
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/lance-namespace-impls/src/rest.rs b/rust/lance-namespace-impls/src/rest.rs
index 202fca7b68b..12bea384c1c 100644
--- a/rust/lance-namespace-impls/src/rest.rs
+++ b/rust/lance-namespace-impls/src/rest.rs
@@ -299,6 +299,18 @@ pub struct RestNamespace {
     reqwest_config: Configuration,
 }
 
+impl std::fmt::Debug for RestNamespace {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.namespace_id())
+    }
+}
+
+impl std::fmt::Display for RestNamespace {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.namespace_id())
+    }
+}
+
 impl RestNamespace {
     /// Create a new REST namespace from builder
     pub(crate) fn
from_builder(builder: RestNamespaceBuilder) -> Self { @@ -691,6 +703,13 @@ impl LanceNamespace for RestNamespace { .await .map_err(convert_api_error) } + + fn namespace_id(&self) -> String { + format!( + "RestNamespace {{ endpoint: {:?}, delimiter: {:?} }}", + self.reqwest_config.base_path, self.delimiter + ) + } } #[cfg(test)] diff --git a/rust/lance-namespace/src/namespace.rs b/rust/lance-namespace/src/namespace.rs index 67740233fde..ac2d0c8e176 100644 --- a/rust/lance-namespace/src/namespace.rs +++ b/rust/lance-namespace/src/namespace.rs @@ -30,7 +30,7 @@ use lance_namespace_reqwest_client::models::{ /// must provide. Each method corresponds to a specific operation on namespaces /// or tables. #[async_trait] -pub trait LanceNamespace: Send + Sync { +pub trait LanceNamespace: Send + Sync + std::fmt::Debug { /// List namespaces. async fn list_namespaces( &self, @@ -276,4 +276,19 @@ pub trait LanceNamespace: Send + Sync { location: Location::new(file!(), line!(), column!()), }) } + + /// Return a human-readable unique identifier for this namespace instance. + /// + /// This is used for equality comparison and hashing when the namespace is + /// used as part of a storage options provider. Two namespace instances with + /// the same ID are considered equal and will share cached resources. + /// + /// The ID should be human-readable for debugging and logging purposes. + /// For example: + /// - REST namespace: `"rest(endpoint=https://api.example.com)"` + /// - Directory namespace: `"dir(root=/path/to/data)"` + /// + /// Implementations should include all configuration that uniquely identifies + /// the namespace to provide semantic equality. + fn namespace_id(&self) -> String; } diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 156247bd32e..1d534eb4af0 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -731,6 +731,7 @@ pub async fn commit_handler_from_url( let options = options.clone().unwrap_or_default(); let storage_options = StorageOptions(options.storage_options.unwrap_or_default()); let dynamo_endpoint = get_dynamodb_endpoint(&storage_options); + let expires_at_millis = storage_options.expires_at_millis(); let storage_options = storage_options.as_s3_options(); let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned(); @@ -740,6 +741,8 @@ pub async fn commit_handler_from_url( options.aws_credentials.clone(), Some(&storage_options), region, + options.storage_options_provider.clone(), + expires_at_millis, ) .await?; diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index fb7ebf09efe..c84c84c75f0 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -11,8 +11,11 @@ use lance_core::utils::tracing::{DATASET_LOADING_EVENT, TRACE_DATASET_EVENTS}; use lance_file::datatypes::populate_schema_dictionary; use lance_file::v2::reader::FileReaderOptions; use lance_io::object_store::{ - ObjectStore, ObjectStoreParams, StorageOptions, DEFAULT_CLOUD_IO_PARALLELISM, + LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions, + DEFAULT_CLOUD_IO_PARALLELISM, }; +use lance_namespace::models::DescribeTableRequest; +use lance_namespace::LanceNamespace; use lance_table::{ format::Manifest, io::commit::{commit_handler_from_url, CommitHandler}, @@ -26,7 +29,7 @@ use tracing::{info, instrument}; use url::Url; /// builder for loading a [`Dataset`]. 
-#[derive(Debug, Clone)] +#[derive(Clone)] pub struct DatasetBuilder { /// Cache size for index cache. If it is zero, index cache is disabled. index_cache_size_bytes: usize, @@ -41,6 +44,27 @@ pub struct DatasetBuilder { version: Option, table_uri: String, file_reader_options: Option, + /// Storage options that override user-provided options (e.g., from namespace) + storage_options_override: Option>, +} + +impl std::fmt::Debug for DatasetBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DatasetBuilder") + .field("index_cache_size_bytes", &self.index_cache_size_bytes) + .field("metadata_cache_size_bytes", &self.metadata_cache_size_bytes) + .field("manifest", &self.manifest.is_some()) + .field("session", &self.session.is_some()) + .field("commit_handler", &self.commit_handler.is_some()) + .field("version", &self.version) + .field("table_uri", &self.table_uri) + .field("file_reader_options", &self.file_reader_options) + .field( + "storage_options_override", + &self.storage_options_override.is_some(), + ) + .finish() + } } impl DatasetBuilder { @@ -55,8 +79,99 @@ impl DatasetBuilder { version: None, manifest: None, file_reader_options: None, + storage_options_override: None, } } + + /// Create a DatasetBuilder from a LanceNamespace + /// + /// This will automatically fetch the table location and storage options from the namespace + /// via `describe_table()`. + /// + /// Storage options from the namespace will override any user-provided storage options + /// set via `.with_storage_options()`. This ensures the namespace is always the source + /// of truth for storage options. + /// + /// # Arguments + /// * `namespace` - The namespace implementation to fetch table info from + /// * `table_id` - The table identifier (e.g., vec!["my_table"]) + /// * `refresh_storage_options` - If true, storage options will be automatically refreshed + /// from the namespace before they expire. This is currently only used for refreshing AWS + /// temporary access credentials. When enabled, the namespace will be queried periodically + /// to fetch new temporary credentials before the current ones expire. The new storage + /// options will contain updated AWS access credentials with a new expiration time. + /// Defaults to false. + /// + /// # Example + /// ```ignore + /// use lance_namespace_impls::ConnectBuilder; + /// use lance::dataset::DatasetBuilder; + /// + /// // Connect to a REST namespace + /// let namespace = ConnectBuilder::new("rest") + /// .property("uri", "http://localhost:8080") + /// .connect() + /// .await?; + /// + /// // Load a dataset from the namespace without storage options refresh + /// let dataset = DatasetBuilder::from_namespace( + /// namespace.clone(), + /// vec!["my_table".to_string()], + /// false, + /// ) + /// .await? + /// .load() + /// .await?; + /// + /// // Load a dataset with automatic storage options refresh + /// let dataset = DatasetBuilder::from_namespace( + /// namespace, + /// vec!["my_table".to_string()], + /// true, + /// ) + /// .await? 
+ /// .load() + /// .await?; + /// ``` + pub async fn from_namespace( + namespace: Arc, + table_id: Vec, + refresh_storage_options: bool, + ) -> Result { + let request = DescribeTableRequest { + id: Some(table_id.clone()), + version: None, + }; + + let response = namespace + .describe_table(request) + .await + .map_err(|e| Error::Namespace { + source: Box::new(e), + location: location!(), + })?; + + let table_uri = response.location.ok_or_else(|| Error::Namespace { + source: Box::new(std::io::Error::other( + "Table location not found in namespace response", + )), + location: location!(), + })?; + + let mut builder = Self::from_uri(table_uri); + + // Store namespace storage options as override - these will be applied last + builder.storage_options_override = response.storage_options; + + // Set storage options provider if refresh is enabled + if refresh_storage_options { + builder.options.storage_options_provider = Some(Arc::new( + LanceNamespaceStorageOptionsProvider::new(namespace, table_id), + )); + } + + Ok(builder) + } } // Much of this builder is directly inspired from the to delta-rs table builder implementation @@ -192,6 +307,58 @@ impl DatasetBuilder { self } + /// Enable credential vending from a LanceNamespace + /// + /// Credentials will be automatically refreshed from the namespace + /// before they expire. The namespace should return `expires_at_millis` + /// in the storage_options from `describe_table()`. + /// + /// Use `with_s3_credentials_refresh_offset()` to configure how early + /// credentials should be refreshed before they expire (default is 5 minutes). + /// + /// # Arguments + /// * `provider` - The storage options provider to fetch credentials from + /// + /// # Example + /// ```ignore + /// use std::sync::Arc; + /// use std::time::Duration; + /// use lance_namespace_impls::ConnectBuilder; + /// use lance_io::object_store::{StorageOptionsProvider, LanceNamespaceStorageOptionsProvider}; + /// + /// // Connect to a REST namespace + /// let namespace = ConnectBuilder::new("rest") + /// .property("uri", "http://localhost:8080") + /// .connect() + /// .await?; + /// + /// // Create a storage options provider from namespace + /// let provider = Arc::new(LanceNamespaceStorageOptionsProvider::new( + /// namespace, + /// vec!["my_table".to_string()], + /// )); + /// + /// // With default settings (5 minute refresh offset) + /// let dataset = DatasetBuilder::from_uri("s3://bucket/table.lance") + /// .with_storage_options_provider(provider) + /// .load() + /// .await?; + /// ``` + /// + /// // With custom refresh offset (refresh 10 minutes before expiration) + /// let dataset = DatasetBuilder::from_uri("s3://bucket/table.lance") + /// .with_storage_options_provider(provider.clone()) + /// .with_s3_credentials_refresh_offset(Duration::from_secs(600)) + /// .load() + /// .await?; + pub fn with_storage_options_provider( + mut self, + provider: Arc, + ) -> Self { + self.options.storage_options_provider = Some(provider); + self + } + /// Set options based on [ReadParams]. 
pub fn with_read_params(mut self, read_params: ReadParams) -> Self { self = self @@ -311,6 +478,14 @@ impl DatasetBuilder { } async fn load_impl(mut self) -> Result { + // Apply storage_options_override last to ensure namespace options take precedence + if let Some(override_opts) = self.storage_options_override.take() { + let mut merged_opts = self.options.storage_options.clone().unwrap_or_default(); + // Override with namespace storage options - they take precedence + merged_opts.extend(override_opts); + self.options.storage_options = Some(merged_opts); + } + let session = match self.session.as_ref() { Some(session) => session.clone(), None => Arc::new(Session::new( From 68d951b72027d179a206f7cedcd671a2982f3107 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 31 Oct 2025 00:15:12 -0700 Subject: [PATCH 2/4] add additional explanations --- .../src/object_store/providers/aws.rs | 17 ++++++++++++++ .../src/object_store/storage_options.rs | 22 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index f39d18f3128..1883336feba 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -231,6 +231,15 @@ async fn resolve_s3_region( /// `aws_secret_access_key`, `aws_session_token`) /// 4. The default credential provider chain from AWS SDK. /// +/// # Initial Credentials with Storage Options Provider +/// +/// When `storage_options_provider` is provided along with `storage_options` and +/// `expires_at_millis`, these serve as **initial values** to avoid redundant calls to +/// fetch new storage options. The provider will use these initial credentials until they +/// expire (based on `expires_at_millis`), then automatically fetch fresh credentials from +/// the provider. Once the initial credentials expire, the passed-in values are no longer +/// used - all future credentials come from the provider's `fetch_storage_options()` method. +/// /// `credentials_refresh_offset` is the amount of time before expiry to refresh credentials. pub async fn build_aws_credential( credentials_refresh_offset: Duration, @@ -491,6 +500,14 @@ impl ObjectStoreParams { /// This adapter converts our generic StorageOptionsProvider trait into /// AWS-specific credentials that can be used with S3. It caches credentials /// and automatically refreshes them before they expire. +/// +/// # Future Work +/// +/// TODO: Support AWS/GCP/Azure together in a unified credential provider. +/// Currently this is AWS-specific. Needs investigation of how GCP and Azure credential +/// refresh mechanisms work and whether they can be unified with AWS's approach. +/// +/// See: pub struct DynamicStorageOptionsCredentialProvider { provider: Arc, cache: Arc>>, diff --git a/rust/lance-io/src/object_store/storage_options.rs b/rust/lance-io/src/object_store/storage_options.rs index 9b1acba2b60..c85611c84d4 100644 --- a/rust/lance-io/src/object_store/storage_options.rs +++ b/rust/lance-io/src/object_store/storage_options.rs @@ -25,6 +25,28 @@ pub const EXPIRES_AT_MILLIS_KEY: &str = "expires_at_millis"; /// Implementations can fetch storage options from various sources (namespace servers, /// secret managers, etc.) and are usable from Python/Java. 
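// ---------------------------------------------------------------------------
// Editor's aside (illustrative sketch, not part of this patch): a minimal
// implementation of the trait documented above, assuming the imports already
// present in this module (async_trait, Result, HashMap, EXPIRES_AT_MILLIS_KEY).
// `StaticStorageOptionsProvider` and its `name` field are invented for this
// sketch; a real provider would fetch from a namespace or secret manager.
// ---------------------------------------------------------------------------
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug)]
struct StaticStorageOptionsProvider {
    /// Distinguishes provider instances; equal IDs share a cached ObjectStore.
    name: String,
    options: HashMap<String, String>,
}

#[async_trait]
impl StorageOptionsProvider for StaticStorageOptionsProvider {
    async fn fetch_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
        let mut options = self.options.clone();
        // Advertise an expiry so callers refresh roughly an hour from now.
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        options.insert(
            EXPIRES_AT_MILLIS_KEY.to_string(),
            (now_ms + 3_600_000).to_string(),
        );
        Ok(Some(options))
    }

    fn provider_id(&self) -> String {
        format!("StaticStorageOptionsProvider {{ name: {} }}", self.name)
    }
}
// ---------------------------------------------------------------------------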
/// +/// # Current Use Cases +/// +/// - **Temporary Credentials**: Fetch short-lived AWS temporary credentials that expire +/// after a set time period, with automatic refresh before expiration +/// +/// # Future Possible Use Cases +/// +/// - **Dynamic Storage Location Resolution**: Resolve logical names to actual storage +/// locations (bucket aliases, S3 Access Points, region-specific endpoints) that may +/// change based on region, tier, data migration, or failover scenarios +/// - **Runtime S3 Tags Assignment**: Inject cost allocation tags, security labels, or +/// compliance metadata into S3 requests based on the current execution context (user, +/// application, workspace, etc.) +/// - **Dynamic Endpoint Configuration**: Update storage endpoints for disaster recovery, +/// A/B testing, or gradual migration scenarios +/// - **Just-in-time Permission Elevation**: Request elevated permissions only when needed +/// for sensitive operations, then immediately revoke them +/// - **Secret Manager Integration**: Fetch credentials from HashiCorp Vault, AWS Secrets +/// Manager, Azure Key Vault, or Google Secret Manager with automatic rotation +/// - **OIDC/SAML Federation**: Integrate with identity providers to obtain storage +/// credentials based on user identity and group membership +/// /// # Equality and Hashing /// /// Implementations must provide `provider_id()` which returns a unique identifier for From 952eee05f61755deff04bdeffa8b6378782ba5bc Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 31 Oct 2025 00:16:52 -0700 Subject: [PATCH 3/4] fix typo --- rust/lance-io/src/object_store/storage_options.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-io/src/object_store/storage_options.rs b/rust/lance-io/src/object_store/storage_options.rs index c85611c84d4..9405f95d70c 100644 --- a/rust/lance-io/src/object_store/storage_options.rs +++ b/rust/lance-io/src/object_store/storage_options.rs @@ -42,8 +42,8 @@ pub const EXPIRES_AT_MILLIS_KEY: &str = "expires_at_millis"; /// A/B testing, or gradual migration scenarios /// - **Just-in-time Permission Elevation**: Request elevated permissions only when needed /// for sensitive operations, then immediately revoke them -/// - **Secret Manager Integration**: Fetch credentials from HashiCorp Vault, AWS Secrets -/// Manager, Azure Key Vault, or Google Secret Manager with automatic rotation +/// - **Secret Manager Integration**: Fetch encryption keys from AWS Secrets Manager, +/// Azure Key Vault, or Google Secret Manager with automatic rotation /// - **OIDC/SAML Federation**: Integrate with identity providers to obtain storage /// credentials based on user identity and group membership /// From 7855872dec85bf218d4ad239f5a874434b15f60c Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 31 Oct 2025 10:15:15 -0700 Subject: [PATCH 4/4] make allow options default and allow user to disable --- .../main/java/com/lancedb/lance/Dataset.java | 1 - .../com/lancedb/lance/OpenDatasetBuilder.java | 32 ++++++--------- .../lance/NamespaceIntegrationTest.java | 2 - python/python/lance/__init__.py | 41 ++++++------------- .../tests/test_namespace_integration.py | 15 ++++--- rust/lance/src/dataset/builder.rs | 26 ++++++------ 6 files changed, 46 insertions(+), 71 deletions(-) diff --git a/java/src/main/java/com/lancedb/lance/Dataset.java b/java/src/main/java/com/lancedb/lance/Dataset.java index f62805ce474..d99cf91dbe4 100644 --- a/java/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/src/main/java/com/lancedb/lance/Dataset.java 
@@ -260,7 +260,6 @@ private static native Dataset openNative( * Dataset dataset = Dataset.open() * .namespace(myNamespace) * .tableId(Arrays.asList("my_table")) - * .refreshStorageOptions(true) * .build(); * } * diff --git a/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java index e55c8fbd2b1..1114bdcf876 100644 --- a/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java +++ b/java/src/main/java/com/lancedb/lance/OpenDatasetBuilder.java @@ -48,7 +48,6 @@ * Dataset dataset = Dataset.open() * .namespace(myNamespace) * .tableId(Arrays.asList("my_table")) - * .refreshStorageOptions(true) * .build(); * } */ @@ -59,7 +58,7 @@ public class OpenDatasetBuilder { private LanceNamespace namespace; private List tableId; private ReadOptions options = new ReadOptions.Builder().build(); - private boolean refreshStorageOptions = false; + private boolean ignoreNamespaceTableStorageOptions = false; /** Creates a new builder instance. Package-private, use Dataset.open() instead. */ OpenDatasetBuilder() {} @@ -130,18 +129,15 @@ public OpenDatasetBuilder readOptions(ReadOptions options) { } /** - * Sets whether storage options should be automatically refreshed before they expire. + * Sets whether to ignore storage options from the namespace's describe_table(). * - *
<p>
This is only applicable when using namespace-based opening. It is currently only used for - * refreshing AWS temporary access credentials. When enabled, the namespace will be queried - * periodically to fetch new temporary credentials before the current ones expire. The new storage - * options will contain updated AWS access credentials with a new expiration time. - * - * @param refreshStorageOptions If true, storage options will be automatically refreshed + * @param ignoreNamespaceTableStorageOptions If true, storage options returned from + * describe_table() will be ignored (treated as null) * @return this builder instance */ - public OpenDatasetBuilder refreshStorageOptions(boolean refreshStorageOptions) { - this.refreshStorageOptions = refreshStorageOptions; + public OpenDatasetBuilder ignoreNamespaceTableStorageOptions( + boolean ignoreNamespaceTableStorageOptions) { + this.ignoreNamespaceTableStorageOptions = ignoreNamespaceTableStorageOptions; return this; } @@ -203,26 +199,25 @@ private Dataset buildFromNamespace() { DescribeTableResponse response = namespace.describeTable(request); - // Extract location String location = response.getLocation(); if (location == null || location.isEmpty()) { throw new IllegalArgumentException("Namespace did not return a table location"); } - // Build new ReadOptions with initial storage options + Map namespaceStorageOptions = + ignoreNamespaceTableStorageOptions ? null : response.getStorageOptions(); + ReadOptions.Builder optionsBuilder = new ReadOptions.Builder() .setIndexCacheSizeBytes(options.getIndexCacheSizeBytes()) .setMetadataCacheSizeBytes(options.getMetadataCacheSizeBytes()); - // Only set storage options provider if refresh is enabled - if (refreshStorageOptions) { + if (namespaceStorageOptions != null && !namespaceStorageOptions.isEmpty()) { LanceNamespaceStorageOptionsProvider storageOptionsProvider = new LanceNamespaceStorageOptionsProvider(namespace, tableId); optionsBuilder.setStorageOptionsProvider(storageOptionsProvider); } - // Set optional fields only if present options.getVersion().ifPresent(optionsBuilder::setVersion); options.getBlockSize().ifPresent(optionsBuilder::setBlockSize); options.getSerializedManifest().ifPresent(optionsBuilder::setSerializedManifest); @@ -230,10 +225,9 @@ private Dataset buildFromNamespace() { .getS3CredentialsRefreshOffsetSeconds() .ifPresent(optionsBuilder::setS3CredentialsRefreshOffsetSeconds); - // Add initial storage options from describe_table response if present Map storageOptions = new HashMap<>(options.getStorageOptions()); - if (response.getStorageOptions() != null) { - storageOptions.putAll(response.getStorageOptions()); + if (namespaceStorageOptions != null) { + storageOptions.putAll(namespaceStorageOptions); } optionsBuilder.setStorageOptions(storageOptions); diff --git a/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java index 975d54583a0..039fbbce662 100644 --- a/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java +++ b/java/src/test/java/com/lancedb/lance/NamespaceIntegrationTest.java @@ -252,7 +252,6 @@ void testOpenDatasetWithoutRefresh() throws Exception { .allocator(allocator) .namespace(namespace) .tableId(Arrays.asList(tableName)) - .refreshStorageOptions(true) // Refresh enabled .readOptions(readOptions) .build()) { // With the fix, describeTable should only be called once during open @@ -357,7 +356,6 @@ void testStorageOptionsProviderWithRefresh() throws Exception { 
.allocator(allocator) .namespace(namespace) .tableId(Arrays.asList(tableName)) - .refreshStorageOptions(true) // Enable automatic refresh .readOptions(readOptions) .build()) { // With the fix, describeTable should only be called once during open diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index f77692399be..9f6016b8603 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -91,7 +91,7 @@ def dataset( session: Optional[Session] = None, namespace: Optional[any] = None, table_id: Optional[list] = None, - refresh_storage_options: bool = False, + ignore_namespace_table_storage_options: bool = False, s3_credentials_refresh_offset_seconds: Optional[int] = None, ) -> LanceDataset: """ @@ -162,15 +162,12 @@ def dataset( table_id : optional, list of str The table identifier when using a namespace (e.g., ["my_table"]). Must be provided together with `namespace`. Cannot be used with `uri`. - refresh_storage_options : bool, default False + ignore_namespace_table_storage_options : bool, default False Only applicable when using `namespace` and `table_id`. If True, storage - options will be automatically refreshed from the namespace before they - expire. This is currently only used for refreshing AWS temporary access - credentials. When enabled, the namespace will be queried periodically to - fetch new temporary credentials before the current ones expire. The new - storage options will contain updated AWS access credentials with a new - expiration time. If False (default), only the initial storage options - from describe_table() will be used. + options returned from the namespace's describe_table() will be ignored + (treated as None). If False (default), storage options from describe_table() + will be used and a dynamic storage options provider will be created to + automatically refresh credentials before they expire. s3_credentials_refresh_offset_seconds : optional, int The number of seconds before credential expiration to trigger a refresh. Default is 60 seconds. 
Only applicable when using AWS S3 with temporary @@ -183,8 +180,8 @@ def dataset( ----- When using `namespace` and `table_id`: - The `uri` parameter is optional and will be fetched from the namespace - - A `LanceNamespaceStorageOptionsProvider` will be created automatically only - if `refresh_storage_options=True` + - Storage options from describe_table() will be used unless + `ignore_namespace_table_storage_options=True` - Initial storage options from describe_table() will be merged with any provided `storage_options` """ @@ -215,30 +212,18 @@ def dataset( if uri is None: raise ValueError("Namespace did not return a 'location' for the table") - # Extract storage options from namespace - namespace_storage_options = table_info.get("storage_options", {}) + if ignore_namespace_table_storage_options: + namespace_storage_options = None + else: + namespace_storage_options = table_info.get("storage_options") - # Create storage options provider if refresh is enabled - if refresh_storage_options: + if namespace_storage_options: storage_options_provider = LanceNamespaceStorageOptionsProvider( namespace=namespace, table_id=table_id ) - # Merge storage options (namespace takes precedence) - # Pass initial credentials to Rust so it can cache them and avoid - # immediately calling fetch_storage_options() during object store creation - if storage_options is None: - storage_options = namespace_storage_options - else: - # Merge: user options first, then override with namespace options - merged_options = dict(storage_options) - merged_options.update(namespace_storage_options) - storage_options = merged_options - else: - # Without refresh, merge storage options (namespace takes precedence) if storage_options is None: storage_options = namespace_storage_options else: - # Merge: user options first, then override with namespace options merged_options = dict(storage_options) merged_options.update(namespace_storage_options) storage_options = merged_options diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 8b90e6667f6..0a275880217 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -199,14 +199,14 @@ def test_namespace_open_dataset(s3_bucket: str): ) namespace.register_table([table_name], table_uri) - # Open dataset through namespace (without refresh) + # Open dataset through namespace (ignoring storage options from namespace) # This should call describe_table once assert namespace.get_call_count() == 0 ds_from_namespace = lance.dataset( namespace=namespace, table_id=[table_name], - refresh_storage_options=False, + ignore_namespace_table_storage_options=True, ) # Verify describe_table was called once during open @@ -241,13 +241,12 @@ def test_namespace_with_refresh(s3_bucket: str): assert namespace.get_call_count() == 0 - # Open dataset with refresh enabled and short refresh offset - # Set refresh offset to 1 second (shorter than 2s credential lifetime) + # Open dataset with short refresh offset + # Storage options from namespace are used by default ds_from_namespace = lance.dataset( namespace=namespace, table_id=[table_name], - refresh_storage_options=True, # Enable automatic refresh - s3_credentials_refresh_offset_seconds=1, # Refresh 1s before expiration + s3_credentials_refresh_offset_seconds=1, ) initial_call_count = namespace.get_call_count() @@ -305,7 +304,7 @@ def test_namespace_append_through_namespace(s3_bucket: str): ds_from_namespace = lance.dataset( 
namespace=namespace, table_id=[table_name], - refresh_storage_options=False, + ignore_namespace_table_storage_options=True, ) assert ds_from_namespace.count_rows() == 1 @@ -324,7 +323,7 @@ def test_namespace_append_through_namespace(s3_bucket: str): ds_from_namespace = lance.dataset( namespace=namespace, table_id=[table_name], - refresh_storage_options=False, + ignore_namespace_table_storage_options=True, ) assert ds_from_namespace.count_rows() == 2 diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index c84c84c75f0..1820aaa03e6 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -95,12 +95,8 @@ impl DatasetBuilder { /// # Arguments /// * `namespace` - The namespace implementation to fetch table info from /// * `table_id` - The table identifier (e.g., vec!["my_table"]) - /// * `refresh_storage_options` - If true, storage options will be automatically refreshed - /// from the namespace before they expire. This is currently only used for refreshing AWS - /// temporary access credentials. When enabled, the namespace will be queried periodically - /// to fetch new temporary credentials before the current ones expire. The new storage - /// options will contain updated AWS access credentials with a new expiration time. - /// Defaults to false. + /// * `ignore_namespace_table_storage_options` - If true, storage options returned from + /// the namespace's `describe_table()` will be ignored (treated as None). Defaults to false. /// /// # Example /// ```ignore @@ -113,7 +109,7 @@ impl DatasetBuilder { /// .connect() /// .await?; /// - /// // Load a dataset from the namespace without storage options refresh + /// // Load a dataset using storage options from namespace /// let dataset = DatasetBuilder::from_namespace( /// namespace.clone(), /// vec!["my_table".to_string()], @@ -123,7 +119,7 @@ impl DatasetBuilder { /// .load() /// .await?; /// - /// // Load a dataset with automatic storage options refresh + /// // Load a dataset ignoring namespace storage options /// let dataset = DatasetBuilder::from_namespace( /// namespace, /// vec!["my_table".to_string()], @@ -136,7 +132,7 @@ impl DatasetBuilder { pub async fn from_namespace( namespace: Arc, table_id: Vec, - refresh_storage_options: bool, + ignore_namespace_table_storage_options: bool, ) -> Result { let request = DescribeTableRequest { id: Some(table_id.clone()), @@ -160,11 +156,15 @@ impl DatasetBuilder { let mut builder = Self::from_uri(table_uri); - // Store namespace storage options as override - these will be applied last - builder.storage_options_override = response.storage_options; + let namespace_storage_options = if ignore_namespace_table_storage_options { + None + } else { + response.storage_options + }; + + builder.storage_options_override = namespace_storage_options.clone(); - // Set storage options provider if refresh is enabled - if refresh_storage_options { + if namespace_storage_options.is_some() { builder.options.storage_options_provider = Some(Arc::new( LanceNamespaceStorageOptionsProvider::new(namespace, table_id), ));
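// ---------------------------------------------------------------------------
// Editor's aside (illustrative sketch, not part of this patch): why the
// namespace-provided options take precedence when load_impl merges
// storage_options_override into user-supplied options, and when the Python
// path calls merged_options.update(namespace_storage_options) -
// HashMap::extend (and dict.update) overwrite existing keys. The function and
// values below are invented purely to demonstrate that merge order.
// ---------------------------------------------------------------------------
use std::collections::HashMap;

fn merge_storage_options(
    user: HashMap<String, String>,
    namespace_override: HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = user;
    // Keys vended by the namespace (e.g. temporary AWS credentials) replace
    // whatever the caller passed; caller-only keys are kept.
    merged.extend(namespace_override);
    merged
}

fn main() {
    let user = HashMap::from([
        ("aws_region".to_string(), "us-east-1".to_string()),
        ("aws_access_key_id".to_string(), "USER_KEY".to_string()),
    ]);
    let vended = HashMap::from([("aws_access_key_id".to_string(), "VENDED_KEY".to_string())]);
    let merged = merge_storage_options(user, vended);
    assert_eq!(merged["aws_access_key_id"], "VENDED_KEY"); // namespace wins
    assert_eq!(merged["aws_region"], "us-east-1"); // user-only keys survive
}
// ---------------------------------------------------------------------------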