Skip to content

Conversation

@ChaladiMohanVamsi
Copy link
Contributor

Proposed Change

Add support to refresh and consume vended storage credentials for ADLSFileIO.

New Azure properties

Requirements of credential config

  • Required to have Sas token expiration time as part of credential config along with Sas token.
  • Expected prefix for the expiration time is adls.sas-token-expire-at-ms. similar to existing sas token prefix adls.sas-token.
  • Expected to have only one ADLS credential per storage-account prefix.

Similar PRs for other FileIOs

@ChaladiMohanVamsi ChaladiMohanVamsi marked this pull request as draft November 18, 2024 11:26
@ChaladiMohanVamsi ChaladiMohanVamsi marked this pull request as ready for review November 18, 2024 11:47
@ChaladiMohanVamsi
Copy link
Contributor Author

cc// @nastra @jackye1995 @amogh-jahagirdar @munendrasn Can you please help with the review.

@amogh-jahagirdar amogh-jahagirdar self-requested a review November 19, 2024 13:24
private LoadCredentialsResponse fetchCredentials() {
Map<String, String> headers =
RESTUtil.merge(
configHeaders(properties),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate why this is needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to implement the similar behaviour present in RESTSessionCatalog, where catalog can be configured to pass explicit headers to server by setting the configuration with header. prefix.

private static final long MIN_REFRESH_WAIT_MILLIS = 10;

public AzureSasCredentialRefresher(
Supplier<Pair<String, Long>> sasTokenWithExpirationSupplier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this actually need to be a supplier given that it's being immediately used in L40?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the supplier is being used during initialization and also during each scheduled refresh to fetch the new credentials. Supplier holds the logic to fetch new credentials from the API endpoint, since we are going to use it multiple times I modelled it as supplier instead of single method call. Please suggest if there is a cleaner way to achieve the same.

.isEqualTo(credential.config().get(ADLS_SAS_TOKEN_PREFIX + STORAGE_ACCOUNT));

Thread.sleep(20);
// Since expiration time past to current time, the refresh will fall back at minimum 10ms
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this comment is trying to say. Also what happens if you remove the sleep time above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Since the credential refresh implementation is based on scheduling a new refresh using the server provided expiration time,
  • this test is covering the scenario where server responds with already expired token (expires-at-ms < current-time).
  • The scheduling delay for consequent refresh is minimum delay of 10ms as per the logic in refreshDelayMillis()
  • If we remove the sleep time, the credentials will be fetched from API only once. Depending on the test case execution time (> 10ms) there can be second credential fetch. To keep the test case behaviour consistent I used thread sleep.

@nastra
Copy link
Contributor

nastra commented Dec 3, 2024

@ChaladiMohanVamsi thanks for working on this. Do you have a way of actually testing this PR with an ADLS environment and see whether the refreshes work?

@ChaladiMohanVamsi
Copy link
Contributor Author

@nastra @amogh-jahagirdar Thanks for the review and suggestions. I have addressed the review comments.

I was able to test the credentials refresh logic in ADLS environment, since I don't have the new credentials API endpoint spec implementation in my env I used loadTable endpoint for testing by tweaking few parts of this PR.

if (null == client) {
synchronized (this) {
if (null == client) {
DefaultAuthSession authSession =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to change with the introduction of the new Auth manager, so I would wait until that goes in then switch over to using that approach. See how this is done for S3 here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated the Auth manager releated changes referring to the changes for S3 FileIO and GCS FileIO.

cc// @adutra can you please review the changes.

@nastra nastra added this to the Iceberg 1.9.0 milestone Mar 25, 2025
@ajantha-bhat
Copy link
Member

@ChaladiMohanVamsi: Are you still working on this PR? @nastra has added 1.9.0 milestone for this. I was thinking a 1.9.0 release cut tomorrow evening if everything is merged.

@ChaladiMohanVamsi
Copy link
Contributor Author

@ChaladiMohanVamsi: Are you still working on this PR? @nastra has added 1.9.0 milestone for this. I was thinking a 1.9.0 release cut tomorrow evening if everything is merged.

Yes @ajantha-bhat, I will try to address the comments by today.

@ChaladiMohanVamsi
Copy link
Contributor Author

@nastra @danielcweeks can you please review on the latest changes handling review comments.

Map<String, String> credentialProviderProperties = Maps.newHashMap(allProperties);
credentialProviderProperties.put(
VendedAdlsCredentialProvider.URI, adlsRefreshCredentialsEndpoint);
Optional.ofNullable(allProperties.get(OAuth2Properties.TOKEN))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this, since we're already copying over allProperties into credentialProviderProperties

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed redundant operation of setting token.

public void close() {
if (vendedAdlsCredentialProvider != null) {
vendedAdlsCredentialProvider.close();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline after }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied


public VendedAdlsCredentialProvider(Map<String, String> properties) {
Preconditions.checkArgument(null != properties, "Invalid properties: null");
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
Copy link
Contributor

@nastra nastra Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please apply the below diff to this class?

--- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java
+++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java
@@ -28,6 +28,7 @@ import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.azure.AzureProperties;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -48,6 +49,8 @@ public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable
   public static final String URI = "credentials.uri";

   private final SerializableMap<String, String> properties;
+  private final String credentialsEndpoint;
+  private final String catalogEndpoint;
   private transient volatile Map<String, SimpleTokenCache> sasCredentialByAccount;
   private transient volatile HTTPClient client;
   private transient AuthManager authManager;
@@ -55,8 +58,12 @@ public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable

   public VendedAdlsCredentialProvider(Map<String, String> properties) {
     Preconditions.checkArgument(null != properties, "Invalid properties: null");
-    Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
+    Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null");
+    Preconditions.checkArgument(
+        null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null");
     this.properties = SerializableMap.copyOf(properties);
+    this.credentialsEndpoint = properties.get(URI);
+    this.catalogEndpoint = properties.get(CatalogProperties.URI);
   }

   String credentialForAccount(String storageAccount) {
@@ -117,7 +124,7 @@ public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable
       synchronized (this) {
         if (null == client) {
           authManager = AuthManagers.loadAuthManager("adls-credentials-refresh", properties);
-          HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build();
+          HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build();
           authSession = authManager.catalogSession(httpClient, properties);
           client = httpClient.withAuthSession(authSession);
         }
@@ -130,7 +137,7 @@ public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable
   private LoadCredentialsResponse fetchCredentials() {
     return httpClient()
         .get(
-            properties.get(URI),
+            credentialsEndpoint,
             null,

This is fixing an issue that we're currently also fixing for S3 / GCP in #12612 / #12638. You will also have to update the tests in a similar way as in those PRs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied the patch and associated test cases.

import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Mono;

public class VendedAzureSasCredentialPolicy implements HttpPipelinePolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should be able to make this non-public

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated it to non-public.

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall LGTM once the remaining comments have been applied

Copy link
Contributor

@danielcweeks danielcweeks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending checks. Thanks @ChaladiMohanVamsi !

@nastra
Copy link
Contributor

nastra commented Mar 28, 2025

thanks @ChaladiMohanVamsi

@nastra nastra merged commit 68f8053 into apache:main Mar 28, 2025
43 checks passed
vendedAdlsCredentialProvider.close();
}

DelegateFileIO.super.close();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be enclosed in finally block so that it is always run ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants