Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public void testBucketedTableValidation()
// Alluxio metastore does not support create operations
}

// Intentionally empty override: replaces the inherited test with a no-op for this
// metastore, for the reason given in the body comment.
@Override
public void testBucketedTableEvolutionWithDifferentReadBucketCount()
{
// Alluxio metastore does not support create operations
}

@Override
public void testEmptyOrcFile()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,14 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
if (partition.getPartition().isPresent()) {
Optional<HiveBucketProperty> partitionBucketProperty = partition.getPartition().get().getStorage().getBucketProperty();
if (tableBucketInfo.isPresent() && partitionBucketProperty.isPresent()) {
int readBucketCount = tableBucketInfo.get().getReadBucketCount();
int tableBucketCount = tableBucketInfo.get().getTableBucketCount();
BucketingVersion bucketingVersion = partitionBucketProperty.get().getBucketingVersion(); // TODO can partition's bucketing_version be different from table's?
int partitionBucketCount = partitionBucketProperty.get().getBucketCount();
// Validation was done in HiveSplitManager#getPartitionMetadata.
// Here, it's just checking whether it needs the BucketConversion.
if (readBucketCount != partitionBucketCount) {
bucketConversion = Optional.of(new BucketConversion(bucketingVersion, readBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns()));
if (readBucketCount > partitionBucketCount) {
if (tableBucketCount != partitionBucketCount) {
bucketConversion = Optional.of(new BucketConversion(bucketingVersion, tableBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns()));
if (tableBucketCount > partitionBucketCount) {
bucketConversionRequiresWorkerParticipation = true;
}
}
Expand Down Expand Up @@ -721,7 +721,7 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
transaction,
maxSplitFileSize);
return Optional.of(locatedFileStatuses.stream()
.map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), splittable, Optional.empty()))
.map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), OptionalInt.empty(), splittable, Optional.empty()))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator());
Expand All @@ -743,6 +743,7 @@ private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
return splitFactory.createInternalHiveSplit(
(LocatedFileStatus) fileStatus,
OptionalInt.empty(),
OptionalInt.empty(),
splittable,
acidInfo);
})
Expand Down Expand Up @@ -778,7 +779,7 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
{
return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions))
.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), splittable, acidInfo))
.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), OptionalInt.empty(), splittable, acidInfo))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
Expand All @@ -797,6 +798,8 @@ private List<InternalHiveSplit> getBucketedSplits(
int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
int bucketCount = max(readBucketCount, partitionBucketCount);

checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount);

// build mapping of file name to bucket
ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
for (LocatedFileStatus file : files) {
Expand Down Expand Up @@ -840,19 +843,19 @@ private List<InternalHiveSplit> getBucketedSplits(
// Logical bucket #. Each logical bucket corresponds to a "bucket" from engine's perspective.
int readBucketNumber = bucketNumber % readBucketCount;

boolean containsEligibleTableBucket = false;
boolean containsIneligibleTableBucket = false;
List<Integer> eligibleTableBucketNumbers = new ArrayList<>();
for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) {
// table bucket number: this is used for evaluating "$bucket" filters.
if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) {
containsEligibleTableBucket = true;
eligibleTableBucketNumbers.add(tableBucketNumber);
}
else {
containsIneligibleTableBucket = true;
}
}

if (containsEligibleTableBucket && containsIneligibleTableBucket) {
if (!eligibleTableBucketNumbers.isEmpty() && containsIneligibleTableBucket) {
throw new TrinoException(
NOT_SUPPORTED,
"The bucket filter cannot be satisfied. There are restrictions on the bucket filter when all the following is true: " +
Expand All @@ -862,12 +865,13 @@ private List<InternalHiveSplit> getBucketedSplits(
"(table name: " + table.getTableName() + ", table bucket count: " + tableBucketCount + ", " +
"partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")");
}
if (containsEligibleTableBucket) {
if (!eligibleTableBucketNumbers.isEmpty()) {
for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) {
// OrcDeletedRows will load only delete delta files matching current bucket id,
// so we can pass all delete delta locations here, without filtering.
splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), splittable, acidInfo)
.ifPresent(splitList::add);
eligibleTableBucketNumbers.stream()
.map(tableBucketNumber -> splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), OptionalInt.of(tableBucketNumber), splittable, acidInfo))
.forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add));
}
}
}
Expand Down Expand Up @@ -991,11 +995,6 @@ public static Optional<BucketSplitInfo> createBucketSplitInfo(Optional<HiveBucke
int tableBucketCount = bucketHandle.get().getTableBucketCount();
int readBucketCount = bucketHandle.get().getReadBucketCount();

if (tableBucketCount != readBucketCount && bucketFilter.isPresent()) {
// TODO: remove when supported
throw new TrinoException(NOT_SUPPORTED, "Filter on \"$bucket\" is not supported when the table has partitions with different bucket counts");
}

List<HiveColumnHandle> bucketColumns = bucketHandle.get().getColumns();
IntPredicate predicate = bucketFilter
.<IntPredicate>map(filter -> filter.getBucketsToKeep()::contains)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle)
{
return value -> ((HiveSplit) value).getBucketNumber()
return value -> ((HiveSplit) value).getReadBucketNumber()
.orElseThrow(() -> new IllegalArgumentException("Bucket number not set in split"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
path,
hiveSplit.getBucketNumber(),
hiveSplit.getTableBucketNumber(),
hiveSplit.getEstimatedFileSize(),
hiveSplit.getFileModifiedTime());

Expand All @@ -198,7 +198,7 @@ public ConnectorPageSource createPageSource(
configuration,
session,
path,
hiveSplit.getBucketNumber(),
hiveSplit.getTableBucketNumber(),
hiveSplit.getStart(),
hiveSplit.getLength(),
hiveSplit.getEstimatedFileSize(),
Expand Down Expand Up @@ -235,7 +235,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getStatementId(),
source,
typeManager,
hiveSplit.getBucketNumber(),
hiveSplit.getTableBucketNumber(),
path,
originalFile,
orcFileWriterFactory.get(),
Expand All @@ -259,7 +259,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Configuration configuration,
ConnectorSession session,
Path path,
OptionalInt bucketNumber,
OptionalInt tableBucketNumber,
long start,
long length,
long estimatedFileSize,
Expand All @@ -281,8 +281,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(

List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);

Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, bucketNumber, regularAndInterimColumnMappings);
Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, bucketNumber, regularAndInterimColumnMappings);
Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, tableBucketNumber, regularAndInterimColumnMappings);
Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, tableBucketNumber, regularAndInterimColumnMappings);

for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager);
Expand All @@ -298,7 +298,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
desiredColumns,
effectivePredicate,
acidInfo,
bucketNumber,
tableBucketNumber,
originalFile,
transaction);

Expand Down Expand Up @@ -386,11 +386,11 @@ public static Optional<ConnectorPageSource> createHivePageSource(

private static boolean shouldSkipBucket(HiveTableHandle hiveTable, HiveSplit hiveSplit, DynamicFilter dynamicFilter)
{
if (hiveSplit.getBucketNumber().isEmpty()) {
if (hiveSplit.getTableBucketNumber().isEmpty()) {
return false;
}
Optional<HiveBucketFilter> hiveBucketFilter = getHiveBucketFilter(hiveTable, dynamicFilter.getCurrentPredicate());
return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getBucketNumber().getAsInt())).orElse(false);
return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getTableBucketNumber().getAsInt())).orElse(false);
}

private static boolean shouldSkipSplit(List<ColumnMapping> columnMappings, DynamicFilter dynamicFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class HiveSplit
private final String database;
private final String table;
private final String partitionName;
private final OptionalInt bucketNumber;
private final OptionalInt readBucketNumber;
private final OptionalInt tableBucketNumber;
private final int statementId;
private final boolean forceLocalScheduling;
private final TableToPartitionMapping tableToPartitionMapping;
Expand All @@ -76,7 +77,8 @@ public HiveSplit(
@JsonProperty("schema") Properties schema,
@JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("readBucketNumber") OptionalInt readBucketNumber,
@JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber,
@JsonProperty("statementId") int statementId,
@JsonProperty("forceLocalScheduling") boolean forceLocalScheduling,
@JsonProperty("tableToPartitionMapping") TableToPartitionMapping tableToPartitionMapping,
Expand All @@ -97,7 +99,8 @@ public HiveSplit(
requireNonNull(schema, "schema is null");
requireNonNull(partitionKeys, "partitionKeys is null");
requireNonNull(addresses, "addresses is null");
requireNonNull(bucketNumber, "bucketNumber is null");
requireNonNull(readBucketNumber, "readBucketNumber is null");
requireNonNull(tableBucketNumber, "tableBucketNumber is null");
requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null");
requireNonNull(bucketConversion, "bucketConversion is null");
requireNonNull(bucketValidation, "bucketValidation is null");
Expand All @@ -114,7 +117,8 @@ public HiveSplit(
this.schema = schema;
this.partitionKeys = ImmutableList.copyOf(partitionKeys);
this.addresses = ImmutableList.copyOf(addresses);
this.bucketNumber = bucketNumber;
this.readBucketNumber = readBucketNumber;
this.tableBucketNumber = tableBucketNumber;
this.statementId = statementId;
this.forceLocalScheduling = forceLocalScheduling;
this.tableToPartitionMapping = tableToPartitionMapping;
Expand Down Expand Up @@ -194,9 +198,15 @@ public List<HostAddress> getAddresses()
}

@JsonProperty
public OptionalInt getBucketNumber()
public OptionalInt getReadBucketNumber()
{
return bucketNumber;
return readBucketNumber;
}

/**
 * Returns the table bucket number carried by this split.
 * Exposed as a JSON property alongside the separate read bucket number.
 *
 * @return the {@code tableBucketNumber} value passed to the constructor (never null)
 */
@JsonProperty
public OptionalInt getTableBucketNumber()
{
return tableBucketNumber;
}

@JsonProperty
Expand Down Expand Up @@ -271,7 +281,8 @@ public long getRetainedSizeInBytes()
+ estimatedSizeOf(database)
+ estimatedSizeOf(table)
+ estimatedSizeOf(partitionName)
+ sizeOf(bucketNumber)
+ sizeOf(readBucketNumber)
+ sizeOf(tableBucketNumber)
+ tableToPartitionMapping.getEstimatedSizeInBytes()
+ sizeOf(bucketConversion, BucketConversion::getRetainedSizeInBytes)
+ sizeOf(bucketValidation, BucketValidation::getRetainedSizeInBytes)
Expand Down Expand Up @@ -317,7 +328,7 @@ public static class BucketConversion
private final int tableBucketCount;
private final int partitionBucketCount;
private final List<HiveColumnHandle> bucketColumnNames;
// bucketNumber is needed, but can be found in bucketNumber field of HiveSplit.
// tableBucketNumber is needed, but can be found in tableBucketNumber field of HiveSplit.

@JsonCreator
public BucketConversion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ public ConnectorSplitSource getSplits(
// sort partitions
partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);

if (bucketHandle.isPresent()) {
if (bucketHandle.get().getReadBucketCount() > bucketHandle.get().getTableBucketCount()) {
throw new TrinoException(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please open a follow-up PR dropping HivePartitioningHandle#maxCompatibleBucketCount? Currently it is effectively unused, as a number of read buckets higher than the number of table buckets is no longer supported.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

GENERIC_INTERNAL_ERROR,
// The message contains two %s placeholders, so it must be formatted with the
// actual counts; passing the raw template would surface literal "%s" to the user.
String.format(
        "readBucketCount (%s) is greater than the tableBucketCount (%s) which generally points to an issue in plan generation",
        bucketHandle.get().getReadBucketCount(),
        bucketHandle.get().getTableBucketCount()));
}
}

Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(session, metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toTableBucketProperty));

// Only one thread per partition is usable when a table is not transactional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ ListenableFuture<Void> addToQueue(InternalHiveSplit split)
databaseName, tableName, succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount()));
}
bufferedInternalSplitCount.incrementAndGet();
OptionalInt bucketNumber = split.getBucketNumber();
OptionalInt bucketNumber = split.getReadBucketNumber();
return queues.offer(bucketNumber, split);
}

Expand Down Expand Up @@ -400,7 +400,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getSchema(),
internalSplit.getPartitionKeys(),
block.getAddresses(),
internalSplit.getBucketNumber(),
internalSplit.getReadBucketNumber(),
internalSplit.getTableBucketNumber(),
internalSplit.getStatementId(),
internalSplit.isForceLocalScheduling(),
internalSplit.getTableToPartitionMapping(),
Expand Down
Loading