Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static <Request, Result> Stream<Result> getPaginatedResults(
Function<Request, Result> submission,
Request request,
BiConsumer<Request, String> setNextToken,
Function<Result, String> extractNextToken)
Function<Result, String> extractNextToken,
GlueMetastoreApiStats stats)
{
requireNonNull(submission, "submission is null");
requireNonNull(request, "request is null");
Expand All @@ -57,7 +58,7 @@ protected Result computeNext()
}

setNextToken.accept(request, nextToken);
Result result = submission.apply(request);
Result result = stats.call(() -> submission.apply(request));
firstRequest = false;
nextToken = extractNextToken.apply(result);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package io.trino.plugin.hive.metastore.glue;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.glue.AWSGlueAsync;
Expand Down Expand Up @@ -291,18 +293,17 @@ public Optional<Database> getDatabase(String databaseName)
public List<String> getAllDatabases()
{
try {
return stats.getGetAllDatabases().call(() -> {
List<String> databaseNames = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest().withCatalogId(catalogId),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken)
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.map(com.amazonaws.services.glue.model.Database::getName)
.collect(toImmutableList());
return databaseNames;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you retained the variable, the diff would be readable.

(BTW, the IDE asks me to inline such variables, but I actually find them useful during debugging.)

});
List<String> databaseNames = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest().withCatalogId(catalogId),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken,
stats.getGetDatabases())
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.map(com.amazonaws.services.glue.model.Database::getName)
.collect(toImmutableList());
return databaseNames;
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand Down Expand Up @@ -370,10 +371,10 @@ public void updateTableStatistics(String databaseName, String tableName, AcidTra
final Map<String, String> statisticsParameters = updateStatisticsParameters(table.getParameters(), updatedStatistics.getBasicStatistics());
tableInput.setParameters(statisticsParameters);
table = Table.builder(table).setParameters(statisticsParameters).build();
glueClient.updateTable(new UpdateTableRequest()
stats.getUpdateTable().call(() -> glueClient.updateTable(new UpdateTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableInput(tableInput));
.withTableInput(tableInput)));
columnStatisticsProvider.updateTableColumnStatistics(table, updatedStatistics.getColumnStatistics());
}
catch (EntityNotFoundException e) {
Expand Down Expand Up @@ -426,11 +427,13 @@ private void updatePartitionStatisticsBatch(Table table, Map<String, Function<Pa
List<Future<BatchUpdatePartitionResult>> partitionUpdateRequestsFutures = new ArrayList<>();
partitionUpdateRequestsPartitioned.forEach(partitionUpdateRequestsPartition -> {
// Update basic statistics
long startTimestamp = System.currentTimeMillis();
partitionUpdateRequestsFutures.add(glueClient.batchUpdatePartitionAsync(new BatchUpdatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withEntries(partitionUpdateRequestsPartition)));
.withCatalogId(catalogId)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are more *Async calls than just this one. We should address them all together.

.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withEntries(partitionUpdateRequestsPartition),
new StatsRecordingAsyncHandler(stats.getBatchUpdatePartition(), startTimestamp)));
});

try {
Expand All @@ -448,21 +451,20 @@ private void updatePartitionStatisticsBatch(Table table, Map<String, Function<Pa
public List<String> getAllTables(String databaseName)
{
try {
return stats.getGetAllTables().call(() -> {
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken)
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(tableFilter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return tableNames;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: I cannot tell what changed.

});
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(tableFilter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return tableNames;
}
catch (EntityNotFoundException e) {
// database does not exist
Expand All @@ -484,21 +486,20 @@ public synchronized List<String> getTablesWithParameter(String databaseName, Str
public List<String> getAllViews(String databaseName)
{
try {
return stats.getGetAllViews().call(() -> {
List<String> views = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken)
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(table -> VIRTUAL_VIEW.name().equals(table.getTableType()))
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return views;
});
List<String> views = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(table -> VIRTUAL_VIEW.name().equals(table.getTableType()))
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return views;
}
catch (EntityNotFoundException e) {
// database does not exist
Expand Down Expand Up @@ -548,7 +549,7 @@ public void dropDatabase(String databaseName, boolean deleteData)
}

try {
stats.getDropDatabase().call(() ->
stats.getDeleteDatabase().call(() ->
glueClient.deleteDatabase(new DeleteDatabaseRequest().withCatalogId(catalogId).withName(databaseName)));
}
catch (EntityNotFoundException e) {
Expand All @@ -569,7 +570,7 @@ public void renameDatabase(String databaseName, String newDatabaseName)
try {
Database database = getDatabase(databaseName).orElseThrow(() -> new SchemaNotFoundException(databaseName));
DatabaseInput renamedDatabase = GlueInputConverter.convertDatabase(database).withName(newDatabaseName);
stats.getRenameDatabase().call(() ->
stats.getUpdateDatabase().call(() ->
glueClient.updateDatabase(new UpdateDatabaseRequest()
.withCatalogId(catalogId)
.withName(databaseName)
Expand Down Expand Up @@ -614,7 +615,7 @@ public void dropTable(String databaseName, String tableName, boolean deleteData)
Table table = getExistingTable(databaseName, tableName);

try {
stats.getDropTable().call(() ->
stats.getDeleteTable().call(() ->
glueClient.deleteTable(new DeleteTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
Expand Down Expand Up @@ -651,7 +652,7 @@ public void replaceTable(String databaseName, String tableName, Table newTable,
{
try {
TableInput newTableInput = GlueInputConverter.convertTable(newTable);
stats.getReplaceTable().call(() ->
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
Expand Down Expand Up @@ -690,7 +691,7 @@ public void setTableOwner(String databaseName, String tableName, HivePrincipal p
TableInput newTableInput = GlueInputConverter.convertTable(table);
newTableInput.setOwner(principal.getName());

stats.getReplaceTable().call(() ->
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
Expand Down Expand Up @@ -836,27 +837,25 @@ private List<Partition> getPartitions(Table table, String expression)
private List<Partition> getPartitions(Table table, String expression, @Nullable Segment segment)
{
try {
return stats.getGetPartitions().call(() -> {
// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);

List<Partition> partitions = getPaginatedResults(
glueClient::getPartitions,
new GetPartitionsRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withExpression(expression)
.withSegment(segment)
.withMaxResults(AWS_GLUE_GET_PARTITIONS_MAX_RESULTS),
GetPartitionsRequest::setNextToken,
GetPartitionsResult::getNextToken)
.map(GetPartitionsResult::getPartitions)
.flatMap(List::stream)
.map(converter)
.collect(toImmutableList());
return partitions;
});
// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);
List<Partition> partitions = getPaginatedResults(
glueClient::getPartitions,
new GetPartitionsRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withExpression(expression)
.withSegment(segment)
.withMaxResults(AWS_GLUE_GET_PARTITIONS_MAX_RESULTS),
GetPartitionsRequest::setNextToken,
GetPartitionsResult::getNextToken,
stats.getGetPartitions())
.map(GetPartitionsResult::getPartitions)
.flatMap(List::stream)
.map(converter)
.collect(toImmutableList());
return partitions;
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand Down Expand Up @@ -920,11 +919,13 @@ private List<Partition> batchGetPartition(Table table, List<String> partitionNam
while (!pendingPartitions.isEmpty()) {
List<Future<BatchGetPartitionResult>> batchGetPartitionFutures = new ArrayList<>();
for (List<PartitionValueList> partitions : Lists.partition(pendingPartitions, BATCH_GET_PARTITION_MAX_PAGE_SIZE)) {
long startTimestamp = System.currentTimeMillis();
batchGetPartitionFutures.add(glueClient.batchGetPartitionAsync(new BatchGetPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionsToGet(partitions)));
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionsToGet(partitions),
new StatsRecordingAsyncHandler(stats.getGetPartitions(), startTimestamp)));
}
pendingPartitions.clear();

Expand Down Expand Up @@ -960,16 +961,19 @@ private List<Partition> batchGetPartition(Table table, List<String> partitionNam
public void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions)
{
try {
stats.getAddPartitions().call(() -> {
stats.getCreatePartitions().call(() -> {
List<Future<BatchCreatePartitionResult>> futures = new ArrayList<>();

for (List<PartitionWithStatistics> partitionBatch : Lists.partition(partitions, BATCH_CREATE_PARTITION_MAX_PAGE_SIZE)) {
List<PartitionInput> partitionInputs = mappedCopy(partitionBatch, partition -> GlueInputConverter.convertPartition(partition));
futures.add(glueClient.batchCreatePartitionAsync(new BatchCreatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableName(tableName)
.withPartitionInputList(partitionInputs)));
long startTime = System.currentTimeMillis();
futures.add(glueClient.batchCreatePartitionAsync(
new BatchCreatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableName(tableName)
.withPartitionInputList(partitionInputs),
new StatsRecordingAsyncHandler(stats.getBatchCreatePartition(), startTime)));
}

for (Future<BatchCreatePartitionResult> future : futures) {
Expand Down Expand Up @@ -1023,7 +1027,7 @@ public void dropPartition(String databaseName, String tableName, List<String> pa
.orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), parts));

try {
stats.getDropPartition().call(() ->
stats.getDeletePartition().call(() ->
glueClient.deletePartition(new DeletePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
Expand All @@ -1045,7 +1049,7 @@ public void alterPartition(String databaseName, String tableName, PartitionWithS
{
try {
PartitionInput newPartition = convertPartition(partition);
stats.getAlterPartition().call(() ->
stats.getUpdatePartition().call(() ->
glueClient.updatePartition(new UpdatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
Expand Down Expand Up @@ -1126,4 +1130,29 @@ public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String ta
{
return ImmutableSet.of();
}

static class StatsRecordingAsyncHandler<Request extends AmazonWebServiceRequest, Result>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this belongs in GlueMetastoreApiStats

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change it as a followup :)

implements AsyncHandler<Request, Result>
{
private final GlueMetastoreApiStats stats;
private final long startTimeInMillis;

/**
 * Creates a handler that reports the outcome of a single asynchronous call to {@code stats}.
 *
 * @param stats statistics object that receives the recorded call; must not be null
 * @param startTimeInMillis start time of the call in milliseconds; the completion callbacks
 *        subtract it from {@link System#currentTimeMillis()} to obtain the elapsed time
 * @throws NullPointerException if {@code stats} is null
 */
public StatsRecordingAsyncHandler(GlueMetastoreApiStats stats, long startTimeInMillis)
{
    this.startTimeInMillis = startTimeInMillis;
    this.stats = requireNonNull(stats, "stats is null");
}

/**
 * Records the call as failed, together with the time elapsed since the start timestamp.
 *
 * @param e the error reported for the asynchronous call; it is not inspected here
 */
@Override
public void onError(Exception e)
{
    long elapsedMillis = System.currentTimeMillis() - startTimeInMillis;
    stats.recordCall(elapsedMillis, true);
}

/**
 * Records the call as successful, together with the time elapsed since the start timestamp.
 *
 * <p>The parameters use the enclosing class's {@code Request} and {@code Result} type
 * parameters so that this method matches {@code AsyncHandler<Request, Result>#onSuccess}
 * exactly instead of overriding it only through its erased signature. The erasure is
 * unchanged, so existing callers are unaffected.
 *
 * @param request the request that completed; it is not inspected here
 * @param result the result of the request; it is not inspected here
 */
@Override
public void onSuccess(Request request, Result result)
{
    stats.recordCall(System.currentTimeMillis() - startTimeInMillis, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ public CounterStat getTotalFailures()
return totalFailures;
}

/**
 * Records one call whose duration is supplied by the caller.
 *
 * @param executionTimeInMillis duration of the call; added to the time statistic in milliseconds
 * @param failure whether the call failed; when {@code true} the failure counter is increased by one
 */
public void recordCall(long executionTimeInMillis, boolean failure)
{
    time.add(executionTimeInMillis, MILLISECONDS);
    if (!failure) {
        return;
    }
    totalFailures.update(1);
}

public interface ThrowingCallable<V, E extends Exception>
{
V call()
Expand Down
Loading