Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ public LanceNamespaceStorageOptionsProvider(LanceNamespace namespace, List<Strin
/**
* Fetch credentials from the namespace.
*
* <p>This calls namespace.describeTable() to get the latest credentials and their expiration
* time.
* <p>This calls namespace.describeTable() to get the latest credentials and optionally their
* expiration time.
*
* @return Flat map of string key-value pairs containing credentials and expires_at_millis
* @throws RuntimeException if the namespace doesn't return storage credentials or expiration time
* @return Flat map of string key-value pairs containing credentials. May optionally include
* expires_at_millis. If expires_at_millis is not provided, credentials are treated as
* non-expiring and will not be automatically refreshed.
* @throws RuntimeException if the namespace doesn't return storage credentials
*/
@Override
public Map<String, String> fetchStorageOptions() {
Expand All @@ -96,14 +98,9 @@ public Map<String, String> fetchStorageOptions() {
+ "Ensure the namespace supports credential vending.");
}

// Verify expires_at_millis is present
if (!storageOptions.containsKey("expires_at_millis")) {
throw new RuntimeException(
"Namespace storage_options missing 'expires_at_millis'. "
+ "Credential refresh will not work properly.");
}

// Return storage_options directly - it's already a flat Map<String, String>
// Note: expires_at_millis is optional. If not provided, credentials are treated
// as non-expiring and will not be automatically refreshed.
return storageOptions;
}

Expand Down
19 changes: 8 additions & 11 deletions python/python/lance/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,18 +448,20 @@ def fetch_storage_options(self) -> Dict[str, str]:
"""Fetch storage options from the namespace.

This calls namespace.describe_table() to get the latest storage options
and their expiration time.
and optionally their expiration time.

Returns
-------
Dict[str, str]
Flat dictionary of string key-value pairs containing storage options
and expires_at_millis
Flat dictionary of string key-value pairs containing storage options.
May optionally include expires_at_millis. If expires_at_millis is not
provided, credentials are treated as non-expiring and will not be
automatically refreshed.

Raises
------
RuntimeError
If the namespace doesn't return storage options or expiration time
If the namespace doesn't return storage options
"""
request = DescribeTableRequest(id=self._table_id, version=None)
response = self._namespace.describe_table(request)
Expand All @@ -470,14 +472,9 @@ def fetch_storage_options(self) -> Dict[str, str]:
"Ensure the namespace supports storage options providing."
)

# Verify expires_at_millis is present
if "expires_at_millis" not in storage_options:
raise RuntimeError(
"Namespace storage_options missing 'expires_at_millis'. "
"Storage options refresh will not work properly."
)

# Return the storage_options directly - it's already a flat Map<String, String>
# Note: expires_at_millis is optional. If not provided, credentials are treated
# as non-expiring and will not be automatically refreshed.
return storage_options

def provider_id(self) -> str:
Expand Down
160 changes: 114 additions & 46 deletions rust/lance-io/src/object_store/providers/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,45 +300,24 @@ async fn build_aws_credential_with_storage_options_provider(
storage_options_credentials: Option<StaticCredentialProvider<ObjectStoreAwsCredential>>,
expires_at_millis: Option<u64>,
) -> Result<AwsCredentialProvider> {
match (expires_at_millis, credentials, storage_options_credentials) {
// Case 1: provider + credentials + expiration time
(Some(expires_at), Some(cred), _) => {
Ok(Arc::new(
DynamicStorageOptionsCredentialProvider::new_with_initial_credential(
storage_options_provider,
credentials_refresh_offset,
cred.get_credential().await?,
expires_at,
),
))
}
// Case 2: provider + storage_options (with valid credentials) + expiration time
(Some(expires_at), None, Some(cred)) => {
Ok(Arc::new(
DynamicStorageOptionsCredentialProvider::new_with_initial_credential(
storage_options_provider,
credentials_refresh_offset,
cred.get_credential().await?,
expires_at,
),
))
}
// Case 3: provider + storage_options without expiration - FAIL
(None, None, Some(_)) => Err(Error::IO {
source: Box::new(std::io::Error::other(
"expires_at_millis is required when using storage_options_provider with storage_options",
)),
location: location!(),
}),
// Case 4: provider + credentials without expiration - FAIL
(None, Some(_), _) => Err(Error::IO {
source: Box::new(std::io::Error::other(
"expires_at_millis is required when using storage_options_provider with credentials",
)),
location: location!(),
}),
// Case 5: provider without credentials/storage_options, or with expiration but no creds/opts
(_, None, None) => Ok(Arc::new(DynamicStorageOptionsCredentialProvider::new(
match (credentials, storage_options_credentials) {
// Case 1: Explicit aws_credentials provider takes precedence - use it directly
// without wrapping in DynamicStorageOptionsCredentialProvider, as the user's
// provider should handle its own credential refresh logic.
(Some(cred), _) => Ok(cred),
// Case 2: storage_options credentials - wrap with DynamicStorageOptionsCredentialProvider
// to enable auto-refresh from the namespace
(None, Some(cred)) => Ok(Arc::new(
DynamicStorageOptionsCredentialProvider::new_with_initial_credential(
storage_options_provider,
credentials_refresh_offset,
cred.get_credential().await?,
expires_at_millis,
),
)),
// Case 3: No initial credentials - DynamicStorageOptionsCredentialProvider will
// fetch credentials from the namespace on first use
(None, None) => Ok(Arc::new(DynamicStorageOptionsCredentialProvider::new(
storage_options_provider,
credentials_refresh_offset,
))),
Expand Down Expand Up @@ -550,18 +529,19 @@ impl DynamicStorageOptionsCredentialProvider {
/// * `provider` - The storage options provider
/// * `refresh_offset` - Duration before expiry to refresh credentials
/// * `credential` - Initial credential to cache
/// * `expires_at_millis` - Expiration time in milliseconds since epoch (required for refresh)
/// * `expires_at_millis` - Expiration time in milliseconds since epoch. If None, credentials
/// are treated as non-expiring and will not be automatically refreshed.
pub fn new_with_initial_credential(
provider: Arc<dyn StorageOptionsProvider>,
refresh_offset: Duration,
credential: Arc<ObjectStoreAwsCredential>,
expires_at_millis: u64,
expires_at_millis: Option<u64>,
) -> Self {
Self {
provider,
cache: Arc::new(RwLock::new(Some(CachedCredential {
credential,
expires_at_millis: Some(expires_at_millis),
expires_at_millis,
}))),
refresh_offset,
}
Expand Down Expand Up @@ -908,7 +888,7 @@ mod tests {
mock.clone(),
Duration::from_secs(300), // 5 minute refresh offset
initial_cred,
expires_at,
Some(expires_at),
);

// First call should use cached credentials (not expired yet)
Expand Down Expand Up @@ -944,7 +924,7 @@ mod tests {
mock.clone(),
Duration::from_secs(300), // 5 minute refresh offset
initial_cred,
expired_time,
Some(expired_time),
);

// First call should fetch new credentials because cached ones are expired
Expand Down Expand Up @@ -1055,7 +1035,7 @@ mod tests {
mock.clone(),
Duration::from_secs(300), // 5 minute refresh offset
initial_cred,
expires_at,
Some(expires_at),
);

// First call should use the initial credential (not expired yet)
Expand Down Expand Up @@ -1171,7 +1151,7 @@ mod tests {
mock.clone(),
Duration::from_secs(300),
initial_cred,
expires_at,
Some(expires_at),
),
);

Expand Down Expand Up @@ -1217,4 +1197,92 @@ mod tests {
call_count
);
}

#[tokio::test]
async fn test_explicit_aws_credentials_takes_precedence_over_storage_options_provider() {
// Create a mock storage options provider that should NOT be called
let mock_storage_provider = Arc::new(MockStorageOptionsProvider::new(Some(600_000)));

// Create an explicit AWS credentials provider
let explicit_cred_provider = Arc::new(MockAwsCredentialsProvider::default());

// Build credentials with both aws_credentials AND storage_options_provider
// The explicit aws_credentials should take precedence
let result = build_aws_credential_with_storage_options_provider(
mock_storage_provider.clone(),
Duration::from_secs(300),
Some(explicit_cred_provider.clone() as AwsCredentialProvider),
None, // no storage_options credentials
Some(1000),
)
.await
.unwrap();

// Get credential from the result
let cred = result.get_credential().await.unwrap();

// The explicit provider should have been called (it returns empty strings)
assert!(explicit_cred_provider.called.load(Ordering::Relaxed));

// The storage options provider should NOT have been called
assert_eq!(
mock_storage_provider.get_call_count().await,
0,
"Storage options provider should not be called when explicit aws_credentials is provided"
);

// Verify we got credentials from the explicit provider (empty strings)
assert_eq!(cred.key_id, "");
assert_eq!(cred.secret_key, "");
}

#[tokio::test]
async fn test_storage_options_credentials_uses_dynamic_provider_when_no_explicit_aws_credentials(
) {
MockClock::set_system_time(Duration::from_secs(100_000));

let now_ms = MockClock::system_time().as_millis() as u64;

// Create a mock storage options provider
let mock_storage_provider = Arc::new(MockStorageOptionsProvider::new(Some(600_000)));

// Create storage_options credentials (simulating credentials from storage_options map)
let storage_options_cred = StaticCredentialProvider::new(ObjectStoreAwsCredential {
key_id: "AKID_FROM_STORAGE_OPTIONS".to_string(),
secret_key: "SECRET_FROM_STORAGE_OPTIONS".to_string(),
token: Some("TOKEN_FROM_STORAGE_OPTIONS".to_string()),
});

let expires_at = now_ms + 600_000; // 10 minutes from now

// Build credentials with storage_options_credentials but NO explicit aws_credentials
let result = build_aws_credential_with_storage_options_provider(
mock_storage_provider.clone(),
Duration::from_secs(300),
None, // no explicit aws_credentials
Some(storage_options_cred),
Some(expires_at),
)
.await
.unwrap();

// Get credential - should use the initial storage_options credentials
let cred = result.get_credential().await.unwrap();
assert_eq!(cred.key_id, "AKID_FROM_STORAGE_OPTIONS");
assert_eq!(cred.secret_key, "SECRET_FROM_STORAGE_OPTIONS");

// Storage options provider should NOT have been called yet (using cached initial creds)
assert_eq!(mock_storage_provider.get_call_count().await, 0);

// Advance time to trigger refresh (past the 5 minute refresh offset)
MockClock::set_system_time(Duration::from_secs(100_000 + 360));

// Get credential again - should now fetch from provider
let cred = result.get_credential().await.unwrap();
assert_eq!(cred.key_id, "AKID_1");
assert_eq!(cred.secret_key, "SECRET_1");

// Storage options provider should have been called once
assert_eq!(mock_storage_provider.get_call_count().await, 1);
}
}
Loading