diff --git a/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java b/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java index f8a92936666..fb65e235c36 100644 --- a/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java +++ b/java/src/main/java/org/lance/namespace/LanceNamespaceStorageOptionsProvider.java @@ -73,11 +73,13 @@ public LanceNamespaceStorageOptionsProvider(LanceNamespace namespace, ListThis calls namespace.describeTable() to get the latest credentials and their expiration - * time. + *

This calls namespace.describeTable() to get the latest credentials and optionally their + * expiration time. * - * @return Flat map of string key-value pairs containing credentials and expires_at_millis - * @throws RuntimeException if the namespace doesn't return storage credentials or expiration time + * @return Flat map of string key-value pairs containing credentials. May optionally include + * expires_at_millis. If expires_at_millis is not provided, credentials are treated as + * non-expiring and will not be automatically refreshed. + * @throws RuntimeException if the namespace doesn't return storage credentials */ @Override public Map fetchStorageOptions() { @@ -96,14 +98,9 @@ public Map fetchStorageOptions() { + "Ensure the namespace supports credential vending."); } - // Verify expires_at_millis is present - if (!storageOptions.containsKey("expires_at_millis")) { - throw new RuntimeException( - "Namespace storage_options missing 'expires_at_millis'. " - + "Credential refresh will not work properly."); - } - // Return storage_options directly - it's already a flat Map + // Note: expires_at_millis is optional. If not provided, credentials are treated + // as non-expiring and will not be automatically refreshed. return storageOptions; } diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index 426c7176d74..3e89ab0cfe9 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -448,18 +448,20 @@ def fetch_storage_options(self) -> Dict[str, str]: """Fetch storage options from the namespace. This calls namespace.describe_table() to get the latest storage options - and their expiration time. + and optionally their expiration time. Returns ------- Dict[str, str] - Flat dictionary of string key-value pairs containing storage options - and expires_at_millis + Flat dictionary of string key-value pairs containing storage options. + May optionally include expires_at_millis. If expires_at_millis is not + provided, credentials are treated as non-expiring and will not be + automatically refreshed. Raises ------ RuntimeError - If the namespace doesn't return storage options or expiration time + If the namespace doesn't return storage options """ request = DescribeTableRequest(id=self._table_id, version=None) response = self._namespace.describe_table(request) @@ -470,14 +472,9 @@ def fetch_storage_options(self) -> Dict[str, str]: "Ensure the namespace supports storage options providing." ) - # Verify expires_at_millis is present - if "expires_at_millis" not in storage_options: - raise RuntimeError( - "Namespace storage_options missing 'expires_at_millis'. " - "Storage options refresh will not work properly." - ) - # Return the storage_options directly - it's already a flat Map + # Note: expires_at_millis is optional. If not provided, credentials are treated + # as non-expiring and will not be automatically refreshed. return storage_options def provider_id(self) -> str: diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 9bd93bf029a..d5648d67c19 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -300,45 +300,24 @@ async fn build_aws_credential_with_storage_options_provider( storage_options_credentials: Option>, expires_at_millis: Option, ) -> Result { - match (expires_at_millis, credentials, storage_options_credentials) { - // Case 1: provider + credentials + expiration time - (Some(expires_at), Some(cred), _) => { - Ok(Arc::new( - DynamicStorageOptionsCredentialProvider::new_with_initial_credential( - storage_options_provider, - credentials_refresh_offset, - cred.get_credential().await?, - expires_at, - ), - )) - } - // Case 2: provider + storage_options (with valid credentials) + expiration time - (Some(expires_at), None, Some(cred)) => { - Ok(Arc::new( - DynamicStorageOptionsCredentialProvider::new_with_initial_credential( - storage_options_provider, - credentials_refresh_offset, - cred.get_credential().await?, - expires_at, - ), - )) - } - // Case 3: provider + storage_options without expiration - FAIL - (None, None, Some(_)) => Err(Error::IO { - source: Box::new(std::io::Error::other( - "expires_at_millis is required when using storage_options_provider with storage_options", - )), - location: location!(), - }), - // Case 4: provider + credentials without expiration - FAIL - (None, Some(_), _) => Err(Error::IO { - source: Box::new(std::io::Error::other( - "expires_at_millis is required when using storage_options_provider with credentials", - )), - location: location!(), - }), - // Case 5: provider without credentials/storage_options, or with expiration but no creds/opts - (_, None, None) => Ok(Arc::new(DynamicStorageOptionsCredentialProvider::new( + match (credentials, storage_options_credentials) { + // Case 1: Explicit aws_credentials provider takes precedence - use it directly + // without wrapping in DynamicStorageOptionsCredentialProvider, as the user's + // provider should handle its own credential refresh logic. + (Some(cred), _) => Ok(cred), + // Case 2: storage_options credentials - wrap with DynamicStorageOptionsCredentialProvider + // to enable auto-refresh from the namespace + (None, Some(cred)) => Ok(Arc::new( + DynamicStorageOptionsCredentialProvider::new_with_initial_credential( + storage_options_provider, + credentials_refresh_offset, + cred.get_credential().await?, + expires_at_millis, + ), + )), + // Case 3: No initial credentials - DynamicStorageOptionsCredentialProvider will + // fetch credentials from the namespace on first use + (None, None) => Ok(Arc::new(DynamicStorageOptionsCredentialProvider::new( storage_options_provider, credentials_refresh_offset, ))), @@ -550,18 +529,19 @@ impl DynamicStorageOptionsCredentialProvider { /// * `provider` - The storage options provider /// * `refresh_offset` - Duration before expiry to refresh credentials /// * `credential` - Initial credential to cache - /// * `expires_at_millis` - Expiration time in milliseconds since epoch (required for refresh) + /// * `expires_at_millis` - Expiration time in milliseconds since epoch. If None, credentials + /// are treated as non-expiring and will not be automatically refreshed. pub fn new_with_initial_credential( provider: Arc, refresh_offset: Duration, credential: Arc, - expires_at_millis: u64, + expires_at_millis: Option, ) -> Self { Self { provider, cache: Arc::new(RwLock::new(Some(CachedCredential { credential, - expires_at_millis: Some(expires_at_millis), + expires_at_millis, }))), refresh_offset, } @@ -908,7 +888,7 @@ mod tests { mock.clone(), Duration::from_secs(300), // 5 minute refresh offset initial_cred, - expires_at, + Some(expires_at), ); // First call should use cached credentials (not expired yet) @@ -944,7 +924,7 @@ mod tests { mock.clone(), Duration::from_secs(300), // 5 minute refresh offset initial_cred, - expired_time, + Some(expired_time), ); // First call should fetch new credentials because cached ones are expired @@ -1055,7 +1035,7 @@ mod tests { mock.clone(), Duration::from_secs(300), // 5 minute refresh offset initial_cred, - expires_at, + Some(expires_at), ); // First call should use the initial credential (not expired yet) @@ -1171,7 +1151,7 @@ mod tests { mock.clone(), Duration::from_secs(300), initial_cred, - expires_at, + Some(expires_at), ), ); @@ -1217,4 +1197,92 @@ mod tests { call_count ); } + + #[tokio::test] + async fn test_explicit_aws_credentials_takes_precedence_over_storage_options_provider() { + // Create a mock storage options provider that should NOT be called + let mock_storage_provider = Arc::new(MockStorageOptionsProvider::new(Some(600_000))); + + // Create an explicit AWS credentials provider + let explicit_cred_provider = Arc::new(MockAwsCredentialsProvider::default()); + + // Build credentials with both aws_credentials AND storage_options_provider + // The explicit aws_credentials should take precedence + let result = build_aws_credential_with_storage_options_provider( + mock_storage_provider.clone(), + Duration::from_secs(300), + Some(explicit_cred_provider.clone() as AwsCredentialProvider), + None, // no storage_options credentials + Some(1000), + ) + .await + .unwrap(); + + // Get credential from the result + let cred = result.get_credential().await.unwrap(); + + // The explicit provider should have been called (it returns empty strings) + assert!(explicit_cred_provider.called.load(Ordering::Relaxed)); + + // The storage options provider should NOT have been called + assert_eq!( + mock_storage_provider.get_call_count().await, + 0, + "Storage options provider should not be called when explicit aws_credentials is provided" + ); + + // Verify we got credentials from the explicit provider (empty strings) + assert_eq!(cred.key_id, ""); + assert_eq!(cred.secret_key, ""); + } + + #[tokio::test] + async fn test_storage_options_credentials_uses_dynamic_provider_when_no_explicit_aws_credentials( + ) { + MockClock::set_system_time(Duration::from_secs(100_000)); + + let now_ms = MockClock::system_time().as_millis() as u64; + + // Create a mock storage options provider + let mock_storage_provider = Arc::new(MockStorageOptionsProvider::new(Some(600_000))); + + // Create storage_options credentials (simulating credentials from storage_options map) + let storage_options_cred = StaticCredentialProvider::new(ObjectStoreAwsCredential { + key_id: "AKID_FROM_STORAGE_OPTIONS".to_string(), + secret_key: "SECRET_FROM_STORAGE_OPTIONS".to_string(), + token: Some("TOKEN_FROM_STORAGE_OPTIONS".to_string()), + }); + + let expires_at = now_ms + 600_000; // 10 minutes from now + + // Build credentials with storage_options_credentials but NO explicit aws_credentials + let result = build_aws_credential_with_storage_options_provider( + mock_storage_provider.clone(), + Duration::from_secs(300), + None, // no explicit aws_credentials + Some(storage_options_cred), + Some(expires_at), + ) + .await + .unwrap(); + + // Get credential - should use the initial storage_options credentials + let cred = result.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_FROM_STORAGE_OPTIONS"); + assert_eq!(cred.secret_key, "SECRET_FROM_STORAGE_OPTIONS"); + + // Storage options provider should NOT have been called yet (using cached initial creds) + assert_eq!(mock_storage_provider.get_call_count().await, 0); + + // Advance time to trigger refresh (past the 5 minute refresh offset) + MockClock::set_system_time(Duration::from_secs(100_000 + 360)); + + // Get credential again - should now fetch from provider + let cred = result.get_credential().await.unwrap(); + assert_eq!(cred.key_id, "AKID_1"); + assert_eq!(cred.secret_key, "SECRET_1"); + + // Storage options provider should have been called once + assert_eq!(mock_storage_provider.get_call_count().await, 1); + } }