Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static io.trino.plugin.hive.aws.AwsCurrentRegionHolder.getCurrentRegionFromEC2Metadata;
import static io.trino.plugin.hive.metastore.MetastoreUtil.makePartitionName;
import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName;
import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults;
import static io.trino.plugin.hive.metastore.glue.converter.GlueInputConverter.convertPartition;
Expand All @@ -143,7 +144,6 @@
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.security.PrincipalType.USER;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.function.Predicate.not;
import static java.util.function.UnaryOperator.identity;
Expand All @@ -163,8 +163,7 @@ public class GlueHiveMetastore
private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final int BATCH_UPDATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000;
private static final Comparator<Partition> PARTITION_COMPARATOR =
comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));
private static final Comparator<Iterable<String>> PARTITION_VALUE_COMPARATOR = lexicographical(String.CASE_INSENSITIVE_ORDER);

private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
Expand Down Expand Up @@ -798,29 +797,28 @@ public Optional<List<String>> getPartitionNamesByFilter(
if (partitionKeysFilter.isNone()) {
return Optional.of(ImmutableList.of());
}
Table table = getExistingTable(databaseName, tableName);
String expression = GlueExpressionUtil.buildGlueExpression(columnNames, partitionKeysFilter, assumeCanonicalPartitionKeys);
List<Partition> partitions = getPartitions(table, expression);
return Optional.of(buildPartitionNames(table.getPartitionColumns(), partitions));
List<List<String>> partitionValues = getPartitionValues(databaseName, tableName, expression);
return Optional.of(buildPartitionNames(columnNames, partitionValues));
}

private List<Partition> getPartitions(Table table, String expression)
private List<List<String>> getPartitionValues(String databaseName, String tableName, String expression)
{
if (partitionSegments == 1) {
return getPartitions(table, expression, null);
return getPartitionValues(databaseName, tableName, expression, null);
}

// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(partitionsReadExecutor);
CompletionService<List<List<String>>> completionService = new ExecutorCompletionService<>(partitionsReadExecutor);
for (int i = 0; i < partitionSegments; i++) {
Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
completionService.submit(() -> getPartitions(table, expression, segment));
completionService.submit(() -> getPartitionValues(databaseName, tableName, expression, segment));
}

List<Partition> partitions = new ArrayList<>();
List<List<String>> partitions = new ArrayList<>();
try {
for (int i = 0; i < partitionSegments; i++) {
Future<List<Partition>> futurePartitions = completionService.take();
Future<List<List<String>>> futurePartitions = completionService.take();
partitions.addAll(futurePartitions.get());
}
}
Expand All @@ -831,40 +829,41 @@ private List<Partition> getPartitions(Table table, String expression)
throw new TrinoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
}

partitions.sort(PARTITION_COMPARATOR);
partitions.sort(PARTITION_VALUE_COMPARATOR);
return partitions;
}

private List<Partition> getPartitions(Table table, String expression, @Nullable Segment segment)
private List<List<String>> getPartitionValues(String databaseName, String tableName, String expression, @Nullable Segment segment)
{
try {
// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);
List<Partition> partitions = getPaginatedResults(
return getPaginatedResults(
glueClient::getPartitions,
new GetPartitionsRequest()
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withDatabaseName(databaseName)
.withTableName(tableName)
.withExpression(expression)
.withSegment(segment)
// We are interested in the partition values and excluding column schema
// avoids the problem of a large response.
.withExcludeColumnSchema(true)
.withMaxResults(AWS_GLUE_GET_PARTITIONS_MAX_RESULTS),
GetPartitionsRequest::setNextToken,
GetPartitionsResult::getNextToken,
stats.getGetPartitions())
.map(GetPartitionsResult::getPartitions)
.flatMap(List::stream)
.map(converter)
.map(com.amazonaws.services.glue.model.Partition::getValues)
.collect(toImmutableList());
return partitions;
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
}

private static List<String> buildPartitionNames(List<Column> partitionColumns, List<Partition> partitions)
private static List<String> buildPartitionNames(List<String> partitionColumns, List<List<String>> partitions)
{
return mappedCopy(partitions, partition -> makePartitionName(partitionColumns, partition.getValues()));
return mappedCopy(partitions, partition -> toPartitionName(partitionColumns, partition));
}

/**
Expand Down