Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down Expand Up @@ -194,6 +184,31 @@
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>metrics-spi</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>utils</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -447,7 +462,16 @@
aws-java-sdk-core -->
<ignoredDependency>com.amazonaws:aws-java-sdk-core</ignoredDependency>
<ignoredDependency>org.apache.iceberg:iceberg-api</ignoredDependency>
<ignoredDependency>software.amazon.awssdk:auth</ignoredDependency>
<ignoredDependency>software.amazon.awssdk:metrics-spi</ignoredDependency>
<ignoredDependency>software.amazon.awssdk:sdk-core</ignoredDependency>
</ignoredNonTestScopedDependencies>
<ignoredUnusedDeclaredDependencies>
<!-- The dependency plugin fails to recognize software.amazon.awssdk:utils as a compile-time dependency
because it is not used directly; however, most of the other software.amazon.awssdk dependencies
expect it to be present at compile time. -->
<ignoredDependency>software.amazon.awssdk:utils</ignoredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.plugin.deltalake.metastore.glue;

import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.deltalake.AllowDeltaLakeManagedTableRename;
import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreModule;
import software.amazon.awssdk.services.glue.model.Table;

import java.util.function.Predicate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
package io.trino.plugin.deltalake.metastore.glue;

import com.amazonaws.services.glue.model.Table;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.plugin.hive.metastore.glue.DefaultGlueMetastoreTableFilterProvider;
import software.amazon.awssdk.services.glue.model.Table;

import java.util.function.Predicate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@
*/
package io.trino.plugin.deltalake.metastore.glue;

import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder;
import com.amazonaws.services.glue.model.DeleteDatabaseRequest;
import com.amazonaws.services.glue.model.EntityNotFoundException;
import com.amazonaws.services.glue.model.GetDatabasesRequest;
import com.amazonaws.services.glue.model.GetDatabasesResult;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.trino.plugin.hive.aws.AwsApiCallStats;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;

import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.awsSyncPaginatedRequest;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.awsSyncRequest;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.DAYS;

Expand All @@ -39,28 +38,27 @@ public class TestDeltaLakeCleanUpGlueMetastore
@Test
public void cleanupOrphanedDatabases()
{
AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient();
GlueAsyncClient glueClient = GlueAsyncClient.builder().build();
long creationTimeMillisThreshold = currentTimeMillis() - DAYS.toMillis(1);
List<String> orphanedDatabases = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest(),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken,
new AwsApiCallStats())
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.filter(glueDatabase -> glueDatabase.getName().startsWith(TEST_DATABASE_NAME_PREFIX) &&
glueDatabase.getCreateTime().getTime() <= creationTimeMillisThreshold)
.map(com.amazonaws.services.glue.model.Database::getName)
.collect(toImmutableList());
ImmutableList.Builder<String> databaseNames = ImmutableList.builder();
awsSyncPaginatedRequest(glueClient.getDatabasesPaginator(GetDatabasesRequest.builder().build()),
getDatabasesResponse -> getDatabasesResponse.databaseList()
.stream()
.filter(glueDatabase -> glueDatabase.name().startsWith(TEST_DATABASE_NAME_PREFIX)
&& glueDatabase.createTime().toEpochMilli() <= creationTimeMillisThreshold)
.forEach(glueDatabase -> {
databaseNames.add(glueDatabase.name());
}),
new AwsApiCallStats());
List<String> orphanedDatabases = databaseNames.build();

if (!orphanedDatabases.isEmpty()) {
log.info("Found %s %s* databases that look orphaned, removing", orphanedDatabases.size(), TEST_DATABASE_NAME_PREFIX);
orphanedDatabases.forEach(database -> {
try {
log.info("Deleting %s database", database);
glueClient.deleteDatabase(new DeleteDatabaseRequest()
.withName(database));
awsSyncRequest(glueClient::deleteDatabase, DeleteDatabaseRequest.builder()
.name(database).build(), null);
}
catch (EntityNotFoundException e) {
log.info("Database [%s] not found, could be removed by other cleanup process", database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
*/
package io.trino.plugin.deltalake.metastore.glue;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.model.ConcurrentModificationException;
import io.trino.Session;
import io.trino.plugin.deltalake.TestingDeltaLakePlugin;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
Expand All @@ -29,6 +26,9 @@
import io.trino.testing.QueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -74,14 +74,14 @@ protected QueryRunner createQueryRunner()
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
.setDefaultWarehouseDir(dataDirectory.toUri().toString());

AWSGlueAsync glueClient = createAsyncGlueClient(glueConfig, DefaultAWSCredentialsProviderChain.getInstance(), Optional.empty(), stats.newRequestMetricsCollector());
AWSGlueAsync proxiedGlueClient = newProxy(AWSGlueAsync.class, (proxy, method, args) -> {
GlueAsyncClient glueClient = createAsyncGlueClient(glueConfig, DefaultCredentialsProvider.create(), Optional.empty(), stats.newRequestMetricsPublisher());
GlueAsyncClient proxiedGlueClient = newProxy(GlueAsyncClient.class, (proxy, method, args) -> {
Object result;
try {
if (method.getName().equals("deleteTable") && failNextGlueDeleteTableCall.get()) {
// Simulate concurrent modifications on the table that is about to be dropped
failNextGlueDeleteTableCall.set(false);
throw new TrinoException(HIVE_METASTORE_ERROR, new ConcurrentModificationException("Test-simulated metastore concurrent modification exception"));
throw new TrinoException(HIVE_METASTORE_ERROR, ConcurrentModificationException.builder().message("Test-simulated metastore concurrent modification exception").build());
}
result = method.invoke(glueClient, args);
}
Expand Down
79 changes: 74 additions & 5 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
Expand Down Expand Up @@ -272,6 +267,67 @@
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use apache-client since we don't need or want async behavior. Please see trino-filesystem-s3 to see how we use it with S3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a few details in this comment. trino-hive may not be able to use the sync client, as mentioned.
Should I convert the one in trino-iceberg (TrinoGlueCatalog) to sync?

</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>metrics-spi</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>utils</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -481,6 +537,19 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredNonTestScopedDependencies>
<!-- The dependency plugin fails to recognize software.amazon.awssdk:utils as a compile-time dependency
because it is not used directly; however, most of the other software.amazon.awssdk dependencies
expect it to be present at compile time. -->
<ignoredDependency>software.amazon.awssdk:utils</ignoredDependency>
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,27 @@

import com.google.common.collect.AbstractIterator;
import io.trino.plugin.hive.aws.AwsApiCallStats;
import io.trino.spi.TrinoException;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.glue.model.GlueException;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static java.util.Objects.requireNonNull;

public final class AwsSdkUtil
{
private AwsSdkUtil() {}

/**
* Helper method to get all results from a paginated API.
*
* @param request request object reused for subsequent requests with
* {@code setNextToken} being used to set the next token in the request object
*/
public static <Request, Result> Stream<Result> getPaginatedResults(
public static <Request, Result> Stream<Result> getPaginatedResultsForS3(
Function<Request, Result> submission,
Request request,
BiConsumer<Request, String> setNextToken,
Expand Down Expand Up @@ -68,4 +69,56 @@ protected Result computeNext()

return stream(iterator);
}

/**
 * Submits a single request through an async client call and blocks until the
 * returned future completes, giving callers synchronous semantics.
 *
 * @param submission function that submits {@code request} and returns a future for the result
 * @param request request object handed to {@code submission}
 * @param stats optional call statistics; when non-null, the submit-and-wait step runs inside {@code stats.call}
 * @return the value the future completes with
 * @throws GlueException when the future fails and the failure cause is a {@code GlueException} (rethrown as-is)
 * @throws TrinoException with {@code HIVE_METASTORE_ERROR} wrapping any other failure cause
 * @throws NullPointerException if {@code submission} or {@code request} is null
 */
public static <Request, Result> Result awsSyncRequest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed if we use GlueClient.

Function<Request, CompletableFuture<Result>> submission,
Request request,
AwsApiCallStats stats)
{
requireNonNull(submission, "submission is null");
requireNonNull(request, "request is null");
try {
if (stats != null) {
// join() is inside stats.call, so the wait for the result is part of the recorded call
return stats.call(() -> submission.apply(request).join());
}

// No stats supplied: just submit and wait
return submission.apply(request).join();
}
catch (CompletionException e) {
// join() wraps the real failure in CompletionException; surface Glue errors unchanged
if (e.getCause() instanceof GlueException glueException) {
throw glueException;
}
// Any other cause is reported as a metastore error, keeping the cause attached
throw new TrinoException(HIVE_METASTORE_ERROR, e.getCause());
}
}

/**
 * Consumes a paginated async response synchronously: subscribes {@code resultConsumer}
 * to {@code paginator} and blocks until the subscription completes.
 *
 * @param paginator publisher that emits the paginated results
 * @param resultConsumer callback invoked for each emitted result
 * @param stats optional call statistics; when non-null, both the subscription and the wait
 * for its completion run inside {@code stats.call}, the same way {@code awsSyncRequest}
 * joins inside {@code stats.call}
 * @throws GlueException when the subscription fails and the failure cause is a {@code GlueException} (rethrown as-is)
 * @throws TrinoException with {@code HIVE_METASTORE_ERROR} wrapping any other failure cause
 * @throws NullPointerException if {@code paginator} or {@code resultConsumer} is null
 */
public static <Result> void awsSyncPaginatedRequest(SdkPublisher<Result> paginator, Consumer<Result> resultConsumer, AwsApiCallStats stats)
{
    requireNonNull(paginator, "paginator is null");
    requireNonNull(resultConsumer, "resultConsumer is null");
    try {
        if (stats != null) {
            // subscribe() only returns a future; join inside stats.call so the recorded
            // call covers the actual pagination rather than just the subscription setup
            stats.call(() -> paginator.subscribe(resultConsumer).join());
        }
        else {
            // No stats supplied: subscribe and wait for the publisher to finish
            paginator.subscribe(resultConsumer).join();
        }
    }
    catch (CompletionException e) {
        // join() wraps the real failure in CompletionException; surface Glue errors unchanged
        if (e.getCause() instanceof GlueException glueException) {
            throw glueException;
        }
        // Any other cause is reported as a metastore error, keeping the cause attached
        throw new TrinoException(HIVE_METASTORE_ERROR, e.getCause());
    }
}
}
Loading