diff --git a/presto-docs/src/main/sphinx/connector/hive.rst b/presto-docs/src/main/sphinx/connector/hive.rst index 7b33d6199f338..d9fa6b1ca1e3a 100644 --- a/presto-docs/src/main/sphinx/connector/hive.rst +++ b/presto-docs/src/main/sphinx/connector/hive.rst @@ -143,6 +143,8 @@ Property Name Description ``hive.immutable-partitions`` Can new data be inserted into existing partitions? ``false`` +``hive.create-empty-bucket-files`` Should empty files be created for buckets that have no data? ``false`` + ``hive.max-partitions-per-writers`` Maximum number of partitions per writer. 100 ``hive.max-partitions-per-scan`` Maximum number of partitions for a single table scan. 100,000 diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Partition.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Partition.java index 00902787e2b2a..72e6f848fad72 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Partition.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Partition.java @@ -13,7 +13,9 @@ */ package com.facebook.presto.hive.metastore; +import com.facebook.presto.spi.SchemaTableName; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -77,6 +79,12 @@ public String getTableName() return tableName; } + @JsonIgnore + public SchemaTableName getSchemaTableName() + { + return new SchemaTableName(databaseName, tableName); + } + @JsonProperty public List getValues() { diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java index ce153f108efc1..098c7de4fbc6c 100644 --- 
a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java @@ -92,6 +92,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.Futures.whenAllSucceed; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.lang.String.format; @@ -211,7 +212,7 @@ public synchronized Map getPartitionStatistics(Meta return ImmutableMap.of(); } TableSource tableSource = getTableSource(databaseName, tableName); - Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); + Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>()); ImmutableSet.Builder partitionNamesToQuery = ImmutableSet.builder(); ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); for (String partitionName : partitionNames) { @@ -373,12 +374,11 @@ public synchronized void createTable( setShared(); // When creating a table, it should never have partition actions. This is just a sanity check. 
checkNoPartitionAction(table.getDatabaseName(), table.getTableName()); - SchemaTableName schemaTableName = new SchemaTableName(table.getDatabaseName(), table.getTableName()); - Action oldTableAction = tableActions.get(schemaTableName); + Action oldTableAction = tableActions.get(table.getSchemaTableName()); TableAndMore tableAndMore = new TableAndMore(table, Optional.of(principalPrivileges), currentPath, Optional.empty(), ignoreExisting, statistics, statistics); if (oldTableAction == null) { HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), true); - tableActions.put(schemaTableName, new Action<>(ActionType.ADD, tableAndMore, context)); + tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, context)); return; } switch (oldTableAction.getType()) { @@ -387,7 +387,7 @@ public synchronized void createTable( case ADD: case ALTER: case INSERT_EXISTING: - throw new TableAlreadyExistsException(schemaTableName); + throw new TableAlreadyExistsException(table.getSchemaTableName()); default: throw new IllegalStateException("Unknown action type"); } @@ -507,7 +507,7 @@ public synchronized void truncateUnpartitionedTable(ConnectorSession session, St Path path = new Path(table.get().getStorage().getLocation()); HdfsContext context = new HdfsContext(session, databaseName, tableName, table.get().getStorage().getLocation(), false); setExclusive((delegate, hdfsEnvironment) -> { - RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableList.of(""), false); + RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableSet.of(""), false); if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) { throw new PrestoException(HIVE_FILESYSTEM_ERROR, format( "Error deleting from unpartitioned table %s. 
These items can not be deleted: %s", @@ -565,7 +565,7 @@ private Optional> doGetPartitionNames( default: throw new UnsupportedOperationException("Unknown table source"); } - Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); + Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>()); ImmutableList.Builder resultBuilder = ImmutableList.builder(); // alter/remove newly-altered/dropped partitions from the results from underlying metastore for (String partitionName : partitionNames) { @@ -900,7 +900,6 @@ public synchronized void declareIntentionToWrite( WriteMode writeMode, Path stagingPathRoot, Optional tempPathRoot, - String filePrefix, SchemaTableName schemaTableName, boolean temporaryTable) { @@ -911,7 +910,7 @@ public synchronized void declareIntentionToWrite( throw new PrestoException(NOT_SUPPORTED, "Can not insert into a table with a partition that has been modified in the same transaction when Presto is configured to skip temporary directories."); } } - declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(writeMode, context, metastoreContext, stagingPathRoot, tempPathRoot, filePrefix, schemaTableName, temporaryTable)); + declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(writeMode, context, metastoreContext, stagingPathRoot, tempPathRoot, context.getSession().get().getQueryId(), schemaTableName, temporaryTable)); } public synchronized void commit() @@ -1042,7 +1041,7 @@ private void commitShared() // fileRenameFutures must all come back before any file system cleanups are carried out. // Otherwise, files that should be deleted may be created after cleanup is done. 
- committer.executeCleanupTasksForAbort(committer.extractFilePrefixes(declaredIntentionsToWrite)); + committer.executeCleanupTasksForAbort(declaredIntentionsToWrite); committer.executeRenameTasksForAbort(); @@ -1188,7 +1187,7 @@ private void prepareAddTable(MetastoreContext metastoreContext, HdfsContext cont addTableOperations.add(new CreateTableOperation(metastoreContext, table, tableAndMore.getPrincipalPrivileges(), tableAndMore.isIgnoreExisting())); if (!isPrestoView(table)) { updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext, - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), tableAndMore.getStatisticsUpdate(), false)); @@ -1213,7 +1212,7 @@ private void prepareInsertExistingTable(MetastoreContext metastoreContext, HdfsC asyncRename(hdfsEnvironment, renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, tableAndMore.getFileNames().get()); } updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext, - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), tableAndMore.getStatisticsUpdate(), true)); @@ -1320,9 +1319,8 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext Path currentPath = partitionAndMore.getCurrentLocation(); Path targetPath = new Path(targetLocation); - SchemaTableName schemaTableName = new SchemaTableName(partition.getDatabaseName(), partition.getTableName()); PartitionAdder partitionAdder = partitionAdders.computeIfAbsent( - schemaTableName, + partition.getSchemaTableName(), ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE)); if (pathExists(context, hdfsEnvironment, currentPath)) { @@ -1355,16 +1353,19 @@ private void prepareInsertExistingPartition(MetastoreContext metastoreContext, H asyncRename(hdfsEnvironment, 
renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, partitionAndMore.getFileNames()); } updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext, - new SchemaTableName(partition.getDatabaseName(), partition.getTableName()), + partition.getSchemaTableName(), Optional.of(getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues())), partitionAndMore.getStatisticsUpdate(), true)); } - private void executeCleanupTasksForAbort(List filePrefixes) + private void executeCleanupTasksForAbort(Collection declaredIntentionsToWrite) { + Set queryIds = declaredIntentionsToWrite.stream() + .map(DeclaredIntentionToWrite::getQueryId) + .collect(toImmutableSet()); for (DirectoryCleanUpTask cleanUpTask : cleanUpTasksForAbort) { - recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), filePrefixes, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort"); + recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), queryIds, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort"); } } @@ -1400,7 +1401,7 @@ private void deleteEmptyStagingDirectories(List declar continue; } Path path = declaredIntentionToWrite.getStagingPathRoot(); - recursiveDeleteFilesAndLog(declaredIntentionToWrite.getContext(), path, ImmutableList.of(), true, "staging directory cleanup"); + recursiveDeleteFilesAndLog(declaredIntentionToWrite.getContext(), path, ImmutableSet.of(), true, "staging directory cleanup"); } } @@ -1550,15 +1551,6 @@ private void executeMetastoreDeleteOperations() throw prestoException; } } - - private List extractFilePrefixes(List declaredIntentionsToWrite) - { - Set filePrefixSet = new HashSet<>(); - for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) { - filePrefixSet.add(declaredIntentionToWrite.getFilePrefix()); - } - return ImmutableList.copyOf(filePrefixSet); - } } 
@GuardedBy("this") @@ -1583,13 +1575,12 @@ private void rollbackShared() // In the case of DIRECT_TO_TARGET_NEW_DIRECTORY, if the directory is not guaranteed to be unique // for the query, it is possible that another query or compute engine may see the directory, wrote // data to it, and exported it through metastore. Therefore it may be argued that cleanup of staging - // directories must be carried out conservatively. To be safe, we only delete files that start with - // the unique prefix for queries in this transaction. - + // directories must be carried out conservatively. To be safe, we only delete files that start or + // end with the query IDs in this transaction. recursiveDeleteFilesAndLog( declaredIntentionToWrite.getContext(), rootPath, - ImmutableList.of(declaredIntentionToWrite.getFilePrefix()), + ImmutableSet.of(declaredIntentionToWrite.getQueryId()), true, format("staging/target_new directory rollback for table %s", declaredIntentionToWrite.getSchemaTableName())); break; @@ -1630,14 +1621,14 @@ private void rollbackShared() schemaTableName.getTableName()); } - // delete any file that starts with the unique prefix of this query + // delete any file that starts or ends with the query ID for (Path path : pathsToClean) { // TODO: It is a known deficiency that some empty directory does not get cleaned up in S3. // We can not delete any of the directories here since we do not know who created them. 
recursiveDeleteFilesAndLog( declaredIntentionToWrite.getContext(), path, - ImmutableList.of(declaredIntentionToWrite.getFilePrefix()), + ImmutableSet.of(declaredIntentionToWrite.getQueryId()), false, format("target_existing directory rollback for table %s", schemaTableName)); } @@ -1782,13 +1773,13 @@ private static void asyncRename( } } - private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, List filePrefixes, boolean deleteEmptyDirectories, String reason) + private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories, String reason) { RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles( hdfsEnvironment, context, directory, - filePrefixes, + queryIds, deleteEmptyDirectories); if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) { logCleanupFailure( @@ -1808,19 +1799,19 @@ else if (deleteEmptyDirectories && !recursiveDeleteResult.isDirectoryNoLongerExi /** * Attempt to recursively remove eligible files and/or directories in {@code directory}. *

- * When {@code filePrefixes} is not present, all files (but not necessarily directories) will be - ineligible. If all files shall be deleted, you can use an empty string as {@code filePrefixes}. + * When {@code queryIds} is not present, all files (but not necessarily directories) will be + ineligible. If all files shall be deleted, you can use a set containing an empty string as {@code queryIds}. *

* When {@code deleteEmptySubDirectory} is true, any empty directory (including directories that - * were originally empty, and directories that become empty after files prefixed with - * {@code filePrefixes} are deleted) will be eligible. + * were originally empty, and directories that become empty after files prefixed or suffixed with + * {@code queryIds} are deleted) will be eligible. *

* This method will not delete anything that's neither a directory nor a file. * - * @param filePrefixes prefix of files that should be deleted + * @param queryIds prefix or suffix of files that should be deleted * @param deleteEmptyDirectories whether empty directories should be deleted */ - private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, List filePrefixes, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories) { FileSystem fileSystem; try { @@ -1836,10 +1827,10 @@ private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEn return new RecursiveDeleteResult(false, notDeletedItems.build()); } - return doRecursiveDeleteFiles(fileSystem, directory, filePrefixes, deleteEmptyDirectories); + return doRecursiveDeleteFiles(fileSystem, directory, queryIds, deleteEmptyDirectories); } - private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, List filePrefixes, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, Set queryIds, boolean deleteEmptyDirectories) { // don't delete hidden presto directories if (directory.getName().startsWith(".presto")) { @@ -1866,8 +1857,8 @@ private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSyste // never delete presto dot files if (!fileName.startsWith(".presto")) { // file name that starts with ".tmp.presto" is staging file, see HiveWriterFactory#createWriter. - eligible = filePrefixes.stream().anyMatch(prefix -> - fileName.startsWith(prefix) || fileName.startsWith(".tmp.presto." + prefix)); + eligible = queryIds.stream().anyMatch(id -> + fileName.startsWith(id) || fileName.startsWith(".tmp.presto." 
+ id) || fileName.endsWith(id)); } if (eligible) { if (!deleteIfExists(fileSystem, filePath, false)) { @@ -1880,7 +1871,7 @@ private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSyste } } else if (fileStatus.isDirectory()) { - RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), filePrefixes, deleteEmptyDirectories); + RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), queryIds, deleteEmptyDirectories); if (!subResult.isDirectoryNoLongerExists()) { allDescendentsDeleted = false; } @@ -2240,7 +2231,7 @@ private static class DeclaredIntentionToWrite { private final WriteMode mode; private final HdfsContext context; - private final String filePrefix; + private final String queryId; private final Path stagingPathRoot; private final Optional tempPathRoot; private final SchemaTableName schemaTableName; @@ -2253,7 +2244,7 @@ public DeclaredIntentionToWrite( MetastoreContext metastoreContext, Path stagingPathRoot, Optional tempPathRoot, - String filePrefix, + String queryId, SchemaTableName schemaTableName, boolean temporaryTable) { @@ -2262,7 +2253,7 @@ public DeclaredIntentionToWrite( this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null"); this.stagingPathRoot = requireNonNull(stagingPathRoot, "stagingPathRoot is null"); this.tempPathRoot = requireNonNull(tempPathRoot, "tempPathRoot is null"); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); + this.queryId = requireNonNull(queryId, "queryId is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.temporaryTable = temporaryTable; } @@ -2277,9 +2268,9 @@ public HdfsContext getContext() return context; } - public String getFilePrefix() + public String getQueryId() { - return filePrefix; + return queryId; } public Path getStagingPathRoot() @@ -2314,7 +2305,7 @@ public String toString() .add("mode", mode) .add("context", context) 
.add("metastoreContext", metastoreContext) - .add("filePrefix", filePrefix) + .add("queryId", queryId) .add("stagingPathRoot", stagingPathRoot) .add("tempPathRoot", tempPathRoot) .add("schemaTableName", schemaTableName) diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Table.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Table.java index 2e089b433d342..654e2fd4b475a 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Table.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/Table.java @@ -13,7 +13,9 @@ */ package com.facebook.presto.hive.metastore; +import com.facebook.presto.spi.SchemaTableName; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -83,6 +85,12 @@ public String getTableName() return tableName; } + @JsonIgnore + public SchemaTableName getSchemaTableName() + { + return new SchemaTableName(databaseName, tableName); + } + @JsonProperty public String getOwner() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java index 104a04283671d..6244030825a6a 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java @@ -84,15 +84,13 @@ public Iterator list( PathFilter pathFilter, HiveDirectoryContext hiveDirectoryContext) { - SchemaTableName schemaTableName = new SchemaTableName(table.getDatabaseName(), table.getTableName()); - List files = cache.getIfPresent(path); if (files != null) { return files.iterator(); } Iterator iterator = delegate.list(fileSystem, table, path, namenodeStats, pathFilter, 
hiveDirectoryContext); - if (hiveDirectoryContext.isCacheable() && cachedTableChecker.isCachedTable(schemaTableName)) { + if (hiveDirectoryContext.isCacheable() && cachedTableChecker.isCachedTable(table.getSchemaTableName())) { return cachingIterator(iterator, path); } return iterator; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index ad7b4a63df2f4..68e9787d8f540 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -89,6 +89,7 @@ public class HiveClientConfig private HiveCompressionCodec orcCompressionCodec = HiveCompressionCodec.GZIP; private boolean respectTableFormat = true; private boolean immutablePartitions; + private boolean createEmptyBucketFiles; private boolean insertOverwriteImmutablePartitions; private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true; private int maxPartitionsPerWriter = 100; @@ -584,6 +585,19 @@ public HiveClientConfig setImmutablePartitions(boolean immutablePartitions) return this; } + public boolean isCreateEmptyBucketFiles() + { + return createEmptyBucketFiles; + } + + @Config("hive.create-empty-bucket-files") + @ConfigDescription("Create empty files for buckets that have no data") + public HiveClientConfig setCreateEmptyBucketFiles(boolean createEmptyBucketFiles) + { + this.createEmptyBucketFiles = createEmptyBucketFiles; + return this; + } + public boolean isFailFastOnInsertIntoImmutablePartitionsEnabled() { return failFastOnInsertIntoImmutablePartitionsEnabled; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveInsertTableHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveInsertTableHandle.java index deed117a7e51e..c38642158ab50 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveInsertTableHandle.java +++ 
b/presto-hive/src/main/java/com/facebook/presto/hive/HiveInsertTableHandle.java @@ -31,7 +31,6 @@ public HiveInsertTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("inputColumns") List inputColumns, - @JsonProperty("filePrefix") String filePrefix, @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata, @JsonProperty("locationHandle") LocationHandle locationHandle, @JsonProperty("bucketProperty") Optional bucketProperty, @@ -46,7 +45,6 @@ public HiveInsertTableHandle( schemaName, tableName, inputColumns, - filePrefix, pageSinkMetadata, locationHandle, bucketProperty, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index ae5567f7d8f61..5f93e252a1451 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -70,7 +70,6 @@ import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.SchemaTablePrefix; -import com.facebook.presto.spi.StandardErrorCode; import com.facebook.presto.spi.SystemTable; import com.facebook.presto.spi.TableLayoutFilterCoverage; import com.facebook.presto.spi.TableNotFoundException; @@ -101,7 +100,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Suppliers; -import com.google.common.base.Verify; import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; @@ -144,7 +142,6 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -210,6 +207,7 @@ import static 
com.facebook.presto.hive.HiveSessionProperties.getVirtualBucketCount; import static com.facebook.presto.hive.HiveSessionProperties.isBucketExecutionEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite; +import static com.facebook.presto.hive.HiveSessionProperties.isCreateEmptyBucketFiles; import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount; @@ -269,12 +267,12 @@ import static com.facebook.presto.hive.HiveUtil.encodeViewData; import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles; import static com.facebook.presto.hive.HiveUtil.hiveColumnHandles; -import static com.facebook.presto.hive.HiveUtil.schemaTableName; import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypeForTemporaryTable; import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypesForTemporaryTable; import static com.facebook.presto.hive.HiveUtil.verifyPartitionTypeSupported; import static com.facebook.presto.hive.HiveWriteUtils.checkTableIsWritable; import static com.facebook.presto.hive.HiveWriteUtils.isWritableType; +import static com.facebook.presto.hive.HiveWriterFactory.computeBucketedFileName; import static com.facebook.presto.hive.HiveWriterFactory.getFileExtension; import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.APPEND; import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.NEW; @@ -479,7 +477,7 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName if (getSourceTableNameFromSystemTable(tableName).isPresent()) { // We must not allow system table due to how permissions are checked in SystemTableAwareAccessControl.checkCanSelectFromTable() - throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, format("Unexpected 
table %s present in Hive metastore", tableName)); + throw new PrestoException(NOT_SUPPORTED, "Unexpected table present in Hive metastore: " + tableName); } if (!isOfflineDataDebugModeEnabled(session)) { @@ -548,7 +546,9 @@ private Optional getPartitionsSystemTable(ConnectorSession session, } MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); - List partitionColumns = getPartitionColumns(session.getIdentity(), metastoreContext, sourceTableName); + Table sourceTable = metastore.getTable(metastoreContext, sourceTableName.getSchemaName(), sourceTableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(sourceTableName)); + List partitionColumns = getPartitionKeyColumnHandles(sourceTable); if (partitionColumns.isEmpty()) { return Optional.empty(); } @@ -590,18 +590,10 @@ private Optional getPartitionsSystemTable(ConnectorSession session, })); } - private List getPartitionColumns(ConnectorIdentity identity, MetastoreContext metastoreContext, SchemaTableName tableName) - { - Table sourceTable = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()).get(); - return getPartitionKeyColumnHandles(sourceTable); - } - @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle) { - requireNonNull(tableHandle, "tableHandle is null"); - SchemaTableName tableName = schemaTableName(tableHandle); - return getTableMetadata(session, tableName); + return getTableMetadata(session, ((HiveTableHandle) tableHandle).getSchemaTableName()); } private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName) @@ -646,11 +638,11 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema // Bucket properties Optional bucketProperty = table.get().getStorage().getBucketProperty(); - if (bucketProperty.isPresent()) { - 
properties.put(BUCKET_COUNT_PROPERTY, bucketProperty.get().getBucketCount()); - properties.put(BUCKETED_BY_PROPERTY, bucketProperty.get().getBucketedBy()); - properties.put(SORTED_BY_PROPERTY, bucketProperty.get().getSortedBy()); - } + table.get().getStorage().getBucketProperty().ifPresent(property -> { + properties.put(BUCKET_COUNT_PROPERTY, property.getBucketCount()); + properties.put(BUCKETED_BY_PROPERTY, property.getBucketedBy()); + properties.put(SORTED_BY_PROPERTY, property.getSortedBy()); + }); // Preferred ordering columns List preferredOrderingColumns = decodePreferredOrderingColumnsFromStorage(table.get().getStorage()); @@ -733,7 +725,7 @@ public Optional getInfo(ConnectorTableLayoutHandle layoutHandle) return Optional.of(new HiveInputInfo( tableLayoutHandle.getPartitions().get().stream() .map(HivePartition::getPartitionId) - .collect(Collectors.toList()), + .collect(toList()), false)); } @@ -764,17 +756,12 @@ private List listSchemas(ConnectorSession session, String schemaNameOrNu @Override public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { - SchemaTableName tableName = schemaTableName(tableHandle); + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); - Optional table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(tableName); - } - ImmutableMap.Builder columnHandles = ImmutableMap.builder(); - for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) { - columnHandles.put(columnHandle.getName(), columnHandle); - } - return columnHandles.build(); + Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); + return 
hiveColumnHandles(table).stream() + .collect(toImmutableMap(HiveColumnHandle::getName, identity())); } @SuppressWarnings("TryWithIdenticalCatches") @@ -1400,16 +1387,12 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl private void failIfAvroSchemaIsSet(ConnectorSession session, HiveTableHandle handle) { - String tableName = handle.getTableName(); - String schemaName = handle.getSchemaName(); MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); - Optional
table = metastore.getTable(metastoreContext, schemaName, tableName); - if (!table.isPresent()) { - throw new TableNotFoundException(new SchemaTableName(schemaName, tableName)); - } + Table table = metastore.getTable(metastoreContext, handle.getSchemaName(), handle.getTableName()) + .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); - if (table.get().getParameters().get(AVRO_SCHEMA_URL_KEY) != null) { + if (table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null) { throw new PrestoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema url is set"); } } @@ -1426,12 +1409,11 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { HiveTableHandle handle = (HiveTableHandle) tableHandle; - SchemaTableName tableName = schemaTableName(tableHandle); MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); Optional
target = metastore.getTable(metastoreContext, handle.getSchemaName(), handle.getTableName()); if (!target.isPresent()) { - throw new TableNotFoundException(tableName); + throw new TableNotFoundException(handle.getSchemaTableName()); } metastore.dropTable( new HdfsContext(session, handle.getSchemaName(), handle.getTableName(), target.get().getStorage().getLocation(), false), @@ -1443,13 +1425,12 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle) { verifyJvmTimeZone(); - HiveTableHandle handle = (HiveTableHandle) tableHandle; - SchemaTableName tableName = handle.getSchemaTableName(); + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(tableName)); - return handle; + return tableHandle; } @Override @@ -1574,7 +1555,6 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto schemaName, tableName, columnHandles, - session.getQueryId(), metastore.generatePageSinkMetadata(metastoreContext, schemaTableName), locationHandle, tableStorageFormat, @@ -1595,7 +1575,6 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto writeInfo.getWriteMode(), writeInfo.getWritePath(), writeInfo.getTempPath(), - result.getFilePrefix(), schemaTableName, false); @@ -1645,14 +1624,14 @@ public Optional finishCreateTable(ConnectorSession sess partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); - if (handle.getBucketProperty().isPresent()) { - ImmutableList partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets( + if 
(handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets( session, handle, table, partitionUpdates); // replace partitionUpdates before creating the zero-row files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(Iterables.concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), true); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : @@ -1691,7 +1670,7 @@ public Optional finishCreateTable(ConnectorSession sess } if (isRespectTableFormat(session)) { - Verify.verify(handle.getPartitionStorageFormat() == handle.getTableStorageFormat()); + verify(handle.getPartitionStorageFormat() == handle.getTableStorageFormat()); } for (PartitionUpdate update : partitionUpdates) { Map partitionParameters = partitionEncryptionParameters; @@ -1719,7 +1698,7 @@ public Optional finishCreateTable(ConnectorSession sess return Optional.of(new HiveWrittenPartitions( partitionUpdates.stream() .map(PartitionUpdate::getName) - .collect(Collectors.toList()))); + .collect(toList()))); } public static boolean shouldCreateFilesForMissingBuckets(Table table, ConnectorSession session) @@ -1737,7 +1716,7 @@ private StorageFormat getStorageFormat(Optional partition, Table tabl return partition.isPresent() ? 
partition.get().getStorage().getStorageFormat() : table.getStorage().getStorageFormat(); } - private ImmutableList computePartitionUpdatesForMissingBuckets( + private List computePartitionUpdatesForMissingBuckets( ConnectorSession session, HiveWritableTableHandle handle, Table table, @@ -1757,7 +1736,6 @@ private ImmutableList computePartitionUpdatesForMissingBuckets( session, storageFormat, handle.getCompressionCodec(), - handle.getFilePrefix(), bucketCount, ImmutableSet.of()); return ImmutableList.of(new PartitionUpdate( @@ -1782,7 +1760,6 @@ private ImmutableList computePartitionUpdatesForMissingBuckets( session, storageFormat, handle.getCompressionCodec(), - handle.getFilePrefix(), bucketCount, ImmutableSet.copyOf(getTargetFileNames(partitionUpdate.getFileWriteInfos()))); partitionUpdatesForMissingBucketsBuilder.add(new PartitionUpdate( @@ -1805,7 +1782,6 @@ private List computeFileNamesForMissingBuckets( ConnectorSession session, HiveStorageFormat storageFormat, HiveCompressionCodec compressionCodec, - String filePrefix, int bucketCount, Set existingFileNames) { @@ -1816,7 +1792,7 @@ private List computeFileNamesForMissingBuckets( String fileExtension = getFileExtension(fromHiveStorageFormat(storageFormat), compressionCodec); ImmutableList.Builder missingFileNamesBuilder = ImmutableList.builder(); for (int i = 0; i < bucketCount; i++) { - String targetFileName = isFileRenamingEnabled(session) ? String.valueOf(i) : HiveWriterFactory.computeBucketedFileName(filePrefix, i) + fileExtension; + String targetFileName = isFileRenamingEnabled(session) ? 
String.valueOf(i) : computeBucketedFileName(session.getQueryId(), i) + fileExtension; if (!existingFileNames.contains(targetFileName)) { missingFileNamesBuilder.add(targetFileName); } @@ -1838,44 +1814,42 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); - SchemaTableName tableName = schemaTableName(tableHandle); - Optional
table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(tableName); - } + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); + Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); - checkTableIsWritable(table.get(), writesToNonManagedTablesEnabled); + checkTableIsWritable(table, writesToNonManagedTablesEnabled); - for (Column column : table.get().getDataColumns()) { + for (Column column : table.getDataColumns()) { if (!isWritableType(column.getType())) { throw new PrestoException( NOT_SUPPORTED, - format("Inserting into Hive table %s.%s with column type %s not supported", table.get().getDatabaseName(), table.get().getTableName(), column.getType())); + format("Inserting into Hive table %s with column type %s not supported", tableName, column.getType())); } } - List handles = hiveColumnHandles(table.get()).stream() + List handles = hiveColumnHandles(table).stream() .filter(columnHandle -> !columnHandle.isHidden()) .collect(toList()); - HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table.get()); + HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table); LocationHandle locationHandle; - boolean isTemporaryTable = table.get().getTableType().equals(TEMPORARY_TABLE); + boolean isTemporaryTable = table.getTableType().equals(TEMPORARY_TABLE); boolean tempPathRequired = isTempPathRequired( session, - table.map(Table::getStorage).flatMap(Storage::getBucketProperty), - decodePreferredOrderingColumnsFromStorage(table.get().getStorage())); + table.getStorage().getBucketProperty(), + decodePreferredOrderingColumnsFromStorage(table.getStorage())); if (isTemporaryTable) { - locationHandle = locationService.forTemporaryTable(metastore, session, table.get(), tempPathRequired); + locationHandle = 
locationService.forTemporaryTable(metastore, session, table, tempPathRequired); } else { - locationHandle = locationService.forExistingTable(metastore, session, table.get(), tempPathRequired); + locationHandle = locationService.forExistingTable(metastore, session, table, tempPathRequired); } - Optional tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.get().getParameters(), tableStorageFormat); + Optional tableEncryptionProperties = getTableEncryptionPropertiesFromHiveProperties(table.getParameters(), tableStorageFormat); - HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : HiveSessionProperties.getHiveStorageFormat(session); - HiveStorageFormat actualStorageFormat = table.get().getPartitionColumns().isEmpty() ? tableStorageFormat : partitionStorageFormat; + HiveStorageFormat partitionStorageFormat = isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session); + HiveStorageFormat actualStorageFormat = table.getPartitionColumns().isEmpty() ? 
tableStorageFormat : partitionStorageFormat; if (tableEncryptionProperties.isPresent() && actualStorageFormat != tableStorageFormat) { throw new PrestoException( @@ -1892,11 +1866,10 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn tableName.getSchemaName(), tableName.getTableName(), handles, - session.getQueryId(), metastore.generatePageSinkMetadata(metastoreContext, tableName), locationHandle, - table.get().getStorage().getBucketProperty(), - decodePreferredOrderingColumnsFromStorage(table.get().getStorage()), + table.getStorage().getBucketProperty(), + decodePreferredOrderingColumnsFromStorage(table.getStorage()), tableStorageFormat, partitionStorageFormat, actualStorageFormat, @@ -1905,12 +1878,11 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); metastore.declareIntentionToWrite( - new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), table.get().getStorage().getLocation(), false), + new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), table.getStorage().getLocation(), false), metastoreContext, writeInfo.getWriteMode(), writeInfo.getWritePath(), writeInfo.getTempPath(), - result.getFilePrefix(), tableName, isTemporaryTable); return result; @@ -1944,28 +1916,26 @@ private Optional finishInsertInternal(ConnectorSession MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource()); - Optional
table = metastore.getTable(metastoreContext, handle.getSchemaName(), handle.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(new SchemaTableName(handle.getSchemaName(), handle.getTableName())); - } - if (!table.get().getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { + Table table = metastore.getTable(metastoreContext, handle.getSchemaName(), handle.getTableName()) + .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); + if (!table.getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Table format changed during insert"); } - if (handle.getBucketProperty().isPresent()) { - ImmutableList partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets( + if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets( session, handle, - table.get(), + table, partitionUpdates); // replace partitionUpdates before creating the zero-row files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(Iterables.concat(partitionUpdates, partitionUpdatesForMissingBuckets)); - HdfsContext hdfsContext = new HdfsContext(session, table.get().getDatabaseName(), table.get().getTableName(), table.get().getStorage().getLocation(), false); + partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { - Optional partition = table.get().getPartitionColumns().isEmpty() ? 
Optional.empty() : + Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(partitionObjectBuilder.buildPartitionObject( session, - table.get(), + table, partitionUpdate, prestoVersion, handle.getEncryptionInformation() @@ -1976,13 +1946,13 @@ private Optional finishInsertInternal(ConnectorSession hdfsContext, partitionUpdate.getWritePath(), getTargetFileNames(partitionUpdate.getFileWriteInfos()), - getStorageFormat(partition, table.get()), + getStorageFormat(partition, table), handle.getCompressionCodec(), - getSchema(partition, table.get())); + getSchema(partition, table)); } } - List partitionedBy = table.get().getPartitionColumns().stream() + List partitionedBy = table.getPartitionColumns().stream() .map(Column::getName) .collect(toImmutableList()); Map columnTypes = handle.getInputColumns().stream() @@ -2048,7 +2018,7 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode // insert into new partition or overwrite existing partition Partition partition = partitionObjectBuilder.buildPartitionObject( session, - table.get(), + table, partitionUpdate, prestoVersion, extraPartitionMetadata); @@ -2072,7 +2042,7 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode session, handle.getSchemaName(), handle.getTableName(), - table.get().getStorage().getLocation(), + table.getStorage().getLocation(), false, partition, partitionUpdate.getWritePath(), @@ -2087,7 +2057,7 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode partitionUpdates.stream() .map(PartitionUpdate::getName) .map(name -> name.isEmpty() ? 
UNPARTITIONED_ID : name) - .collect(Collectors.toList()))); + .collect(toList()))); } private static boolean isTempPathRequired(ConnectorSession session, Optional bucketProperty, List preferredOrderingColumns) @@ -2134,9 +2104,9 @@ private PartitionStatistics createPartitionStatistics( return new PartitionStatistics(basicStatistics, columnStatistics); } - private Map getColumnStatistics(Map, ComputedStatistics> partitionComputedStatistics, List partitionValues) + private static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) { - return Optional.ofNullable(partitionComputedStatistics.get(partitionValues)) + return Optional.ofNullable(statistics.get(partitionValues)) .map(ComputedStatistics::getColumnStatistics) .orElse(ImmutableMap.of()); } @@ -2898,9 +2868,7 @@ static TupleDomain createPredicate(List partitionCol return withColumnDomains( partitionColumns.stream() - .collect(Collectors.toMap( - identity(), - column -> buildColumnDomain(column, partitions)))); + .collect(toMap(identity(), column -> buildColumnDomain(column, partitions)))); } private static Domain buildColumnDomain(ColumnHandle column, List partitions) @@ -2984,7 +2952,7 @@ public Optional getInsertLayout(ConnectorSession sessio List partitionColumns = hiveBucketHandle.get().getColumns().stream() .map(HiveColumnHandle::getName) - .collect(Collectors.toList()); + .collect(toList()); return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns)); } @@ -3013,11 +2981,11 @@ public Optional getPreferredShuffleLayoutForInsert(Conn SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, table.getPartitionColumns().stream() .map(Column::getType) - .collect(Collectors.toList()), + .collect(toList()), OptionalInt.empty()); List partitionedBy = table.getPartitionColumns().stream() .map(Column::getName) - .collect(Collectors.toList()); + .collect(toList()); return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); } @@ -3082,7 
+3050,7 @@ public Optional getPreferredShuffleLayoutForNewTable(Co SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, partitionColumns.stream() .map(Column::getType) - .collect(Collectors.toList()), + .collect(toList()), OptionalInt.empty()); return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); @@ -3138,6 +3106,7 @@ private List getColumnStatisticMetadata(String columnNa .collect(toImmutableList()); } + @Override public void createRole(ConnectorSession session, String role, Optional grantor) { // roles are case insensitive in Hive diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java index ffb0b3b728b15..8e90847823204 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveOutputTableHandle.java @@ -40,7 +40,6 @@ public HiveOutputTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("inputColumns") List inputColumns, - @JsonProperty("filePrefix") String filePrefix, @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata, @JsonProperty("locationHandle") LocationHandle locationHandle, @JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat, @@ -58,7 +57,6 @@ public HiveOutputTableHandle( schemaName, tableName, inputColumns, - filePrefix, pageSinkMetadata, locationHandle, bucketProperty, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java index 58d137cf2478e..ff7c7dc7759cf 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java @@ -166,7 +166,7 @@ private ConnectorPageSink 
createPageSink(HiveWritableTableHandle handle, boolean sortedBy, handle.getLocationHandle(), locationService, - handle.getFilePrefix(), + session.getQueryId(), // The scope of metastore cache is within a single HivePageSink object // TODO: Extend metastore cache scope to the entire transaction new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), memoizeMetastore(metastore, metastoreImpersonationEnabled, perTransactionMetastoreCacheMaximumSize), new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource())), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index cd5aa5238952f..99a07e3749a95 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -70,6 +70,7 @@ public final class HiveSessionProperties private static final String COMPRESSION_CODEC = "compression_codec"; private static final String ORC_COMPRESSION_CODEC = "orc_compression_codec"; public static final String RESPECT_TABLE_FORMAT = "respect_table_format"; + private static final String CREATE_EMPTY_BUCKET_FILES = "create_empty_bucket_files"; private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names"; private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics"; private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; @@ -301,6 +302,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "Write new partitions using table format rather than default storage format", hiveClientConfig.isRespectTableFormat(), false), + booleanProperty( + CREATE_EMPTY_BUCKET_FILES, + "Create empty files for buckets that have no data", + hiveClientConfig.isCreateEmptyBucketFiles(), + false), 
booleanProperty( PARQUET_USE_COLUMN_NAME, "Experimental: Parquet: Access Parquet columns using names from the file", @@ -712,6 +718,11 @@ public static boolean isRespectTableFormat(ConnectorSession session) return session.getProperty(RESPECT_TABLE_FORMAT, Boolean.class); } + public static boolean isCreateEmptyBucketFiles(ConnectorSession session) + { + return session.getProperty(CREATE_EMPTY_BUCKET_FILES, Boolean.class); + } + public static boolean isUseParquetColumnNames(ConnectorSession session) { return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java index 3d405c69d7f36..42af71a55f6a3 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java @@ -36,10 +36,8 @@ import com.facebook.presto.hive.util.HudiRealtimeSplitConverter; import com.facebook.presto.orc.metadata.OrcType; import com.facebook.presto.spi.ColumnMetadata; -import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.RecordCursor; -import com.facebook.presto.spi.SchemaTableName; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; import com.google.common.base.Joiner; @@ -867,11 +865,6 @@ public static Slice charPartitionKey(String value, String name, Type columnType) return partitionKey; } - public static SchemaTableName schemaTableName(ConnectorTableHandle tableHandle) - { - return ((HiveTableHandle) tableHandle).getSchemaTableName(); - } - public static List hiveColumnHandles(Table table) { ImmutableList.Builder columns = ImmutableList.builder(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWritableTableHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWritableTableHandle.java 
index 65ac09c57c73a..8e10eaa5a9176 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWritableTableHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWritableTableHandle.java @@ -16,6 +16,8 @@ import com.facebook.presto.hive.metastore.HivePageSinkMetadata; import com.facebook.presto.hive.metastore.SortingColumn; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; @@ -30,9 +32,7 @@ public class HiveWritableTableHandle private final String schemaName; private final String tableName; private final List inputColumns; - private final String filePrefix; - - private HivePageSinkMetadata pageSinkMetadata; + private final HivePageSinkMetadata pageSinkMetadata; private final LocationHandle locationHandle; private final Optional bucketProperty; private final List preferredOrderingColumns; @@ -46,7 +46,6 @@ public HiveWritableTableHandle( String schemaName, String tableName, List inputColumns, - String filePrefix, HivePageSinkMetadata pageSinkMetadata, LocationHandle locationHandle, Optional bucketProperty, @@ -60,7 +59,6 @@ public HiveWritableTableHandle( this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null")); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); this.pageSinkMetadata = requireNonNull(pageSinkMetadata, "pageSinkMetadata is null"); this.locationHandle = requireNonNull(locationHandle, "locationHandle is null"); this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null"); @@ -94,10 +92,10 @@ public List getInputColumns() return inputColumns; } - @JsonProperty - public String getFilePrefix() + @JsonIgnore + public 
SchemaTableName getSchemaTableName() { - return filePrefix; + return new SchemaTableName(schemaName, tableName); } @JsonProperty diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java index 671a61a16d8dc..9a4eff08fa56d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java @@ -308,7 +308,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT } checkWritable( - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), getProtectMode(table), table.getParameters(), @@ -318,7 +318,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT public static void checkPartitionIsWritable(String partitionName, Partition partition) { checkWritable( - new SchemaTableName(partition.getDatabaseName(), partition.getTableName()), + partition.getSchemaTableName(), Optional.of(partitionName), getProtectMode(partition), partition.getParameters(), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java index 3178d26aa656f..215b83cb6bb3b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java @@ -63,7 +63,6 @@ import static com.facebook.presto.hive.HiveCompressionCodec.NONE; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY; import static 
com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; @@ -106,7 +105,11 @@ public class HiveWriterFactory { private static final int MAX_BUCKET_COUNT = 100_000; private static final int BUCKET_NUMBER_PADDING = Integer.toString(MAX_BUCKET_COUNT - 1).length(); - private static final Pattern BUCKET_FILE_NAME_PATTERN = Pattern.compile(".*_bucket-(\\d+)(\\..*)?"); + private static final Iterable BUCKET_PATTERNS = ImmutableList.of( + // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()` + Pattern.compile("(0\\d+)_\\d+.*"), + // legacy Presto naming pattern (current version matches Hive) + Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?")); private final Set fileWriterFactories; private final String schemaName; @@ -124,7 +127,7 @@ public class HiveWriterFactory private final Map additionalTableParameters; private final LocationHandle locationHandle; private final LocationService locationService; - private final String filePrefix; + private final String queryId; private final HivePageSinkMetadataProvider pageSinkMetadataProvider; private final TypeManager typeManager; @@ -163,7 +166,7 @@ public HiveWriterFactory( List sortedBy, LocationHandle locationHandle, LocationService locationService, - String filePrefix, + String queryId, HivePageSinkMetadataProvider pageSinkMetadataProvider, TypeManager typeManager, HdfsEnvironment hdfsEnvironment, @@ -191,7 +194,7 @@ public HiveWriterFactory( this.additionalTableParameters = ImmutableMap.copyOf(requireNonNull(additionalTableParameters, "additionalTableParameters is null")); this.locationHandle = requireNonNull(locationHandle, "locationHandle is null"); this.locationService = requireNonNull(locationService, "locationService is null"); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); + this.queryId = requireNonNull(queryId, "queryId is null"); this.pageSinkMetadataProvider = requireNonNull(pageSinkMetadataProvider, 
"pageSinkMetadataProvider is null"); @@ -347,15 +350,15 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt String targetFileName; if (bucketNumber.isPresent()) { // Use the bucket number for file name when fileRenaming is enabled - targetFileName = isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(filePrefix, bucketNumber.getAsInt()) + extension; + targetFileName = isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(queryId, bucketNumber.getAsInt()) + extension; } else { - targetFileName = filePrefix + "_" + randomUUID() + extension; + targetFileName = queryId + "_" + randomUUID() + extension; } String writeFileName; if (writeToTempFile) { - writeFileName = ".tmp.presto." + filePrefix + "_" + randomUUID() + extension; + writeFileName = ".tmp.presto." + queryId + "_" + randomUUID() + extension; } else { writeFileName = targetFileName; @@ -661,24 +664,26 @@ public LocationHandle getLocationHandle() return locationHandle; } - public static String computeBucketedFileName(String filePrefix, int bucket) + public static String computeBucketedFileName(String queryId, int bucket) { - return filePrefix + "_bucket-" + Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0'); + String paddedBucket = Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0'); + return format("0%s_0_%s", paddedBucket, queryId); } - public static int getBucketNumber(String fileName) + public static OptionalInt getBucketNumber(String fileName) { - Matcher matcher = BUCKET_FILE_NAME_PATTERN.matcher(fileName); - if (matcher.matches()) { - return parseInt(matcher.group(1)); + for (Pattern pattern : BUCKET_PATTERNS) { + Matcher matcher = pattern.matcher(fileName); + if (matcher.matches()) { + return OptionalInt.of(parseInt(matcher.group(1))); + } } // Numerical file name when "file_renaming_enabled" is true - else if (fileName.matches("\\d+")) { - 
return parseInt(fileName); - } - else { - throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("invalid hive bucket file name: %s", fileName)); + if (fileName.matches("\\d+")) { + return OptionalInt.of(parseInt(fileName)); } + + return OptionalInt.empty(); } public static String getFileExtension(StorageFormat storageFormat, HiveCompressionCodec compressionCodec) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index a4f3b9287d522..3d13a94f09ef8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -24,8 +24,10 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.google.common.base.Suppliers; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import com.google.common.collect.ListMultimap; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.conf.Configuration; @@ -52,11 +54,10 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Deque; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.function.IntPredicate; import java.util.function.Supplier; @@ -341,6 +342,7 @@ private List getBucketedSplits( int readBucketCount = bucketSplitInfo.getReadBucketCount(); int tableBucketCount = bucketSplitInfo.getTableBucketCount(); int partitionBucketCount = bucketConversion.map(HiveSplit.BucketConversion::getPartitionBucketCount).orElse(tableBucketCount); + int bucketCount = max(readBucketCount, partitionBucketCount); 
checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount); @@ -354,53 +356,77 @@ private List getBucketedSplits( throw new PrestoException( HIVE_INVALID_BUCKET_FILES, format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s", - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), partitionName)); } - Map bucketToFileInfo = new HashMap<>(); + ListMultimap bucketToFileInfo = ArrayListMultimap.create(); if (!shouldCreateFilesForMissingBuckets(table, session)) { fileInfos.stream() - .forEach(fileInfo -> bucketToFileInfo.put(getBucketNumber(fileInfo.getPath().getName()), fileInfo)); + .forEach(fileInfo -> { + String fileName = fileInfo.getPath().getName(); + OptionalInt bucket = getBucketNumber(fileName); + if (bucket.isPresent()) { + bucketToFileInfo.put(bucket.getAsInt(), fileInfo); + } + else { + throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("invalid hive bucket file name: %s", fileName)); + } + }); } else { - // verify we found one file per bucket - if (fileInfos.size() != partitionBucketCount) { - throw new PrestoException( - HIVE_INVALID_BUCKET_FILES, - format("Hive table '%s' is corrupt. 
The number of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", - new SchemaTableName(table.getDatabaseName(), table.getTableName()), - fileInfos.size(), - partitionBucketCount, - partitionName)); - } - - if (fileInfos.get(0).getPath().getName().matches("\\d+")) { - try { - // File names are integer if they are created when file_renaming_enabled is set to true - fileInfos.sort(Comparator.comparingInt(fileInfo -> Integer.parseInt(fileInfo.getPath().getName()))); + // build mapping of file name to bucket + for (HiveFileInfo file : fileInfos) { + String fileName = file.getPath().getName(); + OptionalInt bucket = getBucketNumber(fileName); + if (bucket.isPresent()) { + bucketToFileInfo.put(bucket.getAsInt(), file); + continue; } - catch (NumberFormatException e) { + + // legacy mode requires exactly one file per bucket + if (fileInfos.size() != partitionBucketCount) { throw new PrestoException( - HIVE_INVALID_FILE_NAMES, - format("Hive table '%s' is corrupt. Some of the filenames in the partition: %s are not integers", - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + HIVE_INVALID_BUCKET_FILES, + format("Hive table '%s' is corrupt. File '%s' does not match the standard naming pattern, and the number " + + "of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", + table.getSchemaTableName(), + fileName, + fileInfos.size(), + partitionBucketCount, partitionName)); } - } - else { - // Sort FileStatus objects (instead of, e.g., fileStatus.getPath().toString). 
This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths - fileInfos.sort(null); - } - for (int i = 0; i < fileInfos.size(); i++) { - bucketToFileInfo.put(i, fileInfos.get(i)); + if (fileInfos.get(0).getPath().getName().matches("\\d+")) { + try { + // File names are integer if they are created when file_renaming_enabled is set to true + fileInfos.sort(Comparator.comparingInt(fileInfo -> Integer.parseInt(fileInfo.getPath().getName()))); + } + catch (NumberFormatException e) { + throw new PrestoException( + HIVE_INVALID_FILE_NAMES, + format("Hive table '%s' is corrupt. Some of the filenames in the partition: %s are not integers", + new SchemaTableName(table.getDatabaseName(), table.getTableName()), + partitionName)); + } + } + else { + // Sort FileStatus objects (instead of, e.g., fileStatus.getPath().toString). This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths + fileInfos.sort(null); + } + + // Use position in sorted list as the bucket number + bucketToFileInfo.clear(); + for (int i = 0; i < fileInfos.size(); i++) { + bucketToFileInfo.put(i, fileInfos.get(i)); + } + break; } } // convert files internal splits List splitList = new ArrayList<>(); - for (int bucketNumber = 0; bucketNumber < max(readBucketCount, partitionBucketCount); bucketNumber++) { + for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) { // Physical bucket #. This determine file name. It also determines the order of splits in the result. 
int partitionBucketNumber = bucketNumber % partitionBucketCount; if (!bucketToFileInfo.containsKey(partitionBucketNumber)) { @@ -411,7 +437,7 @@ private List getBucketedSplits( boolean containsIneligibleTableBucket = false; List eligibleTableBucketNumbers = new ArrayList<>(); - for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += max(readBucketCount, partitionBucketCount)) { + for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) { // table bucket number: this is used for evaluating "$bucket" filters. if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) { eligibleTableBucketNumbers.add(tableBucketNumber); @@ -432,10 +458,11 @@ private List getBucketedSplits( "partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")"); } if (!eligibleTableBucketNumbers.isEmpty()) { - HiveFileInfo fileInfo = bucketToFileInfo.get(partitionBucketNumber); - eligibleTableBucketNumbers.stream() - .map(tableBucketNumber -> splitFactory.createInternalHiveSplit(fileInfo, readBucketNumber, tableBucketNumber, splittable)) - .forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add)); + for (HiveFileInfo fileInfo : bucketToFileInfo.get(partitionBucketNumber)) { + eligibleTableBucketNumbers.stream() + .map(tableBucketNumber -> splitFactory.createInternalHiveSplit(fileInfo, readBucketNumber, tableBucketNumber, splittable)) + .forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add)); + } } } return splitList; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 32f0d38da8fd3..b2682ed1bcb2f 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -3945,7 +3945,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag outputHandle.getLocationHandle().getTargetPath().toString(), true); for (String filePath : listAllDataFiles(context, getStagingPathRoot(outputHandle))) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(outputHandle))); + assertTrue(new Path(filePath).getName().startsWith(session.getQueryId())); } // commit the table @@ -4145,7 +4145,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName Set tempFiles = listAllDataFiles(context, stagingPathRoot); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertTrue(new Path(filePath).getName().startsWith(session.getQueryId())); } // rollback insert @@ -4182,16 +4182,6 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName } // These are protected so extensions to the hive connector can replace the handle classes - protected String getFilePrefix(ConnectorOutputTableHandle outputTableHandle) - { - return ((HiveWritableTableHandle) outputTableHandle).getFilePrefix(); - } - - protected String getFilePrefix(ConnectorInsertTableHandle insertTableHandle) - { - return ((HiveWritableTableHandle) insertTableHandle).getFilePrefix(); - } - protected Path getStagingPathRoot(ConnectorInsertTableHandle insertTableHandle) { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertTableHandle; @@ -4362,7 +4352,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab Set tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle)); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertTrue(new 
Path(filePath).getName().startsWith(session.getQueryId())); } // rollback insert @@ -4490,7 +4480,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche Set tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle)); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertTrue(new Path(filePath).getName().startsWith(session.getQueryId())); } // verify statistics are visible from within of the current transaction diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index d4054e3230460..0c2a50310ace7 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -76,6 +76,7 @@ public void testDefaults() .setOrcCompressionCodec(HiveCompressionCodec.GZIP) .setRespectTableFormat(true) .setImmutablePartitions(false) + .setCreateEmptyBucketFiles(false) .setInsertOverwriteImmutablePartitionEnabled(false) .setFailFastOnInsertIntoImmutablePartitionsEnabled(true) .setSortedWritingEnabled(true) @@ -185,6 +186,7 @@ public void testExplicitPropertyMappings() .put("hive.orc-compression-codec", "ZSTD") .put("hive.respect-table-format", "false") .put("hive.immutable-partitions", "true") + .put("hive.create-empty-bucket-files", "true") .put("hive.insert-overwrite-immutable-partitions-enabled", "true") .put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false") .put("hive.max-partitions-per-writers", "222") @@ -295,6 +297,7 @@ public void testExplicitPropertyMappings() .setOrcCompressionCodec(HiveCompressionCodec.ZSTD) .setRespectTableFormat(false) .setImmutablePartitions(true) + .setCreateEmptyBucketFiles(true) .setInsertOverwriteImmutablePartitionEnabled(true) 
.setFailFastOnInsertIntoImmutablePartitionsEnabled(false) .setMaxPartitionsPerWriter(222) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 186045aab54a8..49c482e7a018f 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -798,15 +798,18 @@ public void testCreateTableNonSupportedVarcharColumn() public void testCreatePartitionedBucketedTableAsFewRows() { // go through all storage formats to make sure the empty buckets are correctly created - testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, false)); + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, false, false)); + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, false, true)); // test with optimized PartitionUpdate serialization - testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, true)); + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, true, false)); + testWithAllStorageFormats((session, format) -> testCreatePartitionedBucketedTableAsFewRows(session, format, true, true)); } private void testCreatePartitionedBucketedTableAsFewRows( Session session, HiveStorageFormat storageFormat, - boolean optimizedPartitionUpdateSerializationEnabled) + boolean optimizedPartitionUpdateSerializationEnabled, + boolean createEmpty) { String tableName = "test_create_partitioned_bucketed_table_as_few_rows"; @@ -829,7 +832,9 @@ private void testCreatePartitionedBucketedTableAsFewRows( assertUpdate( // make sure that we will get one file per bucket regardless of writer 
count configured - getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled), + Session.builder(getTableWriteTestingSession(optimizedPartitionUpdateSerializationEnabled)) + .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty)) + .build(), createTable, 3); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index 5e12ab781a345..89cda7df8f191 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -288,7 +288,6 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio SCHEMA_NAME, TABLE_NAME, getColumnHandles(), - "test", new HivePageSinkMetadata(new SchemaTableName(SCHEMA_NAME, TABLE_NAME), metastore.getTable(METASTORE_CONTEXT, SCHEMA_NAME, TABLE_NAME), ImmutableMap.of()), locationHandle, config.getHiveStorageFormat(), diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveWriterFactory.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveWriterFactory.java new file mode 100644 index 0000000000000..3ca811c5fffa8 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveWriterFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive; + +import org.testng.annotations.Test; + +import java.util.OptionalInt; + +import static com.facebook.presto.hive.HiveWriterFactory.computeBucketedFileName; +import static com.facebook.presto.hive.HiveWriterFactory.getBucketNumber; +import static org.apache.hadoop.hive.ql.exec.Utilities.getBucketIdFromFile; +import static org.testng.Assert.assertEquals; + +public class TestHiveWriterFactory +{ + @Test + public void testComputeBucketedFileName() + { + String name = computeBucketedFileName("20180102_030405_00641_x1y2z", 1234); + assertEquals(name, "001234_0_20180102_030405_00641_x1y2z"); + assertEquals(getBucketIdFromFile(name), 1234); + } + + @Test + public void testGetBucketNumber() + { + assertEquals(getBucketNumber("0234_0"), OptionalInt.of(234)); + assertEquals(getBucketNumber("000234_0"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_99"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_0.txt"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_0_copy_1"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_072952_00009_fn7s5_bucket-00234"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_072952_00009_fn7s5_bucket-00234.txt"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_235847_87654_fn7s5_bucket-56789"), OptionalInt.of(56789)); + + assertEquals(getBucketNumber("234_99"), OptionalInt.empty()); + assertEquals(getBucketNumber("0234.txt"), OptionalInt.empty()); + assertEquals(getBucketNumber("0234.txt"), OptionalInt.empty()); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestingSemiTransactionalHiveMetastore.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestingSemiTransactionalHiveMetastore.java index 302035985db3d..685b48abc2313 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestingSemiTransactionalHiveMetastore.java +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/TestingSemiTransactionalHiveMetastore.java @@ -332,7 +332,7 @@ public synchronized void revokeTablePrivileges(MetastoreContext metastoreContext } @Override - public synchronized void declareIntentionToWrite(HdfsContext context, MetastoreContext metastoreContext, LocationHandle.WriteMode writeMode, Path stagingPathRoot, Optional tempPathRoot, String filePrefix, SchemaTableName schemaTableName, boolean temporaryTable) + public synchronized void declareIntentionToWrite(HdfsContext context, MetastoreContext metastoreContext, LocationHandle.WriteMode writeMode, Path stagingPathRoot, Optional tempPathRoot, SchemaTableName schemaTableName, boolean temporaryTable) { throw new UnsupportedOperationException("method not implemented"); } diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java index e9f033422e15b..82e69f65b9629 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBasicTableStatistics.java @@ -276,7 +276,7 @@ public void testInsertBucketed() BasicStatistics statisticsAfterCreate = getBasicStatisticsForTable(onHive(), tableName); assertThatStatisticsAreNonZero(statisticsAfterCreate); assertThat(statisticsAfterCreate.getNumRows().getAsLong()).isEqualTo(25); - assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50); + assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(25); // no files for empty buckets // Insert into bucketed unpartitioned table is unsupported assertThatThrownBy(() -> insertNationData(onPresto(), tableName)) @@ -284,7 +284,7 @@ public void testInsertBucketed() BasicStatistics statisticsAfterInsert = getBasicStatisticsForTable(onHive(), tableName); 
assertThat(statisticsAfterInsert.getNumRows().getAsLong()).isEqualTo(25); - assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(50); + assertThat(statisticsAfterCreate.getNumFiles().getAsLong()).isEqualTo(25); // no files for empty buckets } finally { onPresto().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); @@ -316,7 +316,7 @@ public void testInsertBucketedPartitioned() BasicStatistics firstPartitionStatistics = getBasicStatisticsForPartition(onHive(), tableName, "n_regionkey=1"); assertThatStatisticsAreNonZero(firstPartitionStatistics); assertThat(firstPartitionStatistics.getNumRows().getAsLong()).isEqualTo(5); - assertThat(firstPartitionStatistics.getNumFiles().getAsLong()).isEqualTo(10); + assertThat(firstPartitionStatistics.getNumFiles().getAsLong()).isEqualTo(5); // no files for empty buckets onPresto().executeQuery(format("" + "INSERT INTO %s (n_nationkey, n_regionkey, n_name, n_comment) " + @@ -326,7 +326,7 @@ public void testInsertBucketedPartitioned() BasicStatistics secondPartitionStatistics = getBasicStatisticsForPartition(onHive(), tableName, "n_regionkey=2"); assertThat(secondPartitionStatistics.getNumRows().getAsLong()).isEqualTo(5); - assertThat(secondPartitionStatistics.getNumFiles().getAsLong()).isEqualTo(10); + assertThat(secondPartitionStatistics.getNumFiles().getAsLong()).isEqualTo(4); // no files for empty buckets // Insert into existing bucketed partition is not supported assertThatThrownBy(() -> insertNationData(onPresto(), tableName)) @@ -334,7 +334,7 @@ public void testInsertBucketedPartitioned() BasicStatistics secondPartitionUpdatedStatistics = getBasicStatisticsForPartition(onHive(), tableName, "n_regionkey=2"); assertThat(secondPartitionUpdatedStatistics.getNumRows().getAsLong()).isEqualTo(5); - assertThat(secondPartitionUpdatedStatistics.getNumFiles().getAsLong()).isEqualTo(10); + assertThat(secondPartitionUpdatedStatistics.getNumFiles().getAsLong()).isEqualTo(4); // no files for empty buckets } finally 
{ onPresto().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java index 4e95eb2257d0f..18842446f8b74 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestHiveBucketedTables.java @@ -86,10 +86,30 @@ public void testIgnorePartitionBucketingIfNotBucketed() onHive().executeQuery(format("ALTER TABLE %s NOT CLUSTERED", tableName)); + assertThat(query(format("SELECT count(DISTINCT n_nationkey), count(*) FROM %s", tableName))) + .hasRowsCount(1) + .contains(row(25, 50)); + assertThat(query(format("SELECT count(*) FROM %s WHERE n_nationkey = 1", tableName))) .containsExactly(row(2)); } + @Test(groups = {BIG_QUERY}) + public void testAllowMultipleFilesPerBucket() + { + String tableName = mutableTablesState().get(BUCKETED_PARTITIONED_NATION).getNameInDatabase(); + for (int i = 0; i < 3; i++) { + populateHivePartitionedTable(tableName, NATION.getName(), "part_key = 'insert'"); + } + + assertThat(query(format("SELECT count(DISTINCT n_nationkey), count(*) FROM %s", tableName))) + .hasRowsCount(1) + .contains(row(25, 75)); + + assertThat(query(format("SELECT count(*) FROM %s WHERE n_nationkey = 1", tableName))) + .containsExactly(row(3)); + } + @Test(groups = {BIG_QUERY}) public void testIgnorePartitionBucketingIfBucketedExecutionDisabled() {