Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.ParameterizedTypeName;
import com.squareup.javapoet.TypeSpec;
import com.squareup.javapoet.WildcardTypeName;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
Expand All @@ -58,9 +60,11 @@
import software.amazon.awssdk.core.endpointdiscovery.EndpointDiscoveryRefreshCache;
import software.amazon.awssdk.core.endpointdiscovery.EndpointDiscoveryRequest;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

public class SyncClientClass extends SyncClientInterface {
Expand Down Expand Up @@ -258,11 +262,18 @@ private List<MethodSpec> operationMethodSpecs(OperationModel opModel) {
method.addStatement("$T cachedEndpoint = null", URI.class);
method.beginControlFlow("if (endpointDiscoveryEnabled)");

method.addCode("$T key = $N.overrideConfiguration()", String.class, opModel.getInput().getVariableName())
ParameterizedTypeName identityFutureTypeName =
ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
WildcardTypeName.subtypeOf(AwsCredentialsIdentity.class));
method.addCode("$T identityFuture = $N.overrideConfiguration()",
identityFutureTypeName,
opModel.getInput().getVariableName())
.addCode(" .flatMap($T::credentialsIdentityProvider)", AwsRequestOverrideConfiguration.class)
.addCode(" .orElseGet(() -> clientConfiguration.option($T.CREDENTIALS_IDENTITY_PROVIDER))",
AwsClientOption.class)
.addCode(" .resolveIdentity().join().accessKeyId();");
.addCode(" .resolveIdentity();");

method.addCode("$T key = $T.joinLikeSync(identityFuture).accessKeyId();", String.class, CompletableFutureUtils.class);

method.addCode("$1T endpointDiscoveryRequest = $1T.builder()", EndpointDiscoveryRequest.class)
.addCode(" .required($L)", opModel.getInputShape().getEndpointDiscovery().isRequired())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.Generated;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
Expand All @@ -19,6 +20,7 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
Expand All @@ -39,6 +41,7 @@
import software.amazon.awssdk.services.endpointdiscoverytest.transform.TestDiscoveryIdentifiersRequiredRequestMarshaller;
import software.amazon.awssdk.services.endpointdiscoverytest.transform.TestDiscoveryOptionalRequestMarshaller;
import software.amazon.awssdk.services.endpointdiscoverytest.transform.TestDiscoveryRequiredRequestMarshaller;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;

/**
Expand Down Expand Up @@ -154,10 +157,12 @@ public TestDiscoveryIdentifiersRequiredResponse testDiscoveryIdentifiersRequired
}
URI cachedEndpoint = null;
if (endpointDiscoveryEnabled) {
String key = testDiscoveryIdentifiersRequiredRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity().join().accessKeyId();
CompletableFuture<? extends AwsCredentialsIdentity> identityFuture =
testDiscoveryIdentifiersRequiredRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity();
String key = CompletableFutureUtils.joinLikeSync(identityFuture).accessKeyId();
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(true)
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
.overrideConfiguration(testDiscoveryIdentifiersRequiredRequest.overrideConfiguration().orElse(null)).build();
Expand Down Expand Up @@ -211,10 +216,12 @@ public TestDiscoveryOptionalResponse testDiscoveryOptional(TestDiscoveryOptional
boolean endpointOverridden = clientConfiguration.option(SdkClientOption.ENDPOINT_OVERRIDDEN) == Boolean.TRUE;
URI cachedEndpoint = null;
if (endpointDiscoveryEnabled) {
String key = testDiscoveryOptionalRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity().join().accessKeyId();
CompletableFuture<? extends AwsCredentialsIdentity> identityFuture =
testDiscoveryOptionalRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity();
String key = CompletableFutureUtils.joinLikeSync(identityFuture).accessKeyId();
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(false)
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
.overrideConfiguration(testDiscoveryOptionalRequest.overrideConfiguration().orElse(null)).build();
Expand Down Expand Up @@ -275,10 +282,12 @@ public TestDiscoveryRequiredResponse testDiscoveryRequired(TestDiscoveryRequired
}
URI cachedEndpoint = null;
if (endpointDiscoveryEnabled) {
String key = testDiscoveryRequiredRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity().join().accessKeyId();
CompletableFuture<? extends AwsCredentialsIdentity> identityFuture =
testDiscoveryRequiredRequest.overrideConfiguration()
.flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
.resolveIdentity();
String key = CompletableFutureUtils.joinLikeSync(identityFuture).accessKeyId();
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(true)
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
.overrideConfiguration(testDiscoveryRequiredRequest.overrideConfiguration().orElse(null)).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;
Expand Down Expand Up @@ -99,15 +100,13 @@ public static AwsCredentialsProviderChain of(IdentityProvider<? extends AwsCrede
@Override
public AwsCredentials resolveCredentials() {
if (reuseLastProviderEnabled && lastUsedProvider != null) {
// TODO: Exception handling for join?
return CredentialUtils.toCredentials(lastUsedProvider.resolveIdentity().join());
return CredentialUtils.toCredentials(CompletableFutureUtils.joinLikeSync(lastUsedProvider.resolveIdentity()));
}

List<String> exceptionMessages = null;
for (IdentityProvider<? extends AwsCredentialsIdentity> provider : credentialsProviders) {
try {
// TODO: Exception handling for join?
AwsCredentialsIdentity credentials = provider.resolveIdentity().join();
AwsCredentialsIdentity credentials = CompletableFutureUtils.joinLikeSync(provider.resolveIdentity());

log.debug(() -> "Loading credentials from " + provider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.AwsSessionCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.utils.CompletableFutureUtils;

@SdkProtectedApi
public final class CredentialUtils {
Expand Down Expand Up @@ -101,8 +102,8 @@ public static AwsCredentialsProvider toCredentialsProvider(
return (AwsCredentialsProvider) identityProvider;
}
return () -> {
// TODO: Exception handling for CompletionException thrown from join?
AwsCredentialsIdentity awsCredentialsIdentity = identityProvider.resolveIdentity().join();
AwsCredentialsIdentity awsCredentialsIdentity =
CompletableFutureUtils.joinLikeSync(identityProvider.resolveIdentity());
return toCredentials(awsCredentialsIdentity);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.identity.spi.TokenIdentity;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;
Expand Down Expand Up @@ -95,15 +96,13 @@ public static SdkTokenProviderChain of(IdentityProvider<? extends TokenIdentity>
@Override
public SdkToken resolveToken() {
if (reuseLastProviderEnabled && lastUsedProvider != null) {
// TODO: Exception handling for join?
return TokenUtils.toSdkToken(lastUsedProvider.resolveIdentity().join());
return TokenUtils.toSdkToken(CompletableFutureUtils.joinLikeSync(lastUsedProvider.resolveIdentity()));
}

List<String> exceptionMessages = null;
for (IdentityProvider<? extends TokenIdentity> provider : sdkTokenProviders) {
try {
// TODO: Exception handling for join?
TokenIdentity token = provider.resolveIdentity().join();
TokenIdentity token = CompletableFutureUtils.joinLikeSync(provider.resolveIdentity());

log.debug(() -> "Loading token from " + provider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import software.amazon.awssdk.services.polly.presigner.PollyPresigner;
import software.amazon.awssdk.services.polly.presigner.model.PresignedSynthesizeSpeechRequest;
import software.amazon.awssdk.services.polly.presigner.model.SynthesizeSpeechPresignRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -212,7 +213,7 @@ private IdentityProvider<? extends AwsCredentialsIdentity> resolveCredentialsPro
}

private AwsCredentialsIdentity resolveCredentials(IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
return credentialsProvider.resolveIdentity().join();
return CompletableFutureUtils.joinLikeSync(credentialsProvider.resolveIdentity());
}

private Presigner resolvePresigner(PollyRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.rds.model.GenerateAuthenticationTokenRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.StringUtils;

@Immutable
Expand Down Expand Up @@ -111,11 +112,12 @@ private Region resolveRegion(GenerateAuthenticationTokenRequest request) {
// TODO: update this to use AwsCredentialsIdentity when we migrate Signers to accept the new type.
private AwsCredentials resolveCredentials(GenerateAuthenticationTokenRequest request) {
if (request.credentialsIdentityProvider() != null) {
return CredentialUtils.toCredentials(request.credentialsIdentityProvider().resolveIdentity().join());
return CredentialUtils.toCredentials(
CompletableFutureUtils.joinLikeSync(request.credentialsIdentityProvider().resolveIdentity()));
}

if (this.credentialsProvider != null) {
return CredentialUtils.toCredentials(this.credentialsProvider.resolveIdentity().join());
return CredentialUtils.toCredentials(CompletableFutureUtils.joinLikeSync(this.credentialsProvider.resolveIdentity()));
}

throw new IllegalArgumentException("CredentialProvider should be provided either in GenerateAuthenticationTokenRequest " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.AwsSessionCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
Expand All @@ -45,8 +46,8 @@ public CrtCredentialsProviderAdapter(IdentityProvider<? extends AwsCredentialsId
return Credentials.createAnonymousCredentials();
}

// TODO: Exception handling for join?
AwsCredentialsIdentity sdkCredentials = credentialsProvider.resolveIdentity().join();
AwsCredentialsIdentity sdkCredentials =
CompletableFutureUtils.joinLikeSync(credentialsProvider.resolveIdentity());
byte[] accessKey = sdkCredentials.accessKeyId().getBytes(StandardCharsets.UTF_8);
byte[] secreteKey = sdkCredentials.secretAccessKey().getBytes(StandardCharsets.UTF_8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,22 @@ public static void joinInterruptiblyIgnoringFailures(CompletableFuture<?> future
// Ignore
}
}

/**
* Joins (interruptibly) on the future, and re-throws any RuntimeExceptions or Errors just like the async task would have
* thrown if it was executed synchronously.
*/
public static <T> T joinLikeSync(CompletableFuture<T> future) {
try {
return joinInterruptibly(future);
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
// Make sure we don't lose the context of where the join is in the stack...
cause.addSuppressed(new RuntimeException("Task failed."));
throw (RuntimeException) cause;
}
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.AfterClass;
Expand Down Expand Up @@ -159,4 +161,49 @@ public void allOfExceptionForwarded_allFutureSucceed_shouldComplete() {
assertThat(resultFuture.isDone()).isTrue();
assertThat(resultFuture.isCompletedExceptionally()).isFalse();
}
}

@Test(timeout = 1000)
public void joinLikeSync_completesExceptionally_throwsUnderlyingException() {
Exception e = new RuntimeException("BOOM");
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);

assertThatThrownBy(() -> CompletableFutureUtils.joinLikeSync(future))
.hasSuppressedException(new RuntimeException("Task failed."))
.isEqualTo(e);
}

@Test(timeout = 1000)
public void joinLikeSync_completesExceptionallyChecked_throwsCompletionException() {
Exception e = new Exception("BOOM");
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);

assertThatThrownBy(() -> CompletableFutureUtils.joinLikeSync(future))
.hasNoSuppressedExceptions()
.hasCause(e)
.isInstanceOf(CompletionException.class);
}

@Test(timeout = 1000)
public void joinLikeSync_completesExceptionallyWithError_throwsError() {
Error e = new Error("BOOM");
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(e);

assertThatThrownBy(() -> CompletableFutureUtils.joinLikeSync(future))
.hasNoSuppressedExceptions()
.isEqualTo(e);
}

@Test(timeout = 1000)
public void joinLikeSync_canceled_throwsCancellationException() {
Exception e = new Exception("BOOM");
CompletableFuture future = new CompletableFuture();
future.cancel(false);

assertThatThrownBy(() -> CompletableFutureUtils.joinLikeSync(future))
.hasNoSuppressedExceptions()
.hasNoCause()
.isInstanceOf(CancellationException.class);
}}