-
Notifications
You must be signed in to change notification settings - Fork 3k
Azure: Support vended credentials refresh in ADLSFileIO. #11577
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a63232a
7cb7e72
e19f7e8
4b869c5
6b0f859
54112b9
c92479d
bc08205
5913fd1
7fb5ff3
016347d
a135ca0
b97ee0c
0a2f094
9fc4f23
a89980e
b398961
3f33808
0f491c8
28e7ba6
d419ecb
da31d42
ec3e679
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| import com.azure.storage.file.datalake.models.ListPathsOptions; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import org.apache.iceberg.azure.AzureProperties; | ||
| import org.apache.iceberg.common.DynConstructors; | ||
|
|
@@ -55,6 +56,7 @@ public class ADLSFileIO implements DelegateFileIO { | |
| private AzureProperties azureProperties; | ||
| private MetricsContext metrics = MetricsContext.nullMetrics(); | ||
| private SerializableMap<String, String> properties; | ||
| private VendedAdlsCredentialProvider vendedAdlsCredentialProvider; | ||
|
|
||
| /** | ||
| * No-arg constructor to load the FileIO dynamically. | ||
|
|
@@ -111,6 +113,9 @@ DataLakeFileSystemClient client(ADLSLocation location) { | |
| new DataLakeFileSystemClientBuilder().httpClient(HTTP); | ||
|
|
||
| location.container().ifPresent(clientBuilder::fileSystemName); | ||
| Optional.ofNullable(vendedAdlsCredentialProvider) | ||
| .map(p -> new VendedAzureSasCredentialPolicy(location.host(), p)) | ||
| .ifPresent(clientBuilder::addPolicy); | ||
| azureProperties.applyClientConfiguration(location.host(), clientBuilder); | ||
|
|
||
| return clientBuilder.buildClient(); | ||
|
|
@@ -126,6 +131,9 @@ public void initialize(Map<String, String> props) { | |
| this.properties = SerializableMap.copyOf(props); | ||
| this.azureProperties = new AzureProperties(properties); | ||
| initMetrics(properties); | ||
| this.azureProperties | ||
| .vendedAdlsCredentialProvider() | ||
| .ifPresent((provider -> this.vendedAdlsCredentialProvider = provider)); | ||
| } | ||
|
|
||
| @SuppressWarnings("CatchBlockLogException") | ||
|
|
@@ -212,4 +220,13 @@ public void deletePrefix(String prefix) { | |
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (vendedAdlsCredentialProvider != null) { | ||
| vendedAdlsCredentialProvider.close(); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: newline after }
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. applied |
||
|
|
||
| DelegateFileIO.super.close(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be enclosed in finally block so that it is always run ? |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.azure.adlsv2; | ||
|
|
||
| import com.azure.core.credential.AccessToken; | ||
| import com.azure.core.credential.SimpleTokenCache; | ||
| import java.io.IOException; | ||
| import java.io.Serializable; | ||
| import java.io.UncheckedIOException; | ||
| import java.time.Instant; | ||
| import java.time.ZoneOffset; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.CatalogProperties; | ||
| import org.apache.iceberg.azure.AzureProperties; | ||
| import org.apache.iceberg.io.CloseableGroup; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.rest.ErrorHandlers; | ||
| import org.apache.iceberg.rest.HTTPClient; | ||
| import org.apache.iceberg.rest.RESTClient; | ||
| import org.apache.iceberg.rest.auth.AuthManager; | ||
| import org.apache.iceberg.rest.auth.AuthManagers; | ||
| import org.apache.iceberg.rest.auth.AuthSession; | ||
| import org.apache.iceberg.rest.credentials.Credential; | ||
| import org.apache.iceberg.rest.responses.LoadCredentialsResponse; | ||
| import org.apache.iceberg.util.SerializableMap; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable { | ||
ChaladiMohanVamsi marked this conversation as resolved.
Show resolved
Hide resolved
ChaladiMohanVamsi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public static final String URI = "credentials.uri"; | ||
|
|
||
| private final SerializableMap<String, String> properties; | ||
| private final String credentialsEndpoint; | ||
| private final String catalogEndpoint; | ||
| private transient volatile Map<String, SimpleTokenCache> sasCredentialByAccount; | ||
| private transient volatile HTTPClient client; | ||
| private transient AuthManager authManager; | ||
| private transient AuthSession authSession; | ||
|
|
||
| public VendedAdlsCredentialProvider(Map<String, String> properties) { | ||
| Preconditions.checkArgument(null != properties, "Invalid properties: null"); | ||
| Preconditions.checkArgument(null != properties.get(URI), "Invalid credentials endpoint: null"); | ||
| Preconditions.checkArgument( | ||
| null != properties.get(CatalogProperties.URI), "Invalid catalog endpoint: null"); | ||
| this.properties = SerializableMap.copyOf(properties); | ||
| this.credentialsEndpoint = properties.get(URI); | ||
| this.catalogEndpoint = properties.get(CatalogProperties.URI); | ||
| } | ||
|
|
||
| String credentialForAccount(String storageAccount) { | ||
| return sasCredentialByAccount() | ||
| .computeIfAbsent( | ||
| storageAccount, | ||
| ignored -> | ||
| new SimpleTokenCache( | ||
| () -> Mono.fromSupplier(() -> sasTokenForAccount(storageAccount)))) | ||
| .getToken() | ||
| .map(AccessToken::getToken) | ||
| .block(); | ||
| } | ||
|
|
||
| private AccessToken sasTokenForAccount(String storageAccount) { | ||
| LoadCredentialsResponse response = fetchCredentials(); | ||
| List<Credential> adlsCredentials = | ||
| response.credentials().stream() | ||
| .filter(c -> c.prefix().contains(storageAccount)) | ||
| .collect(Collectors.toList()); | ||
| Preconditions.checkState( | ||
| !adlsCredentials.isEmpty(), | ||
| String.format("Invalid ADLS Credentials for storage-account %s: empty", storageAccount)); | ||
| Preconditions.checkState( | ||
| adlsCredentials.size() == 1, | ||
| "Invalid ADLS Credentials: only one ADLS credential should exist per storage-account"); | ||
|
|
||
| Credential adlsCredential = adlsCredentials.get(0); | ||
| checkCredential(adlsCredential, AzureProperties.ADLS_SAS_TOKEN_PREFIX + storageAccount); | ||
| checkCredential( | ||
| adlsCredential, AzureProperties.ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX + storageAccount); | ||
|
|
||
| String sasToken = | ||
| adlsCredential.config().get(AzureProperties.ADLS_SAS_TOKEN_PREFIX + storageAccount); | ||
| Instant tokenExpiresAt = | ||
| Instant.ofEpochMilli( | ||
| Long.parseLong( | ||
| adlsCredential | ||
| .config() | ||
| .get(AzureProperties.ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX + storageAccount))); | ||
|
|
||
| return new AccessToken(sasToken, tokenExpiresAt.atOffset(ZoneOffset.UTC)); | ||
| } | ||
|
|
||
| private Map<String, SimpleTokenCache> sasCredentialByAccount() { | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (this.sasCredentialByAccount == null) { | ||
| synchronized (this) { | ||
| if (this.sasCredentialByAccount == null) { | ||
| this.sasCredentialByAccount = Maps.newHashMap(); | ||
| } | ||
| } | ||
| } | ||
| return this.sasCredentialByAccount; | ||
| } | ||
|
|
||
| private RESTClient httpClient() { | ||
ChaladiMohanVamsi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (null == client) { | ||
| synchronized (this) { | ||
| if (null == client) { | ||
| authManager = AuthManagers.loadAuthManager("adls-credentials-refresh", properties); | ||
| HTTPClient httpClient = HTTPClient.builder(properties).uri(catalogEndpoint).build(); | ||
| authSession = authManager.catalogSession(httpClient, properties); | ||
| client = httpClient.withAuthSession(authSession); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return client; | ||
| } | ||
|
|
||
| private LoadCredentialsResponse fetchCredentials() { | ||
| return httpClient() | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .get( | ||
| credentialsEndpoint, | ||
| null, | ||
| LoadCredentialsResponse.class, | ||
| Map.of(), | ||
| ErrorHandlers.defaultErrorHandler()); | ||
| } | ||
|
|
||
| private void checkCredential(Credential credential, String property) { | ||
| Preconditions.checkState( | ||
| credential.config().containsKey(property), | ||
| "Invalid ADLS Credentials: %s not set", | ||
| property); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| CloseableGroup closeableGroup = new CloseableGroup(); | ||
| closeableGroup.addCloseable(authSession); | ||
| closeableGroup.addCloseable(authManager); | ||
| closeableGroup.addCloseable(client); | ||
| closeableGroup.setSuppressCloseFailure(true); | ||
| try { | ||
| closeableGroup.close(); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close the VendedAdlsCredentialProvider", e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.azure.adlsv2; | ||
|
|
||
| import com.azure.core.credential.AzureSasCredential; | ||
| import com.azure.core.http.HttpPipelineCallContext; | ||
| import com.azure.core.http.HttpPipelineNextPolicy; | ||
| import com.azure.core.http.HttpPipelineNextSyncPolicy; | ||
| import com.azure.core.http.HttpResponse; | ||
| import com.azure.core.http.policy.AzureSasCredentialPolicy; | ||
| import com.azure.core.http.policy.HttpPipelinePolicy; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| class VendedAzureSasCredentialPolicy implements HttpPipelinePolicy { | ||
| private final String account; | ||
| private final VendedAdlsCredentialProvider vendedAdlsCredentialProvider; | ||
| private AzureSasCredential azureSasCredential; | ||
| private AzureSasCredentialPolicy azureSasCredentialPolicy; | ||
|
|
||
| VendedAzureSasCredentialPolicy( | ||
| String account, VendedAdlsCredentialProvider vendedAdlsCredentialProvider) { | ||
| this.account = account; | ||
| this.vendedAdlsCredentialProvider = vendedAdlsCredentialProvider; | ||
| } | ||
|
|
||
| @Override | ||
| public Mono<HttpResponse> process( | ||
| HttpPipelineCallContext httpPipelineCallContext, | ||
| HttpPipelineNextPolicy httpPipelineNextPolicy) { | ||
| maybeUpdateCredential(); | ||
| return azureSasCredentialPolicy.process(httpPipelineCallContext, httpPipelineNextPolicy); | ||
| } | ||
|
|
||
| @Override | ||
| public HttpResponse processSync( | ||
| HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next) { | ||
| maybeUpdateCredential(); | ||
| return azureSasCredentialPolicy.processSync(context, next); | ||
| } | ||
|
|
||
| private void maybeUpdateCredential() { | ||
| String sasToken = vendedAdlsCredentialProvider.credentialForAccount(account); | ||
| if (azureSasCredential == null) { | ||
| this.azureSasCredential = new AzureSasCredential(sasToken); | ||
| this.azureSasCredentialPolicy = new AzureSasCredentialPolicy(azureSasCredential, false); | ||
| } else { | ||
| azureSasCredential.update(sasToken); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.