Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 168 additions & 46 deletions plugin/trino-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${dep.aws-sdk.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -118,6 +72,155 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<exclusions>

<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<exclusions>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>2.4.5</version>
<exclusions>
<exclusion>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -173,6 +276,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand All @@ -198,6 +307,7 @@
<exclude>**/TestS3TableConfigClient.java</exclude>
</excludes>
<systemPropertyVariables>
<aws.region>us-east-1</aws.region>
<kinesis.awsAccessKey>ACCESS-KEY</kinesis.awsAccessKey>
<kinesis.awsSecretKey>SECRET-KEY</kinesis.awsSecretKey>
<kinesis.test-table-description-location>s3://S3-LOC</kinesis.test-table-description-location>
Expand All @@ -211,9 +321,21 @@
<ignoredResourcePatterns combine.children="append">
<!-- com.google.protobuf:protobuf-java and com.squareup.wire:wire-schema proto file duplicate -->
<ignoredResourcePattern>google/protobuf/.*\.proto$</ignoredResourcePattern>
<ignoredResourcePattern>AUTHORS</ignoredResourcePattern>
<ignoredResourcePattern>wire/extensions.proto</ignoredResourcePattern>
<ignoredResourcePattern>checkstyle.xml</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>software.amazon.kinesis:kinesis-client</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
*/
package io.trino.plugin.kinesis;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.s3.AmazonS3Client;
import com.google.inject.Inject;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.s3.S3Client;

import static com.google.common.base.Strings.isNullOrEmpty;

Expand All @@ -31,44 +36,55 @@
public class KinesisClientManager
implements KinesisClientProvider
{
private final AmazonKinesisClient client;
private final AmazonS3Client amazonS3Client;
private final AmazonDynamoDBClient dynamoDbClient; // for Checkpointing
private final KinesisClient client;
private final S3Client amazonS3Client;
private final DynamoDbAsyncClient dynamoDbClient; // for Checkpointing

@Inject
public KinesisClientManager(KinesisConfig config)
{
AwsCredentialsProvider credentialsProvider;
if (!isNullOrEmpty(config.getAccessKey()) && !isNullOrEmpty(config.getSecretKey())) {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
this.client = new AmazonKinesisClient(awsCredentials);
this.amazonS3Client = new AmazonS3Client(awsCredentials);
this.dynamoDbClient = new AmazonDynamoDBClient(awsCredentials);
AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey());
credentialsProvider = StaticCredentialsProvider.create(awsCredentials);
}
else {
DefaultAWSCredentialsProviderChain defaultChain = new DefaultAWSCredentialsProviderChain();
this.client = new AmazonKinesisClient(defaultChain);
this.amazonS3Client = new AmazonS3Client(defaultChain);
this.dynamoDbClient = new AmazonDynamoDBClient(defaultChain);
credentialsProvider = DefaultCredentialsProvider.create();
}

this.client.setEndpoint("kinesis." + config.getAwsRegion() + ".amazonaws.com");
this.dynamoDbClient.setEndpoint("dynamodb." + config.getAwsRegion() + ".amazonaws.com");
this.client = KinesisClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(config.getAwsRegion()))
.httpClient(ApacheHttpClient.create())
.build();
this.amazonS3Client = S3Client.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(config.getAwsRegion()))
.crossRegionAccessEnabled(true)
.httpClient(ApacheHttpClient.create())
.build();

this.dynamoDbClient = DynamoDbAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(Region.of(config.getAwsRegion()))
.httpClient(NettyNioAsyncHttpClient.create())
.build();
}

@Override
public AmazonKinesisClient getClient()
public KinesisClient getClient()
{
return client;
}

@Override
public AmazonDynamoDBClient getDynamoDbClient()
public DynamoDbAsyncClient getDynamoDbClient()
{
return dynamoDbClient;
}

@Override
public AmazonS3Client getS3Client()
public S3Client getS3Client()
{
return amazonS3Client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
*/
package io.trino.plugin.kinesis;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.s3.AmazonS3Client;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Interface to a client manager that provides the AWS clients needed.
*/
//TODO: This interface needs to be removed and abstraction in unneccesary
public interface KinesisClientProvider
{
AmazonKinesisClient getClient();
KinesisClient getClient();

AmazonDynamoDBClient getDynamoDbClient();
DynamoDbAsyncClient getDynamoDbClient();

AmazonS3Client getS3Client();
S3Client getS3Client();
}
Loading