From 80779541a68bee0dd719bba0b738a1177299f6f3 Mon Sep 17 00:00:00 2001 From: Ankush Chatterjee Date: Tue, 27 Jun 2023 11:58:48 +0530 Subject: [PATCH] Migrate kinesis connector to use AWS SDK v2 Co-authored-by: Karan Makhija --- plugin/trino-kinesis/pom.xml | 214 +++++++-- .../plugin/kinesis/KinesisClientManager.java | 58 ++- .../plugin/kinesis/KinesisClientProvider.java | 12 +- .../plugin/kinesis/KinesisRecordSet.java | 64 +-- .../kinesis/KinesisShardCheckpointer.java | 62 +-- .../plugin/kinesis/KinesisSplitManager.java | 36 +- .../kinesis/s3config/S3TableConfigClient.java | 80 ++-- .../kinesis/TestMinimalFunctionality.java | 28 +- .../plugin/kinesis/TestRecordAccess.java | 40 +- .../s3config/TestS3TableConfigClient.java | 45 +- .../kinesis/util/EmbeddedKinesisStream.java | 47 +- .../util/KinesisTestClientManager.java | 28 +- .../kinesis/util/MockKinesisClient.java | 405 ++++++++++-------- .../tableDescriptions/EmptyTable.json | 22 +- pom.xml | 4 +- 15 files changed, 686 insertions(+), 459 deletions(-) diff --git a/plugin/trino-kinesis/pom.xml b/plugin/trino-kinesis/pom.xml index c35b179e89a3..3dd3ea40636b 100644 --- a/plugin/trino-kinesis/pom.xml +++ b/plugin/trino-kinesis/pom.xml @@ -17,52 +17,6 @@ - - com.amazonaws - amazon-kinesis-client - - - - com.amazonaws - aws-java-sdk-core - - - commons-logging - commons-logging - - - joda-time - joda-time - - - - - - com.amazonaws - aws-java-sdk-dynamodb - - - - com.amazonaws - aws-java-sdk-kinesis - - - - com.amazonaws - aws-java-sdk-s3 - ${dep.aws-sdk.version} - - - commons-logging - commons-logging - - - joda-time - joda-time - - - - com.google.guava guava @@ -118,6 +72,155 @@ jakarta.validation-api + + software.amazon.awssdk + apache-client + + + + commons-logging + commons-logging + + + joda-time + joda-time + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + auth + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + aws-core + + + + software.amazon.awssdk + dynamodb + + + commons-logging + commons-logging + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + http-client-spi + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + kinesis + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + netty-nio-client + + + joda-time + joda-time + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + regions + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + s3 + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.awssdk + sdk-core + + + org.reactivestreams + reactive-streams + + + + + + software.amazon.kinesis + amazon-kinesis-client + 2.4.5 + + + com.google.re2j + re2j + + + com.squareup.wire + wire-compiler + + + com.squareup.wire + wire-runtime + + + com.squareup.wire + wire-schema + + + commons-logging + commons-logging + + + + com.fasterxml.jackson.core jackson-annotations @@ -173,6 +276,12 @@ test + + io.trino + trino-testing-services + test + + org.assertj assertj-core @@ -198,6 +307,7 @@ **/TestS3TableConfigClient.java + us-east-1 ACCESS-KEY SECRET-KEY s3://S3-LOC @@ -211,9 +321,21 @@ google/protobuf/.*\.proto$ + AUTHORS + wire/extensions.proto + checkstyle.xml + + org.apache.maven.plugins + maven-dependency-plugin + + + software.amazon.kinesis:kinesis-client + + + diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java index 591775001a1b..343a61d8ee37 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java @@ -13,12 +13,17 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.s3.AmazonS3Client; import com.google.inject.Inject; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.s3.S3Client; import static com.google.common.base.Strings.isNullOrEmpty; @@ -31,44 +36,55 @@ public class KinesisClientManager implements KinesisClientProvider { - private final AmazonKinesisClient client; - private final AmazonS3Client amazonS3Client; - private final AmazonDynamoDBClient dynamoDbClient; // for Checkpointing + private final KinesisClient client; + private final S3Client amazonS3Client; + private final DynamoDbAsyncClient dynamoDbClient; // for Checkpointing @Inject public KinesisClientManager(KinesisConfig config) { + AwsCredentialsProvider credentialsProvider; if (!isNullOrEmpty(config.getAccessKey()) && !isNullOrEmpty(config.getSecretKey())) { - BasicAWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey()); - this.client = new AmazonKinesisClient(awsCredentials); - this.amazonS3Client = new AmazonS3Client(awsCredentials); - this.dynamoDbClient = new AmazonDynamoDBClient(awsCredentials); + AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey()); + credentialsProvider = StaticCredentialsProvider.create(awsCredentials); } else { - DefaultAWSCredentialsProviderChain defaultChain = new DefaultAWSCredentialsProviderChain(); - this.client = new AmazonKinesisClient(defaultChain); - this.amazonS3Client = new AmazonS3Client(defaultChain); - this.dynamoDbClient = new AmazonDynamoDBClient(defaultChain); + credentialsProvider = DefaultCredentialsProvider.create(); } - this.client.setEndpoint("kinesis." + config.getAwsRegion() + ".amazonaws.com"); - this.dynamoDbClient.setEndpoint("dynamodb." + config.getAwsRegion() + ".amazonaws.com"); + this.client = KinesisClient.builder() + .credentialsProvider(credentialsProvider) + .region(Region.of(config.getAwsRegion())) + .httpClient(ApacheHttpClient.create()) + .build(); + this.amazonS3Client = S3Client.builder() + .credentialsProvider(credentialsProvider) + .region(Region.of(config.getAwsRegion())) + .crossRegionAccessEnabled(true) + .httpClient(ApacheHttpClient.create()) + .build(); + + this.dynamoDbClient = DynamoDbAsyncClient.builder() + .credentialsProvider(credentialsProvider) + .region(Region.of(config.getAwsRegion())) + .httpClient(NettyNioAsyncHttpClient.create()) + .build(); } @Override - public AmazonKinesisClient getClient() + public KinesisClient getClient() { return client; } @Override - public AmazonDynamoDBClient getDynamoDbClient() + public DynamoDbAsyncClient getDynamoDbClient() { return dynamoDbClient; } @Override - public AmazonS3Client getS3Client() + public S3Client getS3Client() { return amazonS3Client; } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java index 67d2609a2fbb..b3b455b6a172 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java @@ -13,9 +13,9 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.s3.AmazonS3Client; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.s3.S3Client; /** * Interface to a client manager that provides the AWS clients needed. @@ -23,9 +23,9 @@ //TODO: This interface needs to be removed and abstraction in unneccesary public interface KinesisClientProvider { - AmazonKinesisClient getClient(); + KinesisClient getClient(); - AmazonDynamoDBClient getDynamoDbClient(); + DynamoDbAsyncClient getDynamoDbClient(); - AmazonS3Client getS3Client(); + S3Client getS3Client(); } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java index c566e1632d0e..8adfd65bf8e2 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisRecordSet.java @@ -13,13 +13,6 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.google.common.collect.ImmutableList; import io.airlift.log.Logger; import io.airlift.slice.Slice; @@ -34,11 +27,18 @@ import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordSet; import io.trino.spi.type.Type; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Date; +import java.time.Instant; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -143,7 +143,7 @@ public class KinesisRecordSet // Initialize checkpoint related code if (checkpointEnabled) { - AmazonDynamoDBClient dynamoDBClient = clientManager.getDynamoDbClient(); + DynamoDbAsyncClient dynamoDBClient = clientManager.getDynamoDbClient(); String dynamoDBTable = split.getStreamName(); int curIterationNumber = getIterationNumber(session); String sessionLogicalName = getCheckpointLogicalName(session); @@ -197,7 +197,7 @@ public class KinesisRecordCursor private List kinesisRecords; private Iterator listIterator; private GetRecordsRequest getRecordsRequest; - private GetRecordsResult getRecordsResult; + private GetRecordsResponse getRecordsResult; @Override public long getCompletedBytes() @@ -283,21 +283,21 @@ private void getKinesisRecords() throw new RuntimeException("thread interrupted"); } } - getRecordsRequest = new GetRecordsRequest(); - getRecordsRequest.setShardIterator(shardIterator); - getRecordsRequest.setLimit(batchSize); + getRecordsRequest = GetRecordsRequest.builder() + .shardIterator(shardIterator) + .limit(batchSize).build(); getRecordsResult = clientManager.getClient().getRecords(getRecordsRequest); lastReadTime = System.nanoTime(); - shardIterator = getRecordsResult.getNextShardIterator(); - kinesisRecords = getRecordsResult.getRecords(); + shardIterator = getRecordsResult.nextShardIterator(); + kinesisRecords = getRecordsResult.records(); if (isLogBatches) { log.info("(%s:%s) Fetched %d records from Kinesis. MillisBehindLatest=%d Attempt=%d", split.getStreamName(), split.getShardId(), kinesisRecords.size(), - getRecordsResult.getMillisBehindLatest(), + getRecordsResult.millisBehindLatest(), attempts); } @@ -313,14 +313,14 @@ private void getKinesisRecords() private boolean nextRow() { Record currentRecord = listIterator.next(); - String partitionKey = currentRecord.getPartitionKey(); + String partitionKey = currentRecord.partitionKey(); log.debug("(%s:%s) Reading record with partition key %s", split.getStreamName(), split.getShardId(), partitionKey); byte[] messageData = EMPTY_BYTE_ARRAY; - ByteBuffer message = currentRecord.getData(); + ByteBuffer message = currentRecord.data().asByteBuffer(); if (message != null) { messageData = new byte[message.remaining()]; message.get(messageData); @@ -351,7 +351,7 @@ private boolean nextRow() currentRowValuesMap.put(columnHandle, longValueProvider(totalMessages)); break; case SHARD_SEQUENCE_ID_FIELD: - currentRowValuesMap.put(columnHandle, bytesValueProvider(currentRecord.getSequenceNumber().getBytes(UTF_8))); + currentRowValuesMap.put(columnHandle, bytesValueProvider(currentRecord.sequenceNumber().getBytes(UTF_8))); break; case MESSAGE_FIELD: currentRowValuesMap.put(columnHandle, bytesValueProvider(messageData)); @@ -407,8 +407,8 @@ private Optional> decodeMessage(byt */ private long getMillisBehindLatest() { - if (getRecordsResult != null && getRecordsResult.getMillisBehindLatest() != null) { - return getRecordsResult.getMillisBehindLatest(); + if (getRecordsResult != null && getRecordsResult.millisBehindLatest() != null) { + return getRecordsResult.millisBehindLatest(); } return MILLIS_BEHIND_LIMIT + 1; } @@ -481,9 +481,9 @@ private void checkFieldType(int field, Class expected) private void getIterator() throws ResourceNotFoundException { - GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); - getShardIteratorRequest.setStreamName(split.getStreamName()); - getShardIteratorRequest.setShardId(split.getShardId()); + GetShardIteratorRequest.Builder getShardIteratorRequest = GetShardIteratorRequest.builder() + .streamName(split.getStreamName()) + .shardId(split.getShardId()); // Explanation: when we have a sequence number from a prior read or checkpoint, always use it. // Otherwise, decide if starting at a timestamp or the trim horizon based on configuration. @@ -491,27 +491,27 @@ private void getIterator() // fallback on starting at STARTING_OFFSET_SECONDS from timestamp. if (lastReadSequenceNumber == null) { if (isIteratorFromTimestamp(session)) { - getShardIteratorRequest.setShardIteratorType("AT_TIMESTAMP"); + getShardIteratorRequest.shardIteratorType("AT_TIMESTAMP"); long iteratorStartTimestamp = getIteratorStartTimestamp(session); if (iteratorStartTimestamp == 0) { long startTimestamp = System.currentTimeMillis() - (getIteratorOffsetSeconds(session) * 1000); - getShardIteratorRequest.setTimestamp(new Date(startTimestamp)); + getShardIteratorRequest.timestamp(Instant.ofEpochMilli(startTimestamp)); } else { - getShardIteratorRequest.setTimestamp(new Date(iteratorStartTimestamp)); + getShardIteratorRequest.timestamp(Instant.ofEpochMilli(iteratorStartTimestamp)); } } else { - getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); + getShardIteratorRequest.shardIteratorType("TRIM_HORIZON"); } } else { - getShardIteratorRequest.setShardIteratorType("AFTER_SEQUENCE_NUMBER"); - getShardIteratorRequest.setStartingSequenceNumber(lastReadSequenceNumber); + getShardIteratorRequest.shardIteratorType("AFTER_SEQUENCE_NUMBER"); + getShardIteratorRequest.startingSequenceNumber(lastReadSequenceNumber); } - GetShardIteratorResult getShardIteratorResult = clientManager.getClient().getShardIterator(getShardIteratorRequest); - shardIterator = getShardIteratorResult.getShardIterator(); + GetShardIteratorResponse getShardIteratorResult = clientManager.getClient().getShardIterator(getShardIteratorRequest.build()); + shardIterator = getShardIteratorResult.shardIterator(); } } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java index f07718b049b9..6f511f2a518a 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisShardCheckpointer.java @@ -13,26 +13,30 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; -import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import io.airlift.log.Logger; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseManagementConfig; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; +import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; +import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class KinesisShardCheckpointer { private static final Logger log = Logger.get(KinesisShardCheckpointer.class); - private KinesisClientLeaseManager leaseManager; - private KinesisSplit kinesisSplit; - private String logicalProcessName; - private int currentIterationNumber; - private KinesisClientLease kinesisClientLease; + private final DynamoDBLeaseRefresher dynamoDBLeaseRefresher; + private final KinesisSplit kinesisSplit; + private final String logicalProcessName; + private final int currentIterationNumber; + private final Lease kinesisClientLease; public KinesisShardCheckpointer( - AmazonDynamoDB dynamoDBClient, + DynamoDbAsyncClient dynamoDBClient, String dynamoDBTable, KinesisSplit kinesisSplit, String logicalProcessName, @@ -40,7 +44,13 @@ public KinesisShardCheckpointer( long dynamoReadCapacity, long dynamoWriteCapacity) { - this(new KinesisClientLeaseManager(dynamoDBTable, dynamoDBClient), + this(new DynamoDBLeaseRefresher(dynamoDBTable, + dynamoDBClient, + new DynamoDBLeaseSerializer(), + false, + TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, + BillingMode.PAY_PER_REQUEST), kinesisSplit, logicalProcessName, currentIterationNumber, @@ -49,28 +59,28 @@ public KinesisShardCheckpointer( } public KinesisShardCheckpointer( - KinesisClientLeaseManager leaseManager, + DynamoDBLeaseRefresher dynamoDBLeaseRefresher, KinesisSplit kinesisSplit, String logicalProcessName, int currentIterationNumber, long dynamoReadCapacity, long dynamoWriteCapacity) { - this.leaseManager = leaseManager; + this.dynamoDBLeaseRefresher = dynamoDBLeaseRefresher; this.kinesisSplit = kinesisSplit; this.logicalProcessName = logicalProcessName; this.currentIterationNumber = currentIterationNumber; try { - this.leaseManager.createLeaseTableIfNotExists(dynamoReadCapacity, dynamoWriteCapacity); + this.dynamoDBLeaseRefresher.createLeaseTableIfNotExists(dynamoReadCapacity, dynamoWriteCapacity); - KinesisClientLease oldLease = this.leaseManager.getLease(createCheckpointKey(currentIterationNumber)); + Lease oldLease = this.dynamoDBLeaseRefresher.getLease(createCheckpointKey(currentIterationNumber)); if (oldLease != null) { this.kinesisClientLease = oldLease; } else { - this.kinesisClientLease = new KinesisClientLease(); - this.kinesisClientLease.setLeaseKey(createCheckpointKey(currentIterationNumber)); + this.kinesisClientLease = new Lease(); + this.kinesisClientLease.leaseKey(createCheckpointKey(currentIterationNumber)); } } catch (ProvisionedThroughputException | InvalidStateException | DependencyException e) { @@ -96,9 +106,9 @@ public void checkpoint(String lastReadSequenceNumber) log.info("Trying to checkpoint at %s", lastReadSequenceNumber); try { ExtendedSequenceNumber esn = new ExtendedSequenceNumber(lastReadSequenceNumber); - kinesisClientLease.setCheckpoint(esn); - leaseManager.createLeaseIfNotExists(kinesisClientLease); - if (!leaseManager.updateLease(kinesisClientLease)) { + kinesisClientLease.checkpoint(esn); + dynamoDBLeaseRefresher.createLeaseIfNotExists(kinesisClientLease); + if (!dynamoDBLeaseRefresher.updateLease(kinesisClientLease)) { log.warn("Checkpointing unsuccessful"); } } @@ -112,16 +122,16 @@ public String getLastReadSeqNumber() { String lastReadSeqNumber = null; if (currentIterationNumber > 0) { - KinesisClientLease oldLease; + Lease oldLease; try { - oldLease = leaseManager.getLease(createCheckpointKey(currentIterationNumber - 1)); + oldLease = dynamoDBLeaseRefresher.getLease(createCheckpointKey(currentIterationNumber - 1)); } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { throw new RuntimeException(e); } if (oldLease != null) { // ExtendedSequenceNumber type in latest API: - lastReadSeqNumber = oldLease.getCheckpoint().toString(); + lastReadSeqNumber = oldLease.checkpoint().toString(); } } if (lastReadSeqNumber == null) { diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java index 9ffc74655c19..34eaa4566c0c 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java @@ -13,10 +13,6 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.Shard; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.airlift.units.Duration; @@ -29,6 +25,10 @@ import io.trino.spi.connector.Constraint; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.FixedSplitSource; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.Shard; import java.util.ArrayList; import java.util.Collections; @@ -113,9 +113,9 @@ public ConnectorSplitSource getSplits( kinesisTableHandle.getStreamName(), kinesisTableHandle.getMessageDataFormat(), kinesisTableHandle.getCompressionCodec(), - shard.getShardId(), - shard.getSequenceNumberRange().getStartingSequenceNumber(), - shard.getSequenceNumberRange().getEndingSequenceNumber()); + shard.shardId(), + shard.sequenceNumberRange().startingSequenceNumber(), + shard.sequenceNumberRange().endingSequenceNumber()); builder.add(split); } @@ -133,25 +133,25 @@ protected InternalStreamDescription getStreamDescription(String streamName) if (internalStreamDescription == null || System.currentTimeMillis() - internalStreamDescription.getCreateTimeStamp() >= MAX_CACHE_AGE_MILLIS) { internalStreamDescription = new InternalStreamDescription(streamName); - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); + DescribeStreamRequest.Builder describeStreamRequest = DescribeStreamRequest.builder() + .streamName(streamName); // Collect shards from Kinesis String exclusiveStartShardId = null; - List shards = new ArrayList<>(); + do { - describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); - DescribeStreamResult describeStreamResult = clientManager.getClient().describeStream(describeStreamRequest); + describeStreamRequest.exclusiveStartShardId(exclusiveStartShardId); + DescribeStreamResponse describeStreamResult = clientManager.getClient().describeStream(describeStreamRequest.build()); - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); + String streamStatus = describeStreamResult.streamDescription().streamStatusAsString(); if (!streamStatus.equals("ACTIVE") && !streamStatus.equals("UPDATING")) { - throw new ResourceNotFoundException("Stream not Active"); + throw ResourceNotFoundException.builder().message("Stream not Active").build(); } - internalStreamDescription.addAllShards(describeStreamResult.getStreamDescription().getShards()); - - if (describeStreamResult.getStreamDescription().getHasMoreShards() && (shards.size() > 0)) { - exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); + internalStreamDescription.addAllShards(describeStreamResult.streamDescription().shards()); + int shardSize = internalStreamDescription.getShards().size(); + if (describeStreamResult.streamDescription().hasMoreShards() && (shardSize > 0)) { + exclusiveStartShardId = internalStreamDescription.getShards().get(shardSize - 1).shardId(); } else { exclusiveStartShardId = null; diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java index 6806bd63a65f..0859211b5831 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java @@ -13,14 +13,6 @@ */ package io.trino.plugin.kinesis.s3config; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.AmazonS3URI; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableMap; import com.google.common.io.CharStreams; import com.google.inject.Inject; @@ -32,10 +24,20 @@ import io.trino.plugin.kinesis.KinesisStreamDescription; import io.trino.spi.connector.SchemaTableName; import jakarta.annotation.PostConstruct; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Uri; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -67,7 +69,7 @@ public class S3TableConfigClient private final KinesisClientProvider clientManager; private final JsonCodec streamDescriptionCodec; private final Duration tableDescriptionRefreshInterval; - private final Optional bucketUrl; + private final Optional bucketS3Uri; private volatile long lastCheck; private volatile ScheduledFuture updateTaskHandle; @@ -85,17 +87,18 @@ public S3TableConfigClient( // If using S3 start thread that periodically looks for updates if (connectorConfig.getTableDescriptionLocation().startsWith("s3://")) { - this.bucketUrl = Optional.of(connectorConfig.getTableDescriptionLocation()); + String bucketURL = connectorConfig.getTableDescriptionLocation(); + this.bucketS3Uri = Optional.of(clientManager.getS3Client().utilities().parseUri(URI.create(bucketURL))); } else { - this.bucketUrl = Optional.empty(); + this.bucketS3Uri = Optional.empty(); } } @PostConstruct protected void startS3Updates() { - if (this.bucketUrl.isPresent()) { + if (this.bucketS3Uri.isPresent()) { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); this.updateTaskHandle = scheduler.scheduleAtFixedRate(this::updateTablesFromS3, 5_000, tableDescriptionRefreshInterval.toMillis(), TimeUnit.MILLISECONDS); } @@ -106,7 +109,7 @@ protected void startS3Updates() */ public boolean isUsingS3() { - return bucketUrl.isPresent() && (bucketUrl.get().startsWith("s3://")); + return bucketS3Uri.isPresent() && (bucketS3Uri.get().uri().toString().startsWith("s3://")); } /** @@ -139,32 +142,32 @@ public void run() *

* This is an object list request to AWS in the given "directory". */ - private List getObjectSummaries() + private List getS3Objects() { - AmazonS3Client s3client = clientManager.getS3Client(); - AmazonS3URI directoryURI = new AmazonS3URI(bucketUrl.get()); + S3Client s3client = clientManager.getS3Client(); + S3Uri directoryURI = bucketS3Uri.get(); - List result = new ArrayList<>(); + List result = new ArrayList<>(); try { - log.info("Getting the listing of objects in the S3 table config directory: bucket %s prefix %s :", directoryURI.getBucket(), directoryURI.getKey()); - ListObjectsRequest request = new ListObjectsRequest() - .withBucketName(directoryURI.getBucket()) - .withPrefix(directoryURI.getKey() + "/") - .withDelimiter("/") - .withMaxKeys(25); - ObjectListing response; + log.info("Getting the listing of objects in the S3 table config directory: bucket %s prefix %s :", directoryURI.bucket().get(), directoryURI.key().get()); + ListObjectsRequest request = ListObjectsRequest.builder() + .bucket(directoryURI.bucket().get()) + .prefix(directoryURI.key().get() + "/") + .delimiter("/") + .maxKeys(25).build(); + ListObjectsResponse response; do { response = s3client.listObjects(request); - result.addAll(response.getObjectSummaries()); - request.setMarker(response.getNextMarker()); + result.addAll(response.contents()); + request = request.toBuilder().marker(response.nextMarker()).build(); } while (response.isTruncated()); log.info("Completed getting S3 object listing."); } - catch (AmazonClientException e) { + catch (SdkException e) { log.error("Skipping update as faced error fetching table descriptions from S3 %s", e); } return result; @@ -178,22 +181,25 @@ private void updateTablesFromS3() { long now = System.currentTimeMillis(); - AmazonS3Client s3client = clientManager.getS3Client(); + S3Client s3client = clientManager.getS3Client(); + String bucket = bucketS3Uri.get().bucket().get(); - for (S3ObjectSummary summary : getObjectSummaries()) { - if (!descriptors.containsKey(summary.getKey()) || summary.getLastModified().getTime() >= lastCheck) { + for (S3Object s3Object : getS3Objects()) { + if (!descriptors.containsKey(s3Object.key()) || s3Object.lastModified().toEpochMilli() >= lastCheck) { // New or updated file, so we must read from AWS - if (summary.getKey().endsWith("/")) { + if (s3Object.key().endsWith("/")) { continue; } - log.info("Getting : %s - %s", summary.getBucketName(), summary.getKey()); - S3Object object = s3client.getObject(new GetObjectRequest(summary.getBucketName(), summary.getKey())); - - try (BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent(), UTF_8))) { + log.info("Getting : %s - %s", bucket, s3Object.key()); + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(s3Object.key()).build(); + ResponseInputStream object = s3client.getObject(request); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(object, UTF_8))) { KinesisStreamDescription table = streamDescriptionCodec.fromJson(CharStreams.toString(reader)); - descriptors.put(summary.getKey(), table); - log.info("Put table description into the map from %s", summary.getKey()); + descriptors.put(s3Object.key(), table); + log.info("Put table description into the map from %s", s3Object.key()); } catch (IOException iox) { log.error(iox, "Problem reading input stream from object."); diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestMinimalFunctionality.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestMinimalFunctionality.java index 584950e46d6f..dfa198a37e2c 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestMinimalFunctionality.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestMinimalFunctionality.java @@ -13,8 +13,6 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.SessionPropertyManager; @@ -32,9 +30,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Parameters; import org.testng.annotations.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; import java.io.File; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -123,18 +123,19 @@ public void spinUp(String accessKey, String secretKey) private void createMessages(String streamName, long count) { - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(streamName); + PutRecordsRequest.Builder putRecordsRequest = PutRecordsRequest.builder() + .streamName(streamName); List putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { - PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); - putRecordsRequestEntry.setData(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(UTF_8))); - putRecordsRequestEntry.setPartitionKey(Long.toString(i)); + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder() + .data(SdkBytes.fromByteArray(UUID.randomUUID().toString().getBytes(UTF_8))) + .partitionKey(Long.toString(i)) + .build(); putRecordsRequestEntryList.add(putRecordsRequestEntry); } - putRecordsRequest.setRecords(putRecordsRequestEntryList); - embeddedKinesisStream.getKinesisClient().putRecords(putRecordsRequest); + putRecordsRequest.records(putRecordsRequestEntryList); + embeddedKinesisStream.getKinesisClient().putRecords(putRecordsRequest.build()); } @Test @@ -154,13 +155,13 @@ public void testStreamExists() public void testStreamHasData() { assertThat(assertions.query("SELECT COUNT(1) FROM " + streamName)) - .matches("VALUES 0"); + .matches("VALUES cast(0 as bigint)"); long count = 500L; createMessages(streamName, count); - assertThat(assertions.query("SELECT COUNT(1) FROM " + streamName)) - .matches("VALUES %s".formatted(count)); + assertThat(assertions.query("SELECT COUNT(1) FROM " + streamName)).skippingTypesCheck() + .matches("VALUES cast(%s as bigint)".formatted(count)); } @AfterMethod(alwaysRun = true) @@ -169,5 +170,6 @@ public void tearDown() embeddedKinesisStream.deleteStream(streamName); queryRunner.close(); queryRunner = null; + assertions.close(); } } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestRecordAccess.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestRecordAccess.java index 420615d2acd3..c873502d8550 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestRecordAccess.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestRecordAccess.java @@ -13,8 +13,6 @@ */ package io.trino.plugin.kinesis; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import io.airlift.log.Logger; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; @@ -31,10 +29,12 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -88,49 +88,51 @@ public void stop() { queryRunner.close(); queryRunner = null; + mockClient.close(); } private void createDummyMessages(String streamName, int count) { - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(streamName); + PutRecordsRequest.Builder putRecordsRequest = PutRecordsRequest.builder(); + putRecordsRequest.streamName(streamName); List putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { - PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); - putRecordsRequestEntry.setData(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(UTF_8))); - putRecordsRequestEntry.setPartitionKey(Long.toString(i)); + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder() + .data(SdkBytes.fromByteArray(UUID.randomUUID().toString().getBytes(UTF_8))) + .partitionKey(Long.toString(i)) + .build(); putRecordsRequestEntryList.add(putRecordsRequestEntry); } - putRecordsRequest.setRecords(putRecordsRequestEntryList); - mockClient.putRecords(putRecordsRequest); + putRecordsRequest.records(putRecordsRequestEntryList); + mockClient.putRecords(putRecordsRequest.build()); } private void createJsonMessages(String streamName, int count, int idStart, boolean compress) { String jsonFormat = "{\"id\" : %d, \"name\" : \"%s\"}"; - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(streamName); + PutRecordsRequest.Builder putRecordsRequest = PutRecordsRequest.builder() + .streamName(streamName); List putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { - PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); + PutRecordsRequestEntry.Builder putRecordsRequestEntry = PutRecordsRequestEntry.builder(); long id = idStart + i; String name = UUID.randomUUID().toString(); String jsonVal = format(jsonFormat, id, name); // ? with StandardCharsets.UTF_8 if (compress) { - putRecordsRequestEntry.setData(ByteBuffer.wrap(compressMessage(jsonVal.getBytes(UTF_8)))); + putRecordsRequestEntry.data(SdkBytes.fromByteArray(compressMessage(jsonVal.getBytes(UTF_8)))); } else { - putRecordsRequestEntry.setData(ByteBuffer.wrap(jsonVal.getBytes(UTF_8))); + putRecordsRequestEntry.data(SdkBytes.fromByteArray(jsonVal.getBytes(UTF_8))); } - putRecordsRequestEntry.setPartitionKey(Long.toString(id)); - putRecordsRequestEntryList.add(putRecordsRequestEntry); + putRecordsRequestEntry.partitionKey(Long.toString(id)); + putRecordsRequestEntryList.add(putRecordsRequestEntry.build()); } - putRecordsRequest.setRecords(putRecordsRequestEntryList); - mockClient.putRecords(putRecordsRequest); + putRecordsRequest.records(putRecordsRequestEntryList); + mockClient.putRecords(putRecordsRequest.build()); } private static byte[] compressMessage(byte[] data) diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java index cb2c19cc3444..918a67e2c867 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.kinesis.s3config; -import com.amazonaws.services.s3.AmazonS3URI; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; import io.trino.plugin.kinesis.KinesisConnector; @@ -26,13 +25,14 @@ import io.trino.spi.connector.SchemaTableName; import org.testng.annotations.Parameters; import org.testng.annotations.Test; +import software.amazon.awssdk.services.s3.S3Uri; +import java.net.URI; import java.util.Map; import static io.trino.testing.TestingConnectorSession.SESSION; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) public class TestS3TableConfigClient @@ -43,37 +43,38 @@ public class TestS3TableConfigClient public void testS3URIValues() { // Verify that S3URI values will work: - AmazonS3URI uri1 = new AmazonS3URI("s3://our.data.warehouse/prod/client_actions"); - assertNotNull(uri1.getKey()); - assertNotNull(uri1.getBucket()); + URI uri1 = URI.create("s3://our.data.warehouse/prod/client_actions"); + S3Uri s3Uri1 = S3Uri.builder().uri(uri1).build(); + assertNotNull(s3Uri1.bucket().get()); + assertNotNull(s3Uri1.key().get()); - assertEquals(uri1.toString(), "s3://our.data.warehouse/prod/client_actions"); - assertEquals(uri1.getBucket(), "our.data.warehouse"); - assertEquals(uri1.getKey(), "prod/client_actions"); - assertTrue(uri1.getRegion() == null); + assertEquals(s3Uri1.toString(), "s3://our.data.warehouse/prod/client_actions"); + assertEquals(s3Uri1.bucket().get(), "our.data.warehouse"); + assertEquals(s3Uri1.key().get(), "prod/client_actions"); // show info: - log.info("Tested out URI1 : %s", uri1); + log.info("Tested out URI1 : %s", s3Uri1.toString()); - AmazonS3URI uri2 = new AmazonS3URI("s3://some.big.bucket/long/complex/path"); - assertNotNull(uri2.getKey()); - assertNotNull(uri2.getBucket()); + URI uri2 = URI.create("s3://some.big.bucket/long/complex/path"); + S3Uri s3Uri2 = S3Uri.builder().uri(uri2).build(); + assertNotNull(s3Uri2.bucket().get()); + assertNotNull(s3Uri2.key().get()); - assertEquals(uri2.toString(), "s3://some.big.bucket/long/complex/path"); - assertEquals(uri2.getBucket(), "some.big.bucket"); - assertEquals(uri2.getKey(), "long/complex/path"); - assertTrue(uri2.getRegion() == null); + assertEquals(s3Uri2.toString(), "s3://some.big.bucket/long/complex/path"); + assertEquals(s3Uri2.bucket().get(), "some.big.bucket"); + assertEquals(s3Uri2.key().get(), "long/complex/path"); // info: log.info("Tested out URI2 : %s", uri2); - AmazonS3URI uri3 = new AmazonS3URI("s3://trino.kinesis.config/unit-test/trino-kinesis"); - assertNotNull(uri3.getKey()); - assertNotNull(uri3.getBucket()); + URI uri3 = URI.create("s3://trino.kinesis.config/unit-test/trino-kinesis"); + S3Uri s3Uri3 = S3Uri.builder().uri(uri3).build(); + assertNotNull(s3Uri3.bucket().get()); + assertNotNull(s3Uri3.key().get()); assertEquals(uri3.toString(), "s3://trino.kinesis.config/unit-test/trino-kinesis"); - assertEquals(uri3.getBucket(), "trino.kinesis.config"); - assertEquals(uri3.getKey(), "unit-test/trino-kinesis"); + assertEquals(uri3.getHost(), "trino.kinesis.config"); + assertEquals(uri3.getPath().substring(1), "unit-test/trino-kinesis"); } @Parameters({ diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/EmbeddedKinesisStream.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/EmbeddedKinesisStream.java index 58e72b5b6c2a..e3e7d23ff3ae 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/EmbeddedKinesisStream.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/EmbeddedKinesisStream.java @@ -13,12 +13,14 @@ */ package io.trino.plugin.kinesis.util; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.CreateStreamRequest; -import com.amazonaws.services.kinesis.model.DeleteStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.StreamDescription; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; import java.io.Closeable; @@ -27,11 +29,16 @@ public class EmbeddedKinesisStream implements Closeable { - private AmazonKinesisClient amazonKinesisClient; + private KinesisClient amazonKinesisClient; public EmbeddedKinesisStream(String accessKey, String secretKey) { - this.amazonKinesisClient = new AmazonKinesisClient(new BasicAWSCredentials(accessKey, secretKey)); + AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey); + + this.amazonKinesisClient = KinesisClient.builder() + .httpClient(ApacheHttpClient.create()) + .credentialsProvider(StaticCredentialsProvider.create(awsCredentials)) + .build(); } @Override @@ -39,18 +46,21 @@ public void close() {} private String checkStreamStatus(String streamName) { - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); + DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() + .streamName(streamName) + .build(); - StreamDescription streamDescription = amazonKinesisClient.describeStream(describeStreamRequest).getStreamDescription(); - return streamDescription.getStreamStatus(); + StreamDescription streamDescription = amazonKinesisClient.describeStream(describeStreamRequest).streamDescription(); + return streamDescription.streamStatus().toString(); } public void createStream(int shardCount, String streamName) { - CreateStreamRequest createStreamRequest = new CreateStreamRequest(); - createStreamRequest.setStreamName(streamName); - createStreamRequest.setShardCount(shardCount); + CreateStreamRequest createStreamRequest = CreateStreamRequest + .builder() + .streamName(streamName) + .shardCount(shardCount) + .build(); amazonKinesisClient.createStream(createStreamRequest); try { @@ -62,15 +72,16 @@ public void createStream(int shardCount, String streamName) } } - public AmazonKinesisClient getKinesisClient() + public KinesisClient getKinesisClient() { return amazonKinesisClient; } public void deleteStream(String streamName) { - DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest(); - deleteStreamRequest.setStreamName(streamName); + DeleteStreamRequest deleteStreamRequest = DeleteStreamRequest.builder() + .streamName(streamName) + .build(); amazonKinesisClient.deleteStream(deleteStreamRequest); } } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/KinesisTestClientManager.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/KinesisTestClientManager.java index 4ad4d4c1520a..967f0d162356 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/KinesisTestClientManager.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/KinesisTestClientManager.java @@ -13,10 +13,12 @@ */ package io.trino.plugin.kinesis.util; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.s3.AmazonS3Client; import io.trino.plugin.kinesis.KinesisClientProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.s3.S3Client; /** * Test implementation of KinesisClientProvider that incorporates a mock Kinesis client. @@ -24,30 +26,34 @@ public class KinesisTestClientManager implements KinesisClientProvider { - private AmazonKinesisClient client = new MockKinesisClient(); - private final AmazonDynamoDBClient dynamoDBClient; - private final AmazonS3Client amazonS3Client; + private KinesisClient client = new MockKinesisClient(); + private final DynamoDbAsyncClient dynamoDBClient; + private final S3Client amazonS3Client; public KinesisTestClientManager() { - this.dynamoDBClient = new AmazonDynamoDBClient(); - this.amazonS3Client = new AmazonS3Client(); + this.dynamoDBClient = DynamoDbAsyncClient.builder() + .httpClient(NettyNioAsyncHttpClient.create()) + .build(); + this.amazonS3Client = S3Client.builder() + .httpClient(ApacheHttpClient.create()) + .build(); } @Override - public AmazonKinesisClient getClient() + public KinesisClient getClient() { return client; } @Override - public AmazonDynamoDBClient getDynamoDbClient() + public DynamoDbAsyncClient getDynamoDbClient() { return this.dynamoDBClient; } @Override - public AmazonS3Client getS3Client() + public S3Client getS3Client() { return amazonS3Client; } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java index 8d762746ed96..f5812cd8f408 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/util/MockKinesisClient.java @@ -13,37 +13,36 @@ */ package io.trino.plugin.kinesis.util; -import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.AmazonWebServiceRequest; -import com.amazonaws.ResponseMetadata; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.CreateStreamRequest; -import com.amazonaws.services.kinesis.model.CreateStreamResult; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ListStreamsRequest; -import com.amazonaws.services.kinesis.model.ListStreamsResult; -import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest; -import com.amazonaws.services.kinesis.model.ListTagsForStreamResult; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; -import com.amazonaws.services.kinesis.model.PutRecordsResult; -import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.SequenceNumberRange; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.StreamDescription; - -import java.nio.ByteBuffer; +import io.trino.testing.ResourcePresence; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; + +import java.time.Instant; import java.util.ArrayList; -import java.util.Date; import java.util.List; import static java.lang.Integer.parseInt; @@ -58,23 +57,121 @@ *

*/ public class MockKinesisClient - extends AmazonKinesisClient + implements KinesisClient { private List streams = new ArrayList<>(); + /* + Shard can not be extended in the new AWS SDK v2 as it is declared final. + Create our own class and convert to Shard whenever needed. + */ public static class InternalShard - extends Shard { + private String shardId; + private String parentShardId; + private String adjacentParentShardId; + private HashKeyRange hashKeyRange; + private SequenceNumberRange sequenceNumberRange; private List recs = new ArrayList<>(); private int index; public InternalShard(String streamName, int index) { - super(); this.index = index; this.setShardId(streamName + "_" + this.index); } + public static Shard convertInternalShardToShard(InternalShard internalShard) + { + return Shard.builder() + .shardId(internalShard.getShardId()) + .parentShardId(internalShard.getParentShardId()) + .adjacentParentShardId(internalShard.getAdjacentParentShardId()) + .hashKeyRange(internalShard.getHashKeyRange()) + .sequenceNumberRange(internalShard.getSequenceNumberRange()) + .build(); + } + + public void setShardId(String shardId) + { + this.shardId = shardId; + } + + public String getShardId() + { + return this.shardId; + } + + public InternalShard withShardId(String shardId) + { + this.setShardId(shardId); + return this; + } + + public void setParentShardId(String parentShardId) + { + this.parentShardId = parentShardId; + } + + public String getParentShardId() + { + return this.parentShardId; + } + + public InternalShard withParentShardId(String parentShardId) + { + this.setParentShardId(parentShardId); + return this; + } + + public void setAdjacentParentShardId(String adjacentParentShardId) + { + this.adjacentParentShardId = adjacentParentShardId; + } + + public String getAdjacentParentShardId() + { + return this.adjacentParentShardId; + } + + public InternalShard withAdjacentParentShardId(String adjacentParentShardId) + { + this.setAdjacentParentShardId(adjacentParentShardId); + return this; + } + + public void setHashKeyRange(HashKeyRange hashKeyRange) + { + this.hashKeyRange = hashKeyRange; + } + + public HashKeyRange getHashKeyRange() + { + return this.hashKeyRange; + } + + public InternalShard withHashKeyRange(HashKeyRange hashKeyRange) + { + this.setHashKeyRange(hashKeyRange); + return this; + } + + public void setSequenceNumberRange(SequenceNumberRange sequenceNumberRange) + { + this.sequenceNumberRange = sequenceNumberRange; + } + + public SequenceNumberRange getSequenceNumberRange() + { + return this.sequenceNumberRange; + } + + public InternalShard withSequenceNumberRange(SequenceNumberRange sequenceNumberRange) + { + this.setSequenceNumberRange(sequenceNumberRange); + return this; + } + public List getRecords() { return recs; @@ -85,7 +182,7 @@ public List getRecordsFrom(ShardIterator iterator) List returnRecords = new ArrayList<>(); for (Record record : this.recs) { - if (parseInt(record.getSequenceNumber()) >= iterator.recordIndex) { + if (parseInt(record.sequenceNumber()) >= iterator.recordIndex) { returnRecords.add(record); } } @@ -109,6 +206,12 @@ public void clearRecords() } } + @Override + public String serviceName() + { + return "MockKinesisClient"; + } + public static class InternalStream { private final String streamName; @@ -128,7 +231,7 @@ public InternalStream(String streamName, int shardCount, boolean isActive) for (int i = 0; i < shardCount; i++) { InternalShard newShard = new InternalShard(this.streamName, i); - newShard.setSequenceNumberRange((new SequenceNumberRange()).withStartingSequenceNumber("100").withEndingSequenceNumber("999")); + newShard.setSequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("100").endingSequenceNumber("999").build()); this.shards.add(newShard); } } @@ -172,14 +275,17 @@ public List getShardsFrom(String afterShardId) return new ArrayList<>(); } - public PutRecordResult putRecord(ByteBuffer data, String partitionKey) + public PutRecordResponse putRecord(SdkBytes data, String partitionKey) { // Create record and insert into the shards. Initially just do it // on a round robin basis. long timestamp = System.currentTimeMillis() - 50000; - Record record = new Record(); - record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber(String.valueOf(sequenceNo)); - record.setApproximateArrivalTimestamp(new Date(timestamp)); + Record record = Record.builder() + .data(data) + .partitionKey(partitionKey) + .sequenceNumber(String.valueOf(sequenceNo)) + .approximateArrivalTimestamp(Instant.ofEpochMilli(timestamp)) + .build(); if (nextShard == shards.size()) { nextShard = 0; @@ -187,9 +293,9 @@ record = record.withData(data).withPartitionKey(partitionKey).withSequenceNumber InternalShard shard = shards.get(nextShard); shard.addRecord(record); - PutRecordResult result = new PutRecordResult(); - result.setSequenceNumber(String.valueOf(sequenceNo)); - result.setShardId(shard.getShardId()); + PutRecordResponse result = PutRecordResponse.builder() + .sequenceNumber(String.valueOf(sequenceNo)) + .shardId(shard.getShardId()).build(); nextShard++; sequenceNo++; @@ -241,11 +347,6 @@ public static ShardIterator fromString(String input) } } - public MockKinesisClient() - { - super(); - } - protected InternalStream getStream(String name) { InternalStream foundStream = null; @@ -262,7 +363,7 @@ protected List getShards(InternalStream theStream) { List externalList = new ArrayList<>(); for (InternalShard intshard : theStream.getShards()) { - externalList.add(intshard); + externalList.add(InternalShard.convertInternalShardToShard(intshard)); } return externalList; @@ -272,105 +373,102 @@ protected List getShards(InternalStream theStream, String fromShardId) { List externalList = new ArrayList<>(); for (InternalShard intshard : theStream.getShardsFrom(fromShardId)) { - externalList.add(intshard); + externalList.add(InternalShard.convertInternalShardToShard(intshard)); } return externalList; } @Override - public PutRecordResult putRecord(PutRecordRequest putRecordRequest) - throws AmazonClientException + public PutRecordResponse putRecord(PutRecordRequest putRecordRequest) + throws SdkException { // Setup method to add a new record: - InternalStream theStream = this.getStream(putRecordRequest.getStreamName()); + InternalStream theStream = this.getStream(putRecordRequest.streamName()); if (theStream != null) { - return theStream.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey()); + return theStream.putRecord(putRecordRequest.data(), putRecordRequest.partitionKey()); } - throw new AmazonClientException("This stream does not exist!"); + throw SdkException.create("This stream does not exist!", null); } @Override - public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) - throws AmazonClientException + public CreateStreamResponse createStream(CreateStreamRequest createStreamRequest) + throws SdkException { // Setup method to create a new stream: - InternalStream stream = new InternalStream(createStreamRequest.getStreamName(), createStreamRequest.getShardCount(), true); + InternalStream stream = new InternalStream(createStreamRequest.streamName(), createStreamRequest.shardCount(), true); this.streams.add(stream); - return new CreateStreamResult(); + return CreateStreamResponse.builder().build(); } - @Override - public CreateStreamResult createStream(String streamName, Integer integer) - throws AmazonClientException + public CreateStreamResponse createStream(String streamName, Integer integer) + throws SdkException { - return this.createStream((new CreateStreamRequest()).withStreamName(streamName).withShardCount(integer)); + return this.createStream(CreateStreamRequest.builder().streamName(streamName).shardCount(integer).build()); } @Override - public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) - throws AmazonClientException + public PutRecordsResponse putRecords(PutRecordsRequest putRecordsRequest) + throws SdkException { // Setup method to add a batch of new records: - InternalStream theStream = this.getStream(putRecordsRequest.getStreamName()); + InternalStream theStream = this.getStream(putRecordsRequest.streamName()); if (theStream != null) { - PutRecordsResult result = new PutRecordsResult(); + PutRecordsResponse.Builder result = PutRecordsResponse.builder(); List resultList = new ArrayList<>(); - for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) { - PutRecordResult putResult = theStream.putRecord(entry.getData(), entry.getPartitionKey()); - resultList.add((new PutRecordsResultEntry()).withShardId(putResult.getShardId()).withSequenceNumber(putResult.getSequenceNumber())); + for (PutRecordsRequestEntry entry : putRecordsRequest.records()) { + PutRecordResponse putResult = theStream.putRecord(entry.data(), entry.partitionKey()); + resultList.add(PutRecordsResultEntry.builder().shardId(putResult.shardId()).sequenceNumber(putResult.sequenceNumber()).build()); } - result.setRecords(resultList); - return result; + result.records(resultList); + return result.build(); } - throw new AmazonClientException("This stream does not exist!"); + throw SdkException.create("This stream does not exist!", null); } @Override - public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) - throws AmazonClientException + public DescribeStreamResponse describeStream(DescribeStreamRequest describeStreamRequest) + throws SdkException { - InternalStream theStream = this.getStream(describeStreamRequest.getStreamName()); + InternalStream theStream = this.getStream(describeStreamRequest.streamName()); if (theStream == null) { - throw new AmazonClientException("This stream does not exist!"); + throw SdkException.create("This stream does not exist!", null); } - StreamDescription desc = new StreamDescription(); - desc = desc.withStreamName(theStream.getStreamName()).withStreamStatus(theStream.getStreamStatus()).withStreamARN(theStream.getStreamAmazonResourceName()); + StreamDescription.Builder desc = StreamDescription.builder(); + desc = desc.streamName(theStream.getStreamName()).streamStatus(theStream.getStreamStatus()).streamARN(theStream.getStreamAmazonResourceName()); - if (describeStreamRequest.getExclusiveStartShardId() == null || describeStreamRequest.getExclusiveStartShardId().isEmpty()) { - desc.setShards(this.getShards(theStream)); - desc.setHasMoreShards(false); + if (describeStreamRequest.exclusiveStartShardId() == null || describeStreamRequest.exclusiveStartShardId().isEmpty()) { + desc.shards(this.getShards(theStream)); + desc.hasMoreShards(false); } else { // Filter from given shard Id, or may not have any more - String startId = describeStreamRequest.getExclusiveStartShardId(); - desc.setShards(this.getShards(theStream, startId)); - desc.setHasMoreShards(false); + String startId = describeStreamRequest.exclusiveStartShardId(); + desc.shards(this.getShards(theStream, startId)); + desc.hasMoreShards(false); } - DescribeStreamResult result = new DescribeStreamResult(); - result = result.withStreamDescription(desc); - return result; + return DescribeStreamResponse.builder().streamDescription(desc.build()).build(); } @Override - public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) - throws AmazonClientException + public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest getShardIteratorRequest) + throws SdkException { - ShardIterator iter = ShardIterator.fromStreamAndShard(getShardIteratorRequest.getStreamName(), getShardIteratorRequest.getShardId()); + ShardIterator iter = ShardIterator.fromStreamAndShard(getShardIteratorRequest.streamName(), getShardIteratorRequest.shardId()); if (iter == null) { - throw new AmazonClientException("Bad stream or shard iterator!"); + throw SdkException.create("Bad stream or shard iterator!", null); } InternalStream theStream = this.getStream(iter.streamId); if (theStream == null) { - throw new AmazonClientException("Unknown stream or bad shard iterator!"); + throw SdkException.create("Unknown stream or bad shard iterator!", null); } - String seqAsString = getShardIteratorRequest.getStartingSequenceNumber(); - if (seqAsString != null && !seqAsString.isEmpty() && getShardIteratorRequest.getShardIteratorType().equals("AFTER_SEQUENCE_NUMBER")) { + String seqAsString = getShardIteratorRequest.startingSequenceNumber(); + if (seqAsString != null && !seqAsString.isEmpty() && getShardIteratorRequest.shardIteratorType().toString().equals("AFTER_SEQUENCE_NUMBER")) { int sequence = parseInt(seqAsString); iter.recordIndex = sequence + 1; } @@ -378,44 +476,41 @@ public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardI iter.recordIndex = 100; } - GetShardIteratorResult result = new GetShardIteratorResult(); - return result.withShardIterator(iter.makeString()); + return GetShardIteratorResponse.builder().shardIterator(iter.makeString()).build(); } @Override - public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) - throws AmazonClientException + public GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) + throws SdkException { - ShardIterator iterator = ShardIterator.fromString(getRecordsRequest.getShardIterator()); + ShardIterator iterator = ShardIterator.fromString(getRecordsRequest.shardIterator()); if (iterator == null) { - throw new AmazonClientException("Bad shard iterator."); + throw SdkException.create("Bad shard iterator.", null); } // TODO: incorporate maximum batch size (getRecordsRequest.getLimit) InternalStream stream = this.getStream(iterator.streamId); if (stream == null) { - throw new AmazonClientException("Unknown stream or bad shard iterator."); + throw SdkException.create("Unknown stream or bad shard iterator.", null); } InternalShard shard = stream.getShards().get(iterator.shardIndex); - GetRecordsResult result; + GetRecordsResponse.Builder result = GetRecordsResponse.builder(); if (iterator.recordIndex == 100) { - result = new GetRecordsResult(); List recs = shard.getRecords(); - result.setRecords(recs); // NOTE: getting all for now - result.setNextShardIterator(getNextShardIterator(iterator, recs).makeString()); - result.setMillisBehindLatest(100L); + result.records(recs); // NOTE: getting all for now + result.nextShardIterator(getNextShardIterator(iterator, recs).makeString()); + result.millisBehindLatest(100L); } else { - result = new GetRecordsResult(); List recs = shard.getRecordsFrom(iterator); - result.setRecords(recs); // may be empty - result.setNextShardIterator(getNextShardIterator(iterator, recs).makeString()); - result.setMillisBehindLatest(100L); + result.records(recs); // may be empty + result.nextShardIterator(getNextShardIterator(iterator, recs).makeString()); + result.millisBehindLatest(100L); } - return result; + return result.build(); } protected ShardIterator getNextShardIterator(ShardIterator previousIter, List records) @@ -425,105 +520,43 @@ protected ShardIterator getNextShardIterator(ShardIterator previousIter, List1.11.1 ${dep.airlift.version} 1.12.493 - 2.20.89 + 2.20.99 0.11.5 21.9.0.0 1.20 @@ -174,7 +174,7 @@ 1.3.0 3.22.2 4.5.0 - 4.1.92.Final + 4.1.94.Final 5.13.0 3.3.0