From 9adf088159ff048e3fe17ab0e1169e15bc77fcfe Mon Sep 17 00:00:00 2001 From: Chuho Chang Date: Fri, 8 Apr 2022 14:37:48 -0700 Subject: [PATCH] Fix incorrect $bucket for mismatch bucket queries --- .../plugin/hive/TestHiveAlluxioMetastore.java | 6 ++ .../hive/BackgroundHiveSplitLoader.java | 33 ++++---- .../hive/HiveNodePartitioningProvider.java | 2 +- .../plugin/hive/HivePageSourceProvider.java | 18 ++--- .../java/io/trino/plugin/hive/HiveSplit.java | 27 +++++-- .../trino/plugin/hive/HiveSplitManager.java | 8 ++ .../io/trino/plugin/hive/HiveSplitSource.java | 5 +- .../trino/plugin/hive/InternalHiveSplit.java | 21 +++-- .../hive/util/InternalHiveSplitFactory.java | 14 ++-- .../trino/plugin/hive/AbstractTestHive.java | 77 +++++++++++++++++++ .../plugin/hive/BaseHiveConnectorTest.java | 40 ++++++++++ .../plugin/hive/TestHiveFileMetastore.java | 6 ++ .../trino/plugin/hive/TestHivePageSink.java | 1 + .../io/trino/plugin/hive/TestHiveSplit.java | 1 + .../plugin/hive/TestHiveSplitSource.java | 1 + .../TestNodeLocalDynamicSplitPruning.java | 1 + .../hive/benchmark/AbstractFileFormat.java | 1 + 17 files changed, 214 insertions(+), 48 deletions(-) diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java index 9a0552778002..f796a47a4b26 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java @@ -95,6 +95,12 @@ public void testBucketedTableValidation() // Alluxio metastore does not support create operations } + @Override + public void testBucketedTableEvolutionWithDifferentReadBucketCount() + { + // Alluxio metastore does not support create operations + } + @Override public void testEmptyOrcFile() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index b98b4510333d..f07b750fc91e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -438,14 +438,14 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) if (partition.getPartition().isPresent()) { Optional partitionBucketProperty = partition.getPartition().get().getStorage().getBucketProperty(); if (tableBucketInfo.isPresent() && partitionBucketProperty.isPresent()) { - int readBucketCount = tableBucketInfo.get().getReadBucketCount(); + int tableBucketCount = tableBucketInfo.get().getTableBucketCount(); BucketingVersion bucketingVersion = partitionBucketProperty.get().getBucketingVersion(); // TODO can partition's bucketing_version be different from table's? int partitionBucketCount = partitionBucketProperty.get().getBucketCount(); // Validation was done in HiveSplitManager#getPartitionMetadata. // Here, it's just trying to see if its needs the BucketConversion. 
- if (readBucketCount != partitionBucketCount) { - bucketConversion = Optional.of(new BucketConversion(bucketingVersion, readBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns())); - if (readBucketCount > partitionBucketCount) { + if (tableBucketCount != partitionBucketCount) { + bucketConversion = Optional.of(new BucketConversion(bucketingVersion, tableBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns())); + if (tableBucketCount > partitionBucketCount) { bucketConversionRequiresWorkerParticipation = true; } } @@ -721,7 +721,7 @@ Optional> buildManifestFileIterator( transaction, maxSplitFileSize); return Optional.of(locatedFileStatuses.stream() - .map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), splittable, Optional.empty())) + .map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), OptionalInt.empty(), splittable, Optional.empty())) .filter(Optional::isPresent) .map(Optional::get) .iterator()); @@ -743,6 +743,7 @@ private Iterator generateOriginalFilesSplits( return splitFactory.createInternalHiveSplit( (LocatedFileStatus) fileStatus, OptionalInt.empty(), + OptionalInt.empty(), splittable, acidInfo); }) @@ -778,7 +779,7 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat inpu private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional acidInfo) { return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? 
RECURSE : IGNORED, ignoreAbsentPartitions)) - .map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), splittable, acidInfo)) + .map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), OptionalInt.empty(), splittable, acidInfo)) .filter(Optional::isPresent) .map(Optional::get) .iterator(); @@ -797,6 +798,8 @@ private List getBucketedSplits( int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount); int bucketCount = max(readBucketCount, partitionBucketCount); + checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount); + // build mapping of file name to bucket ListMultimap bucketFiles = ArrayListMultimap.create(); for (LocatedFileStatus file : files) { @@ -840,19 +843,19 @@ private List getBucketedSplits( // Logical bucket #. Each logical bucket corresponds to a "bucket" from engine's perspective. int readBucketNumber = bucketNumber % readBucketCount; - boolean containsEligibleTableBucket = false; boolean containsIneligibleTableBucket = false; + List eligibleTableBucketNumbers = new ArrayList<>(); for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) { // table bucket number: this is used for evaluating "$bucket" filters. if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) { - containsEligibleTableBucket = true; + eligibleTableBucketNumbers.add(tableBucketNumber); } else { containsIneligibleTableBucket = true; } } - if (containsEligibleTableBucket && containsIneligibleTableBucket) { + if (!eligibleTableBucketNumbers.isEmpty() && containsIneligibleTableBucket) { throw new TrinoException( NOT_SUPPORTED, "The bucket filter cannot be satisfied. 
There are restrictions on the bucket filter when all the following is true: " + @@ -862,12 +865,13 @@ private List getBucketedSplits( "(table name: " + table.getTableName() + ", table bucket count: " + tableBucketCount + ", " + "partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")"); } - if (containsEligibleTableBucket) { + if (!eligibleTableBucketNumbers.isEmpty()) { for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) { // OrcDeletedRows will load only delete delta files matching current bucket id, // so we can pass all delete delta locations here, without filtering. - splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), splittable, acidInfo) - .ifPresent(splitList::add); + eligibleTableBucketNumbers.stream() + .map(tableBucketNumber -> splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), OptionalInt.of(tableBucketNumber), splittable, acidInfo)) + .forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add)); } } } @@ -991,11 +995,6 @@ public static Optional createBucketSplitInfo(Optional bucketColumns = bucketHandle.get().getColumns(); IntPredicate predicate = bucketFilter .map(filter -> filter.getBucketsToKeep()::contains) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveNodePartitioningProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveNodePartitioningProvider.java index db23026d5d29..52ad45a69093 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveNodePartitioningProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveNodePartitioningProvider.java @@ -122,7 +122,7 @@ public ToIntFunction getSplitBucketFunction( ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) { - return value -> ((HiveSplit) value).getBucketNumber() + return value -> ((HiveSplit) value).getReadBucketNumber() .orElseThrow(() -> new 
IllegalArgumentException("Bucket number not set in split")); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java index ef9a3bdba25f..97626ecd3521 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java @@ -177,7 +177,7 @@ public ConnectorPageSource createPageSource( hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()), hiveSplit.getTableToPartitionMapping(), path, - hiveSplit.getBucketNumber(), + hiveSplit.getTableBucketNumber(), hiveSplit.getEstimatedFileSize(), hiveSplit.getFileModifiedTime()); @@ -198,7 +198,7 @@ public ConnectorPageSource createPageSource( configuration, session, path, - hiveSplit.getBucketNumber(), + hiveSplit.getTableBucketNumber(), hiveSplit.getStart(), hiveSplit.getLength(), hiveSplit.getEstimatedFileSize(), @@ -235,7 +235,7 @@ public ConnectorPageSource createPageSource( hiveSplit.getStatementId(), source, typeManager, - hiveSplit.getBucketNumber(), + hiveSplit.getTableBucketNumber(), path, originalFile, orcFileWriterFactory.get(), @@ -259,7 +259,7 @@ public static Optional createHivePageSource( Configuration configuration, ConnectorSession session, Path path, - OptionalInt bucketNumber, + OptionalInt tableBucketNumber, long start, long length, long estimatedFileSize, @@ -281,8 +281,8 @@ public static Optional createHivePageSource( List regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings); - Optional bucketAdaptation = createBucketAdaptation(bucketConversion, bucketNumber, regularAndInterimColumnMappings); - Optional bucketValidator = createBucketValidator(path, bucketValidation, bucketNumber, regularAndInterimColumnMappings); + Optional bucketAdaptation = 
createBucketAdaptation(bucketConversion, tableBucketNumber, regularAndInterimColumnMappings); + Optional bucketValidator = createBucketValidator(path, bucketValidation, tableBucketNumber, regularAndInterimColumnMappings); for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) { List desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager); @@ -298,7 +298,7 @@ public static Optional createHivePageSource( desiredColumns, effectivePredicate, acidInfo, - bucketNumber, + tableBucketNumber, originalFile, transaction); @@ -386,11 +386,11 @@ public static Optional createHivePageSource( private static boolean shouldSkipBucket(HiveTableHandle hiveTable, HiveSplit hiveSplit, DynamicFilter dynamicFilter) { - if (hiveSplit.getBucketNumber().isEmpty()) { + if (hiveSplit.getTableBucketNumber().isEmpty()) { return false; } Optional hiveBucketFilter = getHiveBucketFilter(hiveTable, dynamicFilter.getCurrentPredicate()); - return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getBucketNumber().getAsInt())).orElse(false); + return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getTableBucketNumber().getAsInt())).orElse(false); } private static boolean shouldSkipSplit(List columnMappings, DynamicFilter dynamicFilter) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java index a0d02cdd4f1a..fa9797f85e0e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java @@ -52,7 +52,8 @@ public class HiveSplit private final String database; private final String table; private final String partitionName; - private final OptionalInt bucketNumber; + private final OptionalInt readBucketNumber; + private final OptionalInt tableBucketNumber; private final int statementId; private final boolean 
forceLocalScheduling; private final TableToPartitionMapping tableToPartitionMapping; @@ -76,7 +77,8 @@ public HiveSplit( @JsonProperty("schema") Properties schema, @JsonProperty("partitionKeys") List partitionKeys, @JsonProperty("addresses") List addresses, - @JsonProperty("bucketNumber") OptionalInt bucketNumber, + @JsonProperty("readBucketNumber") OptionalInt readBucketNumber, + @JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber, @JsonProperty("statementId") int statementId, @JsonProperty("forceLocalScheduling") boolean forceLocalScheduling, @JsonProperty("tableToPartitionMapping") TableToPartitionMapping tableToPartitionMapping, @@ -97,7 +99,8 @@ public HiveSplit( requireNonNull(schema, "schema is null"); requireNonNull(partitionKeys, "partitionKeys is null"); requireNonNull(addresses, "addresses is null"); - requireNonNull(bucketNumber, "bucketNumber is null"); + requireNonNull(readBucketNumber, "readBucketNumber is null"); + requireNonNull(tableBucketNumber, "tableBucketNumber is null"); requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null"); requireNonNull(bucketConversion, "bucketConversion is null"); requireNonNull(bucketValidation, "bucketValidation is null"); @@ -114,7 +117,8 @@ public HiveSplit( this.schema = schema; this.partitionKeys = ImmutableList.copyOf(partitionKeys); this.addresses = ImmutableList.copyOf(addresses); - this.bucketNumber = bucketNumber; + this.readBucketNumber = readBucketNumber; + this.tableBucketNumber = tableBucketNumber; this.statementId = statementId; this.forceLocalScheduling = forceLocalScheduling; this.tableToPartitionMapping = tableToPartitionMapping; @@ -194,9 +198,15 @@ public List getAddresses() } @JsonProperty - public OptionalInt getBucketNumber() + public OptionalInt getReadBucketNumber() { - return bucketNumber; + return readBucketNumber; + } + + @JsonProperty + public OptionalInt getTableBucketNumber() + { + return tableBucketNumber; } @JsonProperty @@ -271,7 +281,8 @@ public long 
getRetainedSizeInBytes() + estimatedSizeOf(database) + estimatedSizeOf(table) + estimatedSizeOf(partitionName) - + sizeOf(bucketNumber) + + sizeOf(readBucketNumber) + + sizeOf(tableBucketNumber) + tableToPartitionMapping.getEstimatedSizeInBytes() + sizeOf(bucketConversion, BucketConversion::getRetainedSizeInBytes) + sizeOf(bucketValidation, BucketValidation::getRetainedSizeInBytes) @@ -317,7 +328,7 @@ public static class BucketConversion private final int tableBucketCount; private final int partitionBucketCount; private final List bucketColumnNames; - // bucketNumber is needed, but can be found in bucketNumber field of HiveSplit. + // tableBucketNumber is needed, but can be found in tableBucketNumber field of HiveSplit. @JsonCreator public BucketConversion( diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index e0950b21917e..a1d9d8909c84 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -219,6 +219,14 @@ public ConnectorSplitSource getSplits( // sort partitions partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions); + if (bucketHandle.isPresent()) { + if (bucketHandle.get().getReadBucketCount() > bucketHandle.get().getTableBucketCount()) { + throw new TrinoException( + GENERIC_INTERNAL_ERROR, + String.format("readBucketCount (%s) is greater than the tableBucketCount (%s) which generally points to an issue in plan generation", bucketHandle.get().getReadBucketCount(), bucketHandle.get().getTableBucketCount())); + } + } + Iterable hivePartitions = getPartitionMetadata(session, metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toTableBucketProperty)); // Only one thread per partition is usable when a table is not transactional diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 20cf6b00b916..5ab32345f6f7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -299,7 +299,7 @@ ListenableFuture addToQueue(InternalHiveSplit split) databaseName, tableName, succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount())); } bufferedInternalSplitCount.incrementAndGet(); - OptionalInt bucketNumber = split.getBucketNumber(); + OptionalInt bucketNumber = split.getReadBucketNumber(); return queues.offer(bucketNumber, split); } @@ -400,7 +400,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { internalSplit.getSchema(), internalSplit.getPartitionKeys(), block.getAddresses(), - internalSplit.getBucketNumber(), + internalSplit.getReadBucketNumber(), + internalSplit.getTableBucketNumber(), internalSplit.getStatementId(), internalSplit.isForceLocalScheduling(), internalSplit.getTableToPartitionMapping(), diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java index c9fe51db81a0..edd47f67c40b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java @@ -53,7 +53,8 @@ public class InternalHiveSplit private final List partitionKeys; private final List blocks; private final String partitionName; - private final OptionalInt bucketNumber; + private final OptionalInt readBucketNumber; + private final OptionalInt tableBucketNumber; // This supplier returns an unused statementId, to guarantee that created split // files do not collide. Successive calls return the the next sequential integer, // starting with zero. 
@@ -81,7 +82,8 @@ public InternalHiveSplit( Properties schema, List partitionKeys, List blocks, - OptionalInt bucketNumber, + OptionalInt readBucketNumber, + OptionalInt tableBucketNumber, Supplier statementIdSupplier, boolean splittable, boolean forceLocalScheduling, @@ -100,7 +102,8 @@ public InternalHiveSplit( requireNonNull(schema, "schema is null"); requireNonNull(partitionKeys, "partitionKeys is null"); requireNonNull(blocks, "blocks is null"); - requireNonNull(bucketNumber, "bucketNumber is null"); + requireNonNull(readBucketNumber, "readBucketNumber is null"); + requireNonNull(tableBucketNumber, "tableBucketNumber is null"); requireNonNull(statementIdSupplier, "statementIdSupplier is null"); requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null"); requireNonNull(bucketConversion, "bucketConversion is null"); @@ -117,7 +120,8 @@ public InternalHiveSplit( this.schema = schema; this.partitionKeys = ImmutableList.copyOf(partitionKeys); this.blocks = ImmutableList.copyOf(blocks); - this.bucketNumber = bucketNumber; + this.readBucketNumber = readBucketNumber; + this.tableBucketNumber = tableBucketNumber; this.statementIdSupplier = statementIdSupplier; this.statementId = statementIdSupplier.get(); this.splittable = splittable; @@ -175,9 +179,14 @@ public String getPartitionName() return partitionName; } - public OptionalInt getBucketNumber() + public OptionalInt getReadBucketNumber() { - return bucketNumber; + return readBucketNumber; + } + + public OptionalInt getTableBucketNumber() + { + return tableBucketNumber; } public int getStatementId() diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java index d90a1f431b19..712c423f1ccb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java @@ -113,7 +113,7 @@ public String getPartitionName() return partitionName; } - public Optional createInternalHiveSplit(LocatedFileStatus status, OptionalInt bucketNumber, boolean splittable, Optional acidInfo) + public Optional createInternalHiveSplit(LocatedFileStatus status, OptionalInt readBucketNumber, OptionalInt tableBucketNumber, boolean splittable, Optional acidInfo) { splittable = splittable && status.getLen() > minimumTargetSplitSizeInBytes && @@ -125,7 +125,8 @@ public Optional createInternalHiveSplit(LocatedFileStatus sta status.getLen(), status.getLen(), status.getModificationTime(), - bucketNumber, + readBucketNumber, + tableBucketNumber, splittable, acidInfo); } @@ -142,6 +143,7 @@ public Optional createInternalHiveSplit(FileSplit split) file.getLen(), file.getModificationTime(), OptionalInt.empty(), + OptionalInt.empty(), false, Optional.empty()); } @@ -154,7 +156,8 @@ private Optional createInternalHiveSplit( // Estimated because, for example, encrypted S3 files may be padded, so reported size may not reflect actual size long estimatedFileSize, long fileModificationTime, - OptionalInt bucketNumber, + OptionalInt readBucketNumber, + OptionalInt tableBucketNumber, boolean splittable, Optional acidInfo) { @@ -201,7 +204,7 @@ private Optional createInternalHiveSplit( blocks = ImmutableList.of(new InternalHiveBlock(start, start + length, blocks.get(0).getAddresses())); } - int bucketNumberIndex = bucketNumber.orElse(0); + int bucketNumberIndex = readBucketNumber.orElse(0); return Optional.of(new InternalHiveSplit( partitionName, pathString, @@ -212,7 +215,8 @@ private Optional createInternalHiveSplit( schema, partitionKeys, blocks, - bucketNumber, + readBucketNumber, + tableBucketNumber, () -> bucketStatementCounters.computeIfAbsent(bucketNumberIndex, index -> new AtomicInteger()).getAndIncrement(), splittable, forceLocalScheduling && allBlocksHaveAddress(blocks), 
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 60eb2b56dc58..0a19ab709e12 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -1727,6 +1727,83 @@ public void testBucketedTableDoubleFloat() } } + @Test + public void testBucketedTableEvolutionWithDifferentReadBucketCount() + throws Exception + { + for (HiveStorageFormat storageFormat : createTableFormats) { + SchemaTableName temporaryBucketEvolutionTable = temporaryTable("bucket_evolution"); + try { + doTestBucketedTableEvolutionWithDifferentReadBucketCount(storageFormat, temporaryBucketEvolutionTable); + } + finally { + dropTable(temporaryBucketEvolutionTable); + } + } + } + + private void doTestBucketedTableEvolutionWithDifferentReadBucketCount(HiveStorageFormat storageFormat, SchemaTableName tableName) + throws Exception + { + int rowCount = 100; + int bucketCount = 16; + + // Produce a table with a partition with bucket count different but compatible with the table bucket count + createEmptyTable( + tableName, + storageFormat, + ImmutableList.of( + new Column("id", HIVE_LONG, Optional.empty()), + new Column("name", HIVE_STRING, Optional.empty())), + ImmutableList.of(new Column("pk", HIVE_STRING, Optional.empty())), + Optional.of(new HiveBucketProperty(ImmutableList.of("id"), BUCKETING_V1, 4, ImmutableList.of()))); + // write a 4-bucket partition + MaterializedResult.Builder bucket4Builder = MaterializedResult.resultBuilder(SESSION, BIGINT, VARCHAR, VARCHAR); + IntStream.range(0, rowCount).forEach(i -> bucket4Builder.row((long) i, String.valueOf(i), "four")); + insertData(tableName, bucket4Builder.build()); + + // Alter the bucket count to 16 + alterBucketProperty(tableName, Optional.of(new HiveBucketProperty(ImmutableList.of("id"), BUCKETING_V1, bucketCount, ImmutableList.of()))); + + 
MaterializedResult result; + try (Transaction transaction = newTransaction()) { + ConnectorMetadata metadata = transaction.getMetadata(); + ConnectorSession session = newSession(); + metadata.beginQuery(session); + + ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName); + + // read entire table + List columnHandles = ImmutableList.builder() + .addAll(metadata.getColumnHandles(session, tableHandle).values()) + .build(); + + List splits = getAllSplits(getSplits(splitManager, transaction, session, tableHandle)); + assertEquals(splits.size(), bucketCount); + + ImmutableList.Builder allRows = ImmutableList.builder(); + for (ConnectorSplit split : splits) { + try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle, columnHandles, DynamicFilter.EMPTY)) { + MaterializedResult intermediateResult = materializeSourceDataStream(session, pageSource, getTypes(columnHandles)); + allRows.addAll(intermediateResult.getMaterializedRows()); + } + } + result = new MaterializedResult(allRows.build(), getTypes(columnHandles)); + + assertEquals(result.getRowCount(), rowCount); + + Map columnIndex = indexColumns(columnHandles); + int nameColumnIndex = columnIndex.get("name"); + int bucketColumnIndex = columnIndex.get(BUCKET_COLUMN_NAME); + for (MaterializedRow row : result.getMaterializedRows()) { + String name = (String) row.getField(nameColumnIndex); + int bucket = (int) row.getField(bucketColumnIndex); + + assertEquals(bucket, Integer.parseInt(name) % bucketCount); + } + } + } + @Test public void testBucketedTableEvolution() throws Exception diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 5c72f82aa21a..081e8ad588ec 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -4625,6 +4625,46 @@ public void testPredicatePushDownToTableScan() } } + @Test + public void testMismatchedBucketWithBucketPredicate() + { + assertUpdate("DROP TABLE IF EXISTS test_mismatch_bucketing8"); + assertUpdate("DROP TABLE IF EXISTS test_mismatch_bucketing32"); + + assertUpdate( + "CREATE TABLE test_mismatch_bucketing8 " + + "WITH (bucket_count = 8, bucketed_by = ARRAY['key8']) AS " + + "SELECT nationkey key8, comment value8 FROM nation", + 25); + assertUpdate( + "CREATE TABLE test_mismatch_bucketing32 " + + "WITH (bucket_count = 32, bucketed_by = ARRAY['key32']) AS " + + "SELECT nationkey key32, comment value32 FROM nation", + 25); + + Session withMismatchOptimization = Session.builder(getSession()) + .setCatalogSessionProperty(catalog, "optimize_mismatched_bucket_count", "true") + .build(); + Session withoutMismatchOptimization = Session.builder(getSession()) + .setCatalogSessionProperty(catalog, "optimize_mismatched_bucket_count", "false") + .build(); + + @Language("SQL") String query = "SELECT count(*) AS count " + + "FROM (" + + " SELECT key32" + + " FROM test_mismatch_bucketing32" + + " WHERE \"$bucket\" between 16 AND 31" + + ") a " + + "JOIN test_mismatch_bucketing8 b " + + "ON a.key32 = b.key8"; + + assertQuery(withMismatchOptimization, query, "SELECT 9"); + assertQuery(withoutMismatchOptimization, query, "SELECT 9"); + + assertUpdate("DROP TABLE IF EXISTS test_mismatch_bucketing8"); + assertUpdate("DROP TABLE IF EXISTS test_mismatch_bucketing32"); + } + @DataProvider public Object[][] timestampPrecisionAndValues() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileMetastore.java index bc265a5c1b58..a22db0ba6e24 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileMetastore.java +++ 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileMetastore.java @@ -71,6 +71,12 @@ public void testBucketedTableEvolution() // FileHiveMetastore only supports replaceTable() for views } + @Override + public void testBucketedTableEvolutionWithDifferentReadBucketCount() + { + // FileHiveMetastore has various incompatibilities + } + @Override public void testTransactionDeleteInsert() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index 27e436ac033a..9134bc3f4378 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -236,6 +236,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa ImmutableList.of(), ImmutableList.of(), OptionalInt.empty(), + OptionalInt.empty(), 0, false, TableToPartitionMapping.empty(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java index fdef347215a3..161e4874383a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java @@ -72,6 +72,7 @@ public void testJsonRoundTrip() partitionKeys, addresses, OptionalInt.empty(), + OptionalInt.empty(), 0, true, TableToPartitionMapping.mapColumnsByIndex(ImmutableMap.of(1, new HiveTypeName("string"))), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java index 048988ef5054..1f1a393fbbd3 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java @@ -391,6 +391,7 @@ private TestSplit(int id, 
OptionalInt bucketNumber, DataSize fileSize, BooleanSu ImmutableList.of(), ImmutableList.of(new InternalHiveBlock(0, fileSize.toBytes(), ImmutableList.of())), bucketNumber, + bucketNumber, () -> 0, true, false, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java index 16d14c62592e..37a7cf4978b8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java @@ -129,6 +129,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle ImmutableList.of(new HivePartitionKey(PARTITION_COLUMN.getName(), "42")), ImmutableList.of(), OptionalInt.of(1), + OptionalInt.of(1), 0, false, TableToPartitionMapping.empty(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java index a12cb0d6a861..f1bb8d2e1878 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java @@ -147,6 +147,7 @@ public ConnectorPageSource createGenericReader( ImmutableList.of(), ImmutableList.of(), OptionalInt.empty(), + OptionalInt.empty(), 0, false, TableToPartitionMapping.empty(),