Skip to content

Commit

Permalink
Remove deprecated AWS 2 client providers (closes apache#26681)
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack authored and cushon committed May 24, 2024
1 parent 07a63a7 commit ab2c5ea
Show file tree
Hide file tree
Showing 30 changed files with 89 additions and 1,621 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

* Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)).
* CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed (Java) ([#26681](https://github.com/apache/beam/issues/26681)).

## Deprecations

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import static java.util.stream.Collectors.toList;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
Expand All @@ -43,7 +39,6 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.RetryConfiguration;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down Expand Up @@ -169,9 +164,7 @@ public static <T> Write<T> write() {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

abstract @Nullable ClientConfiguration getClientConfiguration();

abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider();
abstract ClientConfiguration getClientConfiguration();

abstract @Nullable SerializableFunction<Void, ScanRequest> getScanRequestFn();

Expand All @@ -188,8 +181,6 @@ abstract static class Builder<T> {

abstract Builder<T> setClientConfiguration(ClientConfiguration config);

abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> fn);

abstract Builder<T> setSegmentId(Integer segmentId);
Expand All @@ -201,49 +192,8 @@ abstract static class Builder<T> {
abstract Read<T> build();
}

/**
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively
* you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}.
*/
@Deprecated
public Read<T> withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) {
checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null");
return toBuilder()
.setClientConfiguration(null)
.setDynamoDbClientProvider(clientProvider)
.build();
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Read<T> withDynamoDbClientProvider(
AwsCredentialsProvider credentials, String region, URI endpoint) {
return updateClientConfig(
b ->
b.credentialsProvider(credentials)
.region(Region.of(region))
.endpoint(endpoint)
.build());
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Read<T> withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) {
return updateClientConfig(
b -> b.credentialsProvider(credentials).region(Region.of(region)).build());
}

/** Configuration of DynamoDB client. */
public Read<T> withClientConfiguration(ClientConfiguration config) {
return updateClientConfig(ignore -> config);
}

private Read<T> updateClientConfig(
Function<ClientConfiguration.Builder, ClientConfiguration> fn) {
checkState(
getDynamoDbClientProvider() == null,
"Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration.");
ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder());
checkArgument(config != null, "ClientConfiguration cannot be null");
return toBuilder().setClientConfiguration(config).build();
}
Expand Down Expand Up @@ -288,11 +238,8 @@ public PCollection<T> expand(PBegin input) {
(scanRequest.totalSegments() != null && scanRequest.totalSegments() > 0),
"TotalSegments is required with withScanRequestFn() and greater zero");

if (getDynamoDbClientProvider() == null) {
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());
}
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());

PCollection<Read<T>> splits =
input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn<>()));
Expand Down Expand Up @@ -320,10 +267,6 @@ public void processElement(@Element Read<T> spec, OutputReceiver<Read<T>> out) {
/** A {@link DoFn} executing the ScanRequest to read from DynamoDb. */
private static class ReadFn<T> extends DoFn<Read<T>, T> {
private DynamoDbClient buildClient(Read<T> spec, AwsOptions opts) {
if (spec.getDynamoDbClientProvider() != null) {
// build client using legacy DynamoDbClientProvider
return spec.getDynamoDbClientProvider().getDynamoDbClient();
}
return ClientBuilderFactory.buildClient(
opts, DynamoDbClient.builder(), spec.getClientConfiguration());
}
Expand Down Expand Up @@ -364,61 +307,11 @@ public List<Map<String, AttributeValue>> apply(@Nullable ScanResponse scanRespon
}
}

/**
* Legacy retry configuration.
*
* <p><b>Warning</b>: Max accumulative retry latency is silently ignored as it is not supported by
* the AWS SDK.
*
* @deprecated Use {@link org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to
* delegate retries to the AWS SDK.
*/
@AutoValue
@Deprecated
public abstract static class RetryConfiguration implements Serializable {
abstract int getMaxAttempts();

abstract Duration getMaxDuration();

public static Builder builder() {
return new AutoValue_DynamoDBIO_RetryConfiguration.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setMaxAttempts(int maxAttempts);

/**
* @deprecated <b>Warning</b>, max accumulative retry latency is silently ignored as it is not
* supported by the AWS SDK.
*/
@Deprecated
public abstract Builder setMaxDuration(Duration maxDuration);

abstract RetryConfiguration autoBuild();

public RetryConfiguration build() {
RetryConfiguration config = autoBuild();
checkArgument(config.getMaxAttempts() > 0, "maxAttempts should be greater than 0");
return config;
}
}

org.apache.beam.sdk.io.aws2.common.RetryConfiguration convertLegacyConfig() {
int totalAttempts = getMaxAttempts() * 3; // 3 SDK attempts per user attempt
return org.apache.beam.sdk.io.aws2.common.RetryConfiguration.builder()
.numRetries(totalAttempts - 1)
.build();
}
}

/** Writes the elements of a {@code PCollection<T>} into DynamoDB. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> {

abstract @Nullable ClientConfiguration getClientConfiguration();

abstract @Nullable DynamoDbClientProvider getDynamoDbClientProvider();
abstract ClientConfiguration getClientConfiguration();

abstract @Nullable SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();

Expand All @@ -430,8 +323,6 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PCollec
abstract static class Builder<T> {
abstract Builder<T> setClientConfiguration(ClientConfiguration config);

abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

abstract Builder<T> setWriteItemMapperFn(
SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn);

Expand All @@ -440,65 +331,12 @@ abstract Builder<T> setWriteItemMapperFn(
abstract Write<T> build();
}

/**
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. Alternatively
* you can configure a custom {@link ClientBuilderFactory} in {@link AwsOptions}.
*/
@Deprecated
public Write<T> withDynamoDbClientProvider(DynamoDbClientProvider clientProvider) {
checkArgument(clientProvider != null, "DynamoDbClientProvider cannot be null");
return toBuilder()
.setClientConfiguration(null)
.setDynamoDbClientProvider(clientProvider)
.build();
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Write<T> withDynamoDbClientProvider(
AwsCredentialsProvider credentials, String region, URI endpoint) {
return updateClientConfig(
b ->
b.credentialsProvider(credentials)
.region(Region.of(region))
.endpoint(endpoint)
.build());
}

/** @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} instead. */
@Deprecated
public Write<T> withDynamoDbClientProvider(AwsCredentialsProvider credentials, String region) {
return updateClientConfig(
b -> b.credentialsProvider(credentials).region(Region.of(region)).build());
}

/** Configuration of DynamoDB client. */
public Write<T> withClientConfiguration(ClientConfiguration config) {
return updateClientConfig(ignore -> config);
}

private Write<T> updateClientConfig(
Function<ClientConfiguration.Builder, ClientConfiguration> fn) {
checkState(
getDynamoDbClientProvider() == null,
"Legacy DynamoDbClientProvider is set, but incompatible with ClientConfiguration.");
ClientConfiguration config = fn.apply(getClientConfiguration().toBuilder());
checkArgument(config != null, "ClientConfiguration cannot be null");
return toBuilder().setClientConfiguration(config).build();
}

/**
* Retry configuration of DynamoDB client.
*
* @deprecated Use {@link #withClientConfiguration(ClientConfiguration)} with {@link
* org.apache.beam.sdk.io.aws2.common.RetryConfiguration} instead to delegate retries to the
* AWS SDK.
*/
@Deprecated
public Write<T> withRetryConfiguration(RetryConfiguration retry) {
return updateClientConfig(b -> b.retry(retry.convertLegacyConfig()).build());
}

public Write<T> withWriteRequestMapperFn(
SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn) {
return toBuilder().setWriteItemMapperFn(writeItemMapperFn).build();
Expand All @@ -510,11 +348,9 @@ public Write<T> withDeduplicateKeys(List<String> deduplicateKeys) {

@Override
public PCollection<Void> expand(PCollection<T> input) {
if (getDynamoDbClientProvider() == null) {
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());
}
checkNotNull(getClientConfiguration(), "clientConfiguration cannot be null");
AwsOptions awsOptions = input.getPipeline().getOptions().as(AwsOptions.class);
ClientBuilderFactory.validate(awsOptions, getClientConfiguration());

return input.apply(ParDo.of(new WriteFn<>(this)));
}
Expand Down Expand Up @@ -544,14 +380,8 @@ static class WriteFn<T> extends DoFn<T, Void> {
@Setup
public void setup(PipelineOptions options) {
ClientConfiguration clientConfig = spec.getClientConfiguration();
if (spec.getDynamoDbClientProvider() != null) {
// build client using legacy DynamoDbClientProvider
client = spec.getDynamoDbClientProvider().getDynamoDbClient();
} else {
AwsOptions awsOpts = options.as(AwsOptions.class);
client =
ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig);
}
AwsOptions awsOpts = options.as(AwsOptions.class);
client = ClientBuilderFactory.buildClient(awsOpts, DynamoDbClient.builder(), clientConfig);

// resume from partial failures
resumeBackoff = FluentBackoff.DEFAULT.withMaxRetries(BATCH_SIZE);
Expand Down
Loading

0 comments on commit ab2c5ea

Please sign in to comment.