diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java index e42785c2771a..62507745aabe 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java @@ -13,8 +13,11 @@ */ package io.prestosql.plugin.hive; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Streams; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.ListenableFuture; @@ -32,7 +35,6 @@ import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorSession; -import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.predicate.TupleDomain; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -58,11 +60,14 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.IntPredicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -86,6 +91,8 @@ import static io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED; import static io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; +import 
static java.lang.Integer.parseInt; +import static java.lang.Math.max; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER; @@ -93,6 +100,12 @@ public class BackgroundHiveSplitLoader implements HiveSplitLoader { + private static final Iterable BUCKET_PATTERNS = ImmutableList.of( + // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()` + Pattern.compile("(0\\d+)_\\d+.*"), + // legacy Presto naming pattern (current version matches Hive) + Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?")); + private static final ListenableFuture COMPLETED_FUTURE = immediateFuture(null); private final Table table; @@ -413,10 +426,11 @@ private List getBucketedSplits(Path path, FileSystem fileSyst { int readBucketCount = bucketSplitInfo.getReadBucketCount(); int tableBucketCount = bucketSplitInfo.getTableBucketCount(); - int partitionBucketCount = bucketConversion.isPresent() ? bucketConversion.get().getPartitionBucketCount() : tableBucketCount; + int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount); + int bucketCount = max(readBucketCount, partitionBucketCount); // list all files in the partition - ArrayList files = new ArrayList<>(partitionBucketCount); + List files = new ArrayList<>(partitionBucketCount); try { Iterators.addAll(files, new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, FAIL)); } @@ -425,27 +439,46 @@ private List getBucketedSplits(Path path, FileSystem fileSyst throw new PrestoException( HIVE_INVALID_BUCKET_FILES, format("Hive table '%s' is corrupt. 
Found sub-directory in bucket directory for partition: %s", - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), splitFactory.getPartitionName())); } - // verify we found one file per bucket - if (files.size() != partitionBucketCount) { - throw new PrestoException( - HIVE_INVALID_BUCKET_FILES, - format("Hive table '%s' is corrupt. The number of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", - new SchemaTableName(table.getDatabaseName(), table.getTableName()), - files.size(), - partitionBucketCount, - splitFactory.getPartitionName())); - } + // build mapping of file name to bucket + ListMultimap bucketFiles = ArrayListMultimap.create(); + for (LocatedFileStatus file : files) { + String fileName = file.getPath().getName(); + OptionalInt bucket = getBucketNumber(fileName); + if (bucket.isPresent()) { + bucketFiles.put(bucket.getAsInt(), file); + continue; + } + + // legacy mode requires exactly one file per bucket + if (files.size() != partitionBucketCount) { + throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format( + "Hive table '%s' is corrupt. File '%s' does not match the standard naming pattern, and the number " + + "of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", + table.getSchemaTableName(), + fileName, + files.size(), + partitionBucketCount, + splitFactory.getPartitionName())); + } - // Sort FileStatus objects (instead of, e.g., fileStatus.getPath().toString). 
This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths - files.sort(null); + // sort FileStatus objects per `org.apache.hadoop.hive.ql.metadata.Table#getSortedPaths()` + files.sort(null); + + // use position in sorted list as the bucket number + bucketFiles.clear(); + for (int i = 0; i < files.size(); i++) { + bucketFiles.put(i, files.get(i)); + } + break; + } // convert files internal splits List splitList = new ArrayList<>(); - for (int bucketNumber = 0; bucketNumber < Math.max(readBucketCount, partitionBucketCount); bucketNumber++) { + for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) { // Physical bucket #. This determine file name. It also determines the order of splits in the result. int partitionBucketNumber = bucketNumber % partitionBucketCount; // Logical bucket #. Each logical bucket corresponds to a "bucket" from engine's perspective. @@ -453,7 +486,7 @@ private List getBucketedSplits(Path path, FileSystem fileSyst boolean containsEligibleTableBucket = false; boolean containsIneligibleTableBucket = false; - for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += Math.max(readBucketCount, partitionBucketCount)) { + for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) { // table bucket number: this is used for evaluating "$bucket" filters. 
if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) { containsEligibleTableBucket = true; @@ -474,14 +507,27 @@ private List getBucketedSplits(Path path, FileSystem fileSyst "partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")"); } if (containsEligibleTableBucket) { - LocatedFileStatus file = files.get(partitionBucketNumber); - splitFactory.createInternalHiveSplit(file, readBucketNumber) - .ifPresent(splitList::add); + for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) { + splitFactory.createInternalHiveSplit(file, readBucketNumber) + .ifPresent(splitList::add); + } } } return splitList; } + @VisibleForTesting + static OptionalInt getBucketNumber(String name) + { + for (Pattern pattern : BUCKET_PATTERNS) { + Matcher matcher = pattern.matcher(name); + if (matcher.matches()) { + return OptionalInt.of(parseInt(matcher.group(1))); + } + } + return OptionalInt.empty(); + } + private static List getTargetPathsFromSymlink(FileSystem fileSystem, Path symlinkDir) { try { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/CachingDirectoryLister.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/CachingDirectoryLister.java index 2b6ca483fcf2..572aad18958c 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/CachingDirectoryLister.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/CachingDirectoryLister.java @@ -74,15 +74,13 @@ private static SchemaTableName parseTableName(String tableName) public RemoteIterator list(FileSystem fs, Table table, Path path) throws IOException { - SchemaTableName schemaTableName = new SchemaTableName(table.getDatabaseName(), table.getTableName()); - List files = cache.getIfPresent(path); if (files != null) { return simpleRemoteIterator(files); } RemoteIterator iterator = fs.listLocatedStatus(path); - if (!tableNames.contains(schemaTableName)) { + if (!tableNames.contains(table.getSchemaTableName())) { return iterator; } 
return cachingRemoteIterator(iterator, path); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java index 07f576381aab..5962b00702fa 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java @@ -97,6 +97,7 @@ public class HiveConfig private HiveCompressionCodec hiveCompressionCodec = HiveCompressionCodec.GZIP; private boolean respectTableFormat = true; private boolean immutablePartitions; + private boolean createEmptyBucketFiles = true; private int maxPartitionsPerWriter = 100; private int maxOpenSortFiles = 50; private int writeValidationThreads = 16; @@ -609,6 +610,19 @@ public HiveConfig setImmutablePartitions(boolean immutablePartitions) return this; } + public boolean isCreateEmptyBucketFiles() + { + return createEmptyBucketFiles; + } + + @Config("hive.create-empty-bucket-files") + @ConfigDescription("Create empty files for buckets that have no data") + public HiveConfig setCreateEmptyBucketFiles(boolean createEmptyBucketFiles) + { + this.createEmptyBucketFiles = createEmptyBucketFiles; + return this; + } + @Min(1) public int getMaxPartitionsPerWriter() { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java index a8de4dca6ed7..49603e4d7584 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java @@ -30,7 +30,6 @@ public HiveInsertTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("inputColumns") List inputColumns, - @JsonProperty("filePrefix") String filePrefix, @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata, @JsonProperty("locationHandle") 
LocationHandle locationHandle, @JsonProperty("bucketProperty") Optional bucketProperty, @@ -41,7 +40,6 @@ public HiveInsertTableHandle( schemaName, tableName, inputColumns, - filePrefix, pageSinkMetadata, locationHandle, bucketProperty, diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java index e4cd12e12665..613df353bd17 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java @@ -17,7 +17,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Suppliers; -import com.google.common.base.Verify; import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -46,7 +45,6 @@ import io.prestosql.plugin.hive.metastore.thrift.ThriftMetastoreUtil; import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider; import io.prestosql.spi.PrestoException; -import io.prestosql.spi.StandardErrorCode; import io.prestosql.spi.block.Block; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ColumnMetadata; @@ -114,7 +112,6 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -149,6 +146,7 @@ import static io.prestosql.plugin.hive.HiveSessionProperties.getHiveStorageFormat; import static io.prestosql.plugin.hive.HiveSessionProperties.isBucketExecutionEnabled; import static io.prestosql.plugin.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite; +import static io.prestosql.plugin.hive.HiveSessionProperties.isCreateEmptyBucketFiles; import static io.prestosql.plugin.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount; import static 
io.prestosql.plugin.hive.HiveSessionProperties.isRespectTableFormat; import static io.prestosql.plugin.hive.HiveSessionProperties.isSortedWritingEnabled; @@ -177,12 +175,12 @@ import static io.prestosql.plugin.hive.HiveUtil.encodeViewData; import static io.prestosql.plugin.hive.HiveUtil.getPartitionKeyColumnHandles; import static io.prestosql.plugin.hive.HiveUtil.hiveColumnHandles; -import static io.prestosql.plugin.hive.HiveUtil.schemaTableName; import static io.prestosql.plugin.hive.HiveUtil.toPartitionValues; import static io.prestosql.plugin.hive.HiveUtil.verifyPartitionTypeSupported; import static io.prestosql.plugin.hive.HiveWriteUtils.checkTableIsWritable; import static io.prestosql.plugin.hive.HiveWriteUtils.initializeSerializer; import static io.prestosql.plugin.hive.HiveWriteUtils.isWritableType; +import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName; import static io.prestosql.plugin.hive.PartitionUpdate.UpdateMode.APPEND; import static io.prestosql.plugin.hive.PartitionUpdate.UpdateMode.NEW; import static io.prestosql.plugin.hive.PartitionUpdate.UpdateMode.OVERWRITE; @@ -305,7 +303,7 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName if (getSourceTableNameFromSystemTable(tableName).isPresent()) { // We must not allow system table due to how permissions are checked in SystemTableAwareAccessControl.checkCanSelectFromTable() - throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, format("Unexpected table %s present in Hive metastore", tableName)); + throw new PrestoException(NOT_SUPPORTED, "Unexpected table present in Hive metastore: " + tableName); } verifyOnline(tableName, Optional.empty(), getProtectMode(table.get()), table.get().getParameters()); @@ -369,7 +367,9 @@ private Optional getPartitionsSystemTable(ConnectorSession session, return Optional.empty(); } - List partitionColumns = getPartitionColumns(sourceTableName); + Table sourceTable = 
metastore.getTable(sourceTableName.getSchemaName(), sourceTableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(sourceTableName)); + List partitionColumns = getPartitionKeyColumnHandles(sourceTable); if (partitionColumns.isEmpty()) { return Optional.empty(); } @@ -411,18 +411,10 @@ private Optional getPartitionsSystemTable(ConnectorSession session, })); } - private List getPartitionColumns(SchemaTableName tableName) - { - Table sourceTable = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()).get(); - return getPartitionKeyColumnHandles(sourceTable); - } - @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle) { - requireNonNull(tableHandle, "tableHandle is null"); - SchemaTableName tableName = schemaTableName(tableHandle); - return getTableMetadata(tableName); + return getTableMetadata(((HiveTableHandle) tableHandle).getSchemaTableName()); } private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) @@ -477,12 +469,11 @@ private ConnectorTableMetadata doGetTableMetadata(SchemaTableName tableName) } // Bucket properties - Optional bucketProperty = table.get().getStorage().getBucketProperty(); - if (bucketProperty.isPresent()) { - properties.put(BUCKET_COUNT_PROPERTY, bucketProperty.get().getBucketCount()); - properties.put(BUCKETED_BY_PROPERTY, bucketProperty.get().getBucketedBy()); - properties.put(SORTED_BY_PROPERTY, bucketProperty.get().getSortedBy()); - } + table.get().getStorage().getBucketProperty().ifPresent(property -> { + properties.put(BUCKET_COUNT_PROPERTY, property.getBucketCount()); + properties.put(BUCKETED_BY_PROPERTY, property.getBucketedBy()); + properties.put(SORTED_BY_PROPERTY, property.getSortedBy()); + }); // ORC format specific properties String orcBloomFilterColumns = table.get().getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY); @@ -513,7 +504,7 @@ public Optional getInfo(ConnectorTableLayoutHandle layoutHandle) return 
Optional.of(new HiveInputInfo( tableLayoutHandle.getPartitions().get().stream() .map(HivePartition::getPartitionId) - .collect(Collectors.toList()), + .collect(toList()), false)); } @@ -543,16 +534,11 @@ private List listSchemas(ConnectorSession session, Optional sche @Override public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { - SchemaTableName tableName = schemaTableName(tableHandle); - Optional table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(tableName); - } - ImmutableMap.Builder columnHandles = ImmutableMap.builder(); - for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) { - columnHandles.put(columnHandle.getName(), columnHandle); - } - return columnHandles.build(); + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); + Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); + return hiveColumnHandles(table).stream() + .collect(toImmutableMap(HiveColumnHandle::getName, identity())); } @SuppressWarnings("TryWithIdenticalCatches") @@ -922,15 +908,9 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl private void failIfAvroSchemaIsSet(HiveTableHandle handle) { - String tableName = handle.getTableName(); - String schemaName = handle.getSchemaName(); - Optional
table = metastore.getTable(schemaName, tableName); - - if (!table.isPresent()) { - throw new TableNotFoundException(new SchemaTableName(schemaName, tableName)); - } - - if (table.get().getParameters().get(AVRO_SCHEMA_URL_KEY) != null) { + Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName()) + .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); + if (table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null) { throw new PrestoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema url is set"); } } @@ -953,11 +933,9 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { HiveTableHandle handle = (HiveTableHandle) tableHandle; - SchemaTableName tableName = schemaTableName(tableHandle); - Optional
target = metastore.getTable(handle.getSchemaName(), handle.getTableName()); if (!target.isPresent()) { - throw new TableNotFoundException(tableName); + throw new TableNotFoundException(handle.getSchemaTableName()); } metastore.dropTable(session, handle.getSchemaName(), handle.getTableName()); } @@ -966,12 +944,10 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle) { verifyJvmTimeZone(); - HiveTableHandle handle = (HiveTableHandle) tableHandle; - SchemaTableName tableName = handle.getSchemaTableName(); - + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); metastore.getTable(tableName.getSchemaName(), tableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(tableName)); - return handle; + return tableHandle; } @Override @@ -1075,7 +1051,6 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto schemaName, tableName, columnHandles, - session.getQueryId(), metastore.generatePageSinkMetadata(schemaTableName), locationHandle, tableStorageFormat, @@ -1086,7 +1061,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto tableProperties); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); - metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), result.getFilePrefix(), schemaTableName); + metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), schemaTableName); return result; } @@ -1119,10 +1094,10 @@ public Optional finishCreateTable(ConnectorSession sess partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); - if (handle.getBucketProperty().isPresent()) { - ImmutableList partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates); + if 
(handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(Iterables.concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate)); createEmptyFile(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames()); @@ -1149,7 +1124,7 @@ public Optional finishCreateTable(ConnectorSession sess if (!handle.getPartitionedBy().isEmpty()) { if (isRespectTableFormat(session)) { - Verify.verify(handle.getPartitionStorageFormat() == handle.getTableStorageFormat()); + verify(handle.getPartitionStorageFormat() == handle.getTableStorageFormat()); } for (PartitionUpdate update : partitionUpdates) { Partition partition = buildPartitionObject(session, table, update); @@ -1171,10 +1146,10 @@ public Optional finishCreateTable(ConnectorSession sess return Optional.of(new HiveWrittenPartitions( partitionUpdates.stream() .map(PartitionUpdate::getName) - .collect(Collectors.toList()))); + .collect(toList()))); } - private ImmutableList computePartitionUpdatesForMissingBuckets( + private List computePartitionUpdatesForMissingBuckets( ConnectorSession session, HiveWritableTableHandle handle, Table table, @@ -1190,7 +1165,6 @@ private ImmutableList computePartitionUpdatesForMissingBuckets( table, storageFormat, partitionUpdate.getTargetPath(), - handle.getFilePrefix(), bucketCount, partitionUpdate); 
partitionUpdatesForMissingBucketsBuilder.add(new PartitionUpdate( @@ -1211,7 +1185,6 @@ private List computeFileNamesForMissingBuckets( Table table, HiveStorageFormat storageFormat, Path targetPath, - String filePrefix, int bucketCount, PartitionUpdate partitionUpdate) { @@ -1225,7 +1198,7 @@ private List computeFileNamesForMissingBuckets( Set fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames()); ImmutableList.Builder missingFileNamesBuilder = ImmutableList.builder(); for (int i = 0; i < bucketCount; i++) { - String fileName = HiveWriterFactory.computeBucketedFileName(filePrefix, i) + fileExtension; + String fileName = computeBucketedFileName(session.getQueryId(), i) + fileExtension; if (!fileNames.contains(fileName)) { missingFileNamesBuilder.add(fileName); } @@ -1275,41 +1248,36 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl { verifyJvmTimeZone(); - SchemaTableName tableName = schemaTableName(tableHandle); - Optional
table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(tableName); - } + SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName(); + Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); - checkTableIsWritable(table.get(), writesToNonManagedTablesEnabled); + checkTableIsWritable(table, writesToNonManagedTablesEnabled); - for (Column column : table.get().getDataColumns()) { + for (Column column : table.getDataColumns()) { if (!isWritableType(column.getType())) { - throw new PrestoException( - NOT_SUPPORTED, - format("Inserting into Hive table %s.%s with column type %s not supported", table.get().getDatabaseName(), table.get().getTableName(), column.getType())); + throw new PrestoException(NOT_SUPPORTED, format("Inserting into Hive table %s with column type %s not supported", tableName, column.getType())); } } - List handles = hiveColumnHandles(table.get()).stream() + List handles = hiveColumnHandles(table).stream() .filter(columnHandle -> !columnHandle.isHidden()) .collect(toList()); - HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table.get()); - LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table.get()); + HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table); + LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table); HiveInsertTableHandle result = new HiveInsertTableHandle( tableName.getSchemaName(), tableName.getTableName(), handles, - session.getQueryId(), metastore.generatePageSinkMetadata(tableName), locationHandle, - table.get().getStorage().getBucketProperty(), + table.getStorage().getBucketProperty(), tableStorageFormat, - isRespectTableFormat(session) ? 
tableStorageFormat : HiveSessionProperties.getHiveStorageFormat(session)); + isRespectTableFormat(session) ? tableStorageFormat : getHiveStorageFormat(session)); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); - metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), result.getFilePrefix(), tableName); + metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), tableName); return result; } @@ -1326,25 +1294,23 @@ public Optional finishInsert(ConnectorSession session, HiveStorageFormat tableStorageFormat = handle.getTableStorageFormat(); partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); - Optional
table = metastore.getTable(handle.getSchemaName(), handle.getTableName()); - if (!table.isPresent()) { - throw new TableNotFoundException(new SchemaTableName(handle.getSchemaName(), handle.getTableName())); - } - if (!table.get().getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { + Table table = metastore.getTable(handle.getSchemaName(), handle.getTableName()) + .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); + if (!table.getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Table format changed during insert"); } - if (handle.getBucketProperty().isPresent()) { - ImmutableList partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table.get(), partitionUpdates); + if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback - partitionUpdates = PartitionUpdate.mergePartitionUpdates(Iterables.concat(partitionUpdates, partitionUpdatesForMissingBuckets)); + partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { - Optional partition = table.get().getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table.get(), partitionUpdate)); - createEmptyFile(session, partitionUpdate.getWritePath(), table.get(), partition, partitionUpdate.getFileNames()); + Optional partition = table.getPartitionColumns().isEmpty() ? 
Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate)); + createEmptyFile(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames()); } } - List partitionedBy = table.get().getPartitionColumns().stream() + List partitionedBy = table.getPartitionColumns().stream() .map(Column::getName) .collect(toImmutableList()); Map columnTypes = handle.getInputColumns().stream() @@ -1386,7 +1352,7 @@ else if (partitionUpdate.getUpdateMode() == APPEND) { } else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == OVERWRITE) { // insert into new partition or overwrite existing partition - Partition partition = buildPartitionObject(session, table.get(), partitionUpdate); + Partition partition = buildPartitionObject(session, table, partitionUpdate); if (!partition.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) { throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert"); } @@ -1408,7 +1374,7 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode return Optional.of(new HiveWrittenPartitions( partitionUpdates.stream() .map(PartitionUpdate::getName) - .collect(Collectors.toList()))); + .collect(toList()))); } private Partition buildPartitionObject(ConnectorSession session, Table table, PartitionUpdate partitionUpdate) @@ -1425,7 +1391,7 @@ private Partition buildPartitionObject(ConnectorSession session, Table table, Pa .withStorage(storage -> storage .setStorageFormat(isRespectTableFormat(session) ? 
table.getStorage().getStorageFormat() : - fromHiveStorageFormat(HiveSessionProperties.getHiveStorageFormat(session))) + fromHiveStorageFormat(getHiveStorageFormat(session))) .setLocation(partitionUpdate.getTargetPath().toString()) .setBucketProperty(table.getStorage().getBucketProperty()) .setSerdeParameters(table.getStorage().getSerdeParameters())) @@ -1463,9 +1429,9 @@ private PartitionStatistics createPartitionStatistics( return new PartitionStatistics(basicStatistics, columnStatistics); } - private Map getColumnStatistics(Map, ComputedStatistics> partitionComputedStatistics, List partitionValues) + private static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) { - return Optional.ofNullable(partitionComputedStatistics.get(partitionValues)) + return Optional.ofNullable(statistics.get(partitionValues)) .map(ComputedStatistics::getColumnStatistics) .orElse(ImmutableMap.of()); } @@ -1682,7 +1648,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa hiveBucketHandle.getReadBucketCount(), hiveBucketHandle.getColumns().stream() .map(HiveColumnHandle::getHiveType) - .collect(Collectors.toList()), + .collect(toList()), OptionalInt.empty()), hiveBucketHandle.getColumns().stream() .map(ColumnHandle.class::cast) @@ -1789,9 +1755,7 @@ static TupleDomain createPredicate(List partitionCol return withColumnDomains( partitionColumns.stream() - .collect(Collectors.toMap( - identity(), - column -> buildColumnDomain(column, partitions)))); + .collect(toMap(identity(), column -> buildColumnDomain(column, partitions)))); } private static Domain buildColumnDomain(ColumnHandle column, List partitions) @@ -1857,11 +1821,11 @@ public Optional getInsertLayout(ConnectorSession sessio hiveBucketHandle.get().getTableBucketCount(), hiveBucketHandle.get().getColumns().stream() .map(HiveColumnHandle::getHiveType) - .collect(Collectors.toList()), + .collect(toList()), 
OptionalInt.of(hiveBucketHandle.get().getTableBucketCount())); List partitionColumns = hiveBucketHandle.get().getColumns().stream() .map(HiveColumnHandle::getName) - .collect(Collectors.toList()); + .collect(toList()); return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns)); } @@ -1933,6 +1897,7 @@ private List getColumnStatisticMetadata(String columnNa .collect(toImmutableList()); } + @Override public void createRole(ConnectorSession session, String role, Optional grantor) { // roles are case insensitive in Hive diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java index 1d94aea551d1..7774d2339957 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java @@ -39,7 +39,6 @@ public HiveOutputTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("inputColumns") List inputColumns, - @JsonProperty("filePrefix") String filePrefix, @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata, @JsonProperty("locationHandle") LocationHandle locationHandle, @JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat, @@ -53,7 +52,6 @@ public HiveOutputTableHandle( schemaName, tableName, inputColumns, - filePrefix, pageSinkMetadata, locationHandle, bucketProperty, diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java index d79408ecb2a0..4f8c270af208 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java @@ -142,7 +142,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, 
boolean sortedBy, handle.getLocationHandle(), locationService, - handle.getFilePrefix(), + session.getQueryId(), new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize)), typeManager, hdfsEnvironment, diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java index fc8c38eef367..49694afa65da 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java @@ -60,6 +60,7 @@ public final class HiveSessionProperties private static final String ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY = "orc_optimized_writer_max_dictionary_memory"; private static final String HIVE_STORAGE_FORMAT = "hive_storage_format"; private static final String RESPECT_TABLE_FORMAT = "respect_table_format"; + private static final String CREATE_EMPTY_BUCKET_FILES = "create_empty_bucket_files"; private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names"; private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics"; private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; @@ -218,6 +219,11 @@ public HiveSessionProperties(HiveConfig hiveConfig, OrcFileWriterConfig orcFileW "Write new partitions using table format rather than default storage format", hiveConfig.isRespectTableFormat(), false), + booleanProperty( + CREATE_EMPTY_BUCKET_FILES, + "Create empty files for buckets that have no data", + hiveConfig.isCreateEmptyBucketFiles(), + false), booleanProperty( PARQUET_USE_COLUMN_NAME, "Experimental: Parquet: Access Parquet columns using names from the file", @@ -417,6 +423,11 @@ public static boolean isRespectTableFormat(ConnectorSession session) return session.getProperty(RESPECT_TABLE_FORMAT, Boolean.class); 
} + public static boolean isCreateEmptyBucketFiles(ConnectorSession session) + { + return session.getProperty(CREATE_EMPTY_BUCKET_FILES, Boolean.class); + } + public static boolean isUseParquetColumnNames(ConnectorSession session) { return session.getProperty(PARQUET_USE_COLUMN_NAME, Boolean.class); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveUtil.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveUtil.java index 75c4292af0b1..5589030621e1 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveUtil.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveUtil.java @@ -29,9 +29,7 @@ import io.prestosql.plugin.hive.util.FooterAwareRecordReader; import io.prestosql.spi.ErrorCodeSupplier; import io.prestosql.spi.PrestoException; -import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.RecordCursor; -import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.predicate.NullableValue; import io.prestosql.spi.type.CharType; import io.prestosql.spi.type.DecimalType; @@ -792,11 +790,6 @@ public static Slice charPartitionKey(String value, String name, Type columnType) return partitionKey; } - public static SchemaTableName schemaTableName(ConnectorTableHandle tableHandle) - { - return ((HiveTableHandle) tableHandle).getSchemaTableName(); - } - public static List hiveColumnHandles(Table table) { ImmutableList.Builder columns = ImmutableList.builder(); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java index 056ad3295a80..4443495c3a75 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java @@ -13,9 +13,11 @@ */ package io.prestosql.plugin.hive; +import com.fasterxml.jackson.annotation.JsonIgnore; import 
com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.prestosql.plugin.hive.metastore.HivePageSinkMetadata; +import io.prestosql.spi.connector.SchemaTableName; import java.util.List; import java.util.Optional; @@ -27,8 +29,7 @@ public class HiveWritableTableHandle private final String schemaName; private final String tableName; private final List inputColumns; - private final String filePrefix; - private HivePageSinkMetadata pageSinkMetadata; + private final HivePageSinkMetadata pageSinkMetadata; private final LocationHandle locationHandle; private final Optional bucketProperty; private final HiveStorageFormat tableStorageFormat; @@ -38,7 +39,6 @@ public HiveWritableTableHandle( String schemaName, String tableName, List inputColumns, - String filePrefix, HivePageSinkMetadata pageSinkMetadata, LocationHandle locationHandle, Optional bucketProperty, @@ -48,7 +48,6 @@ public HiveWritableTableHandle( this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null")); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); this.pageSinkMetadata = requireNonNull(pageSinkMetadata, "pageSinkMetadata is null"); this.locationHandle = requireNonNull(locationHandle, "locationHandle is null"); this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null"); @@ -68,16 +67,16 @@ public String getTableName() return tableName; } - @JsonProperty - public List getInputColumns() + @JsonIgnore + public SchemaTableName getSchemaTableName() { - return inputColumns; + return new SchemaTableName(schemaName, tableName); } @JsonProperty - public String getFilePrefix() + public List getInputColumns() { - return filePrefix; + return inputColumns; } @JsonProperty diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriteUtils.java 
b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriteUtils.java index e3b305cfb588..7e642a0b8b2e 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriteUtils.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriteUtils.java @@ -377,7 +377,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT } checkWritable( - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), getProtectMode(table), table.getParameters(), @@ -387,7 +387,7 @@ public static void checkTableIsWritable(Table table, boolean writesToNonManagedT public static void checkPartitionIsWritable(String partitionName, Partition partition) { checkWritable( - new SchemaTableName(partition.getDatabaseName(), partition.getTableName()), + partition.getSchemaTableName(), Optional.of(partitionName), getProtectMode(partition), partition.getParameters(), diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java index d55d8d8dc4d7..b1f067337e8e 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java @@ -111,7 +111,7 @@ public class HiveWriterFactory private final HiveStorageFormat partitionStorageFormat; private final LocationHandle locationHandle; private final LocationService locationService; - private final String filePrefix; + private final String queryId; private final HivePageSinkMetadataProvider pageSinkMetadataProvider; private final TypeManager typeManager; @@ -149,7 +149,7 @@ public HiveWriterFactory( List sortedBy, LocationHandle locationHandle, LocationService locationService, - String filePrefix, + String queryId, HivePageSinkMetadataProvider pageSinkMetadataProvider, TypeManager typeManager, HdfsEnvironment hdfsEnvironment, @@ -172,7 +172,7 @@ public 
HiveWriterFactory( this.partitionStorageFormat = requireNonNull(partitionStorageFormat, "partitionStorageFormat is null"); this.locationHandle = requireNonNull(locationHandle, "locationHandle is null"); this.locationService = requireNonNull(locationService, "locationService is null"); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); + this.queryId = requireNonNull(queryId, "queryId is null"); this.pageSinkMetadataProvider = requireNonNull(pageSinkMetadataProvider, "pageSinkMetadataProvider is null"); @@ -267,10 +267,10 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt String fileName; if (bucketNumber.isPresent()) { - fileName = computeBucketedFileName(filePrefix, bucketNumber.getAsInt()); + fileName = computeBucketedFileName(queryId, bucketNumber.getAsInt()); } else { - fileName = filePrefix + "_" + randomUUID(); + fileName = queryId + "_" + randomUUID(); } List partitionValues = createPartitionValues(partitionColumnTypes, partitionColumns, position); @@ -586,9 +586,10 @@ private void validateSchema(Optional partitionName, Properties schema) } } - public static String computeBucketedFileName(String filePrefix, int bucket) + public static String computeBucketedFileName(String queryId, int bucket) { - return filePrefix + "_bucket-" + Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0'); + String paddedBucket = Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0'); + return format("0%s_0_%s", paddedBucket, queryId); } public static String getFileExtension(JobConf conf, StorageFormat storageFormat) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Partition.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Partition.java index c4d90d38deb3..730d9ec7025b 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Partition.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Partition.java @@ -14,9 +14,11 @@ 
package io.prestosql.plugin.hive.metastore; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.prestosql.spi.connector.SchemaTableName; import javax.annotation.concurrent.Immutable; @@ -67,6 +69,12 @@ public String getTableName() return tableName; } + @JsonIgnore + public SchemaTableName getSchemaTableName() + { + return new SchemaTableName(databaseName, tableName); + } + @JsonProperty public List getValues() { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index c41e76f11e5f..7ec4ddf8e3c4 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -66,6 +66,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CORRUPTED_COLUMN_STATISTICS; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; @@ -194,7 +195,7 @@ public synchronized Map getPartitionStatistics(Stri return ImmutableMap.of(); } TableSource tableSource = getTableSource(databaseName, tableName); - Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); + Map, Action> partitionActionsOfTable = 
partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>()); ImmutableSet.Builder partitionNamesToQuery = ImmutableSet.builder(); ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); for (String partitionName : partitionNames) { @@ -356,12 +357,11 @@ public synchronized void createTable( setShared(); // When creating a table, it should never have partition actions. This is just a sanity check. checkNoPartitionAction(table.getDatabaseName(), table.getTableName()); - SchemaTableName schemaTableName = new SchemaTableName(table.getDatabaseName(), table.getTableName()); - Action oldTableAction = tableActions.get(schemaTableName); + Action oldTableAction = tableActions.get(table.getSchemaTableName()); TableAndMore tableAndMore = new TableAndMore(table, Optional.of(principalPrivileges), currentPath, Optional.empty(), ignoreExisting, statistics, statistics); if (oldTableAction == null) { HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName()); - tableActions.put(schemaTableName, new Action<>(ActionType.ADD, tableAndMore, context)); + tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, context)); return; } switch (oldTableAction.getType()) { @@ -370,7 +370,7 @@ public synchronized void createTable( case ADD: case ALTER: case INSERT_EXISTING: - throw new TableAlreadyExistsException(schemaTableName); + throw new TableAlreadyExistsException(table.getSchemaTableName()); default: throw new IllegalStateException("Unknown action type"); } @@ -494,7 +494,7 @@ public synchronized void truncateUnpartitionedTable(ConnectorSession session, St Path path = new Path(table.get().getStorage().getLocation()); HdfsContext context = new HdfsContext(session, databaseName, tableName); setExclusive((delegate, hdfsEnvironment) -> { - RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableList.of(""), false); + RecursiveDeleteResult 
recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableSet.of(""), false); if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) { throw new PrestoException(HIVE_FILESYSTEM_ERROR, format( "Error deleting from unpartitioned table %s. These items can not be deleted: %s", @@ -547,7 +547,7 @@ private Optional> doGetPartitionNames(String databaseName, String t default: throw new UnsupportedOperationException("Unknown table source"); } - Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); + Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>()); ImmutableList.Builder resultBuilder = ImmutableList.builder(); // alter/remove newly-altered/dropped partitions from the results from underlying metastore for (String partitionName : partitionNames) { @@ -864,7 +864,7 @@ public synchronized void revokeTablePrivileges(String databaseName, String table setExclusive((delegate, hdfsEnvironment) -> delegate.revokeTablePrivileges(databaseName, tableName, grantee, privileges)); } - public synchronized void declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Path stagingPathRoot, String filePrefix, SchemaTableName schemaTableName) + public synchronized void declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Path stagingPathRoot, SchemaTableName schemaTableName) { setShared(); if (writeMode == WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY) { @@ -874,7 +874,7 @@ public synchronized void declareIntentionToWrite(ConnectorSession session, Write } } HdfsContext context = new HdfsContext(session, schemaTableName.getSchemaName(), schemaTableName.getTableName()); - declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(writeMode, context, stagingPathRoot, filePrefix, schemaTableName)); + declaredIntentionsToWrite.add(new 
DeclaredIntentionToWrite(writeMode, context, session.getQueryId(), stagingPathRoot, schemaTableName)); } public synchronized void commit() @@ -995,7 +995,7 @@ private void commitShared() // fileRenameFutures must all come back before any file system cleanups are carried out. // Otherwise, files that should be deleted may be created after cleanup is done. - committer.executeCleanupTasksForAbort(committer.extractFilePrefixes(declaredIntentionsToWrite)); + committer.executeCleanupTasksForAbort(declaredIntentionsToWrite); committer.executeRenameTasksForAbort(); @@ -1122,7 +1122,7 @@ private void prepareAddTable(HdfsContext context, TableAndMore tableAndMore) addTableOperations.add(new CreateTableOperation(table, tableAndMore.getPrincipalPrivileges(), tableAndMore.isIgnoreExisting())); if (!isPrestoView(table)) { updateStatisticsOperations.add(new UpdateStatisticsOperation( - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), tableAndMore.getStatisticsUpdate(), false)); @@ -1141,7 +1141,7 @@ private void prepareInsertExistingTable(HdfsContext context, TableAndMore tableA asyncRename(hdfsEnvironment, renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, tableAndMore.getFileNames().get()); } updateStatisticsOperations.add(new UpdateStatisticsOperation( - new SchemaTableName(table.getDatabaseName(), table.getTableName()), + table.getSchemaTableName(), Optional.empty(), tableAndMore.getStatisticsUpdate(), true)); @@ -1248,9 +1248,8 @@ private void prepareAddPartition(HdfsContext context, PartitionAndMore partition Path currentPath = partitionAndMore.getCurrentLocation(); Path targetPath = new Path(targetLocation); - SchemaTableName schemaTableName = new SchemaTableName(partition.getDatabaseName(), partition.getTableName()); PartitionAdder partitionAdder = partitionAdders.computeIfAbsent( - schemaTableName, + partition.getSchemaTableName(), ignored -> new 
PartitionAdder(partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE)); if (pathExists(context, hdfsEnvironment, currentPath)) { @@ -1283,16 +1282,19 @@ private void prepareInsertExistingPartition(HdfsContext context, PartitionAndMor asyncRename(hdfsEnvironment, renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, partitionAndMore.getFileNames()); } updateStatisticsOperations.add(new UpdateStatisticsOperation( - new SchemaTableName(partition.getDatabaseName(), partition.getTableName()), + partition.getSchemaTableName(), Optional.of(getPartitionName(partition.getDatabaseName(), partition.getTableName(), partition.getValues())), partitionAndMore.getStatisticsUpdate(), true)); } - private void executeCleanupTasksForAbort(List filePrefixes) + private void executeCleanupTasksForAbort(Collection declaredIntentionsToWrite) { + Set queryIds = declaredIntentionsToWrite.stream() + .map(DeclaredIntentionToWrite::getQueryId) + .collect(toImmutableSet()); for (DirectoryCleanUpTask cleanUpTask : cleanUpTasksForAbort) { - recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), filePrefixes, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort"); + recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), queryIds, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort"); } } @@ -1328,7 +1330,7 @@ private void deleteEmptyStagingDirectories(List declar continue; } Path path = declaredIntentionToWrite.getRootPath(); - recursiveDeleteFilesAndLog(declaredIntentionToWrite.getContext(), path, ImmutableList.of(), true, "staging directory cleanup"); + recursiveDeleteFilesAndLog(declaredIntentionToWrite.getContext(), path, ImmutableSet.of(), true, "staging directory cleanup"); } } @@ -1469,15 +1471,6 @@ private void executeIrreversibleMetastoreOperations() throw prestoException; } } - - private List extractFilePrefixes(List 
declaredIntentionsToWrite) - { - Set filePrefixSet = new HashSet<>(); - for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) { - filePrefixSet.add(declaredIntentionToWrite.getFilePrefix()); - } - return ImmutableList.copyOf(filePrefixSet); - } } @GuardedBy("this") @@ -1500,13 +1493,12 @@ private void rollbackShared() // In the case of DIRECT_TO_TARGET_NEW_DIRECTORY, if the directory is not guaranteed to be unique // for the query, it is possible that another query or compute engine may see the directory, wrote // data to it, and exported it through metastore. Therefore it may be argued that cleanup of staging - // directories must be carried out conservatively. To be safe, we only delete files that start with - // the unique prefix for queries in this transaction. - + // directories must be carried out conservatively. To be safe, we only delete files that start or + // end with the query IDs in this transaction. recursiveDeleteFilesAndLog( declaredIntentionToWrite.getContext(), rootPath, - ImmutableList.of(declaredIntentionToWrite.getFilePrefix()), + ImmutableSet.of(declaredIntentionToWrite.getQueryId()), true, format("staging/target_new directory rollback for table %s", declaredIntentionToWrite.getSchemaTableName())); break; @@ -1546,14 +1538,14 @@ private void rollbackShared() schemaTableName.getTableName()); } - // delete any file that starts with the unique prefix of this query + // delete any file that starts or ends with the query ID for (Path path : pathsToClean) { // TODO: It is a known deficiency that some empty directory does not get cleaned up in S3. // We can not delete any of the directories here since we do not know who created them. 
recursiveDeleteFilesAndLog( declaredIntentionToWrite.getContext(), path, - ImmutableList.of(declaredIntentionToWrite.getFilePrefix()), + ImmutableSet.of(declaredIntentionToWrite.getQueryId()), false, format("target_existing directory rollback for table %s", schemaTableName)); } @@ -1694,13 +1686,13 @@ private static void asyncRename( } } - private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, List filePrefixes, boolean deleteEmptyDirectories, String reason) + private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories, String reason) { RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles( hdfsEnvironment, context, directory, - filePrefixes, + queryIds, deleteEmptyDirectories); if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) { logCleanupFailure( @@ -1720,19 +1712,19 @@ else if (deleteEmptyDirectories && !recursiveDeleteResult.isDirectoryNoLongerExi /** * Attempt to recursively remove eligible files and/or directories in {@code directory}. *

- * When {@code filePrefixes} is not present, all files (but not necessarily directories) will be - ineligible. If all files shall be deleted, you can use an empty string as {@code filePrefixes}. + * When {@code queryIds} is not present, all files (but not necessarily directories) will be + ineligible. If all files shall be deleted, you can include an empty string in {@code queryIds}. *

* When {@code deleteEmptySubDirectory} is true, any empty directory (including directories that - * were originally empty, and directories that become empty after files prefixed with - * {@code filePrefixes} are deleted) will be eligible. + * were originally empty, and directories that become empty after files prefixed or suffixed with + * {@code queryIds} are deleted) will be eligible. *

* This method will not delete anything that's neither a directory nor a file. * - * @param filePrefixes prefix of files that should be deleted + * @param queryIds prefix or suffix of files that should be deleted * @param deleteEmptyDirectories whether empty directories should be deleted */ - private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, List filePrefixes, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories) { FileSystem fileSystem; try { @@ -1748,10 +1740,10 @@ private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEn return new RecursiveDeleteResult(false, notDeletedItems.build()); } - return doRecursiveDeleteFiles(fileSystem, directory, filePrefixes, deleteEmptyDirectories); + return doRecursiveDeleteFiles(fileSystem, directory, queryIds, deleteEmptyDirectories); } - private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, List filePrefixes, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, Set queryIds, boolean deleteEmptyDirectories) { // don't delete hidden presto directories if (directory.getName().startsWith(".presto")) { @@ -1777,7 +1769,7 @@ private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSyste boolean eligible = false; // never delete presto dot files if (!fileName.startsWith(".presto")) { - eligible = filePrefixes.stream().anyMatch(fileName::startsWith); + eligible = queryIds.stream().anyMatch(id -> fileName.startsWith(id) || fileName.endsWith(id)); } if (eligible) { if (!deleteIfExists(fileSystem, filePath, false)) { @@ -1790,7 +1782,7 @@ private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSyste } } else if 
(fileStatus.isDirectory()) { - RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), filePrefixes, deleteEmptyDirectories); + RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), queryIds, deleteEmptyDirectories); if (!subResult.isDirectoryNoLongerExists()) { allDescendentsDeleted = false; } @@ -2125,16 +2117,16 @@ private static class DeclaredIntentionToWrite { private final WriteMode mode; private final HdfsContext context; - private final String filePrefix; + private final String queryId; private final Path rootPath; private final SchemaTableName schemaTableName; - public DeclaredIntentionToWrite(WriteMode mode, HdfsContext context, Path stagingPathRoot, String filePrefix, SchemaTableName schemaTableName) + public DeclaredIntentionToWrite(WriteMode mode, HdfsContext context, String queryId, Path stagingPathRoot, SchemaTableName schemaTableName) { this.mode = requireNonNull(mode, "mode is null"); this.context = requireNonNull(context, "context is null"); + this.queryId = requireNonNull(queryId, "queryId is null"); this.rootPath = requireNonNull(stagingPathRoot, "stagingPathRoot is null"); - this.filePrefix = requireNonNull(filePrefix, "filePrefix is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); } @@ -2148,9 +2140,9 @@ public HdfsContext getContext() return context; } - public String getFilePrefix() + public String getQueryId() { - return filePrefix; + return queryId; } public Path getRootPath() @@ -2169,7 +2161,7 @@ public String toString() return toStringHelper(this) .add("mode", mode) .add("context", context) - .add("filePrefix", filePrefix) + .add("queryId", queryId) .add("rootPath", rootPath) .add("schemaTableName", schemaTableName) .toString(); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Table.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Table.java index a17088fc8b33..b5ff9ccf598c 100644 
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Table.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/Table.java @@ -14,9 +14,11 @@ package io.prestosql.plugin.hive.metastore; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.prestosql.spi.connector.SchemaTableName; import javax.annotation.concurrent.Immutable; @@ -83,6 +85,12 @@ public String getTableName() return tableName; } + @JsonIgnore + public SchemaTableName getSchemaTableName() + { + return new SchemaTableName(databaseName, tableName); + } + @JsonProperty public String getOwner() { diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java index c5649ff2d4d8..787b713d8176 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java @@ -3009,7 +3009,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag // verify all new files start with the unique prefix HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName()); for (String filePath : listAllDataFiles(context, getStagingPathRoot(outputHandle))) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(outputHandle))); + assertThat(new Path(filePath).getName()).startsWith(session.getQueryId()); } // commit the table @@ -3186,7 +3186,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName Set tempFiles = listAllDataFiles(context, stagingPathRoot); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new 
Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertThat(new Path(filePath).getName()).startsWith(session.getQueryId()); } // rollback insert @@ -3220,16 +3220,6 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName } // These are protected so extensions to the hive connector can replace the handle classes - protected String getFilePrefix(ConnectorOutputTableHandle outputTableHandle) - { - return ((HiveWritableTableHandle) outputTableHandle).getFilePrefix(); - } - - protected String getFilePrefix(ConnectorInsertTableHandle insertTableHandle) - { - return ((HiveWritableTableHandle) insertTableHandle).getFilePrefix(); - } - protected Path getStagingPathRoot(ConnectorInsertTableHandle insertTableHandle) { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertTableHandle; @@ -3382,7 +3372,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab Set tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle)); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertThat(new Path(filePath).getName()).startsWith(session.getQueryId()); } // rollback insert @@ -3497,7 +3487,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche Set tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle)); assertTrue(!tempFiles.isEmpty()); for (String filePath : tempFiles) { - assertTrue(new Path(filePath).getName().startsWith(getFilePrefix(insertTableHandle))); + assertThat(new Path(filePath).getName()).startsWith(session.getQueryId()); } // verify statistics are visible from within of the current transaction diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java index 
34242455824f..0f364d7aaff0 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -51,6 +51,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -63,6 +64,7 @@ import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.prestosql.plugin.hive.BackgroundHiveSplitLoader.BucketSplitInfo.createBucketSplitInfo; +import static io.prestosql.plugin.hive.BackgroundHiveSplitLoader.getBucketNumber; import static io.prestosql.plugin.hive.HiveColumnHandle.pathColumnHandle; import static io.prestosql.plugin.hive.HiveTestUtils.SESSION; import static io.prestosql.plugin.hive.HiveType.HIVE_INT; @@ -247,6 +249,23 @@ public void testCachedDirectoryLister() assertEquals(cachingDirectoryLister.getMissCount(), 1); } + @Test + public void testGetBucketNumber() + { + assertEquals(getBucketNumber("0234_0"), OptionalInt.of(234)); + assertEquals(getBucketNumber("000234_0"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_99"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_0.txt"), OptionalInt.of(234)); + assertEquals(getBucketNumber("0234_0_copy_1"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_072952_00009_fn7s5_bucket-00234"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_072952_00009_fn7s5_bucket-00234.txt"), OptionalInt.of(234)); + assertEquals(getBucketNumber("20190526_235847_87654_fn7s5_bucket-56789"), OptionalInt.of(56789)); + + assertEquals(getBucketNumber("234_99"), OptionalInt.empty()); + assertEquals(getBucketNumber("0234.txt"), OptionalInt.empty()); + assertEquals(getBucketNumber("0234_"), OptionalInt.empty()); + } + private static 
List drain(HiveSplitSource source) throws Exception { diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java index a2cc398e335f..4d4870df6a10 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java @@ -74,6 +74,7 @@ public void testDefaults() .setHiveCompressionCodec(HiveCompressionCodec.GZIP) .setRespectTableFormat(true) .setImmutablePartitions(false) + .setCreateEmptyBucketFiles(true) .setSortedWritingEnabled(true) .setMaxPartitionsPerWriter(100) .setMaxOpenSortFiles(50) @@ -161,6 +162,7 @@ public void testExplicitPropertyMappings() .put("hive.compression-codec", "NONE") .put("hive.respect-table-format", "false") .put("hive.immutable-partitions", "true") + .put("hive.create-empty-bucket-files", "false") .put("hive.max-partitions-per-writers", "222") .put("hive.max-open-sort-files", "333") .put("hive.write-validation-threads", "11") @@ -247,6 +249,7 @@ public void testExplicitPropertyMappings() .setHiveCompressionCodec(HiveCompressionCodec.NONE) .setRespectTableFormat(false) .setImmutablePartitions(true) + .setCreateEmptyBucketFiles(false) .setMaxPartitionsPerWriter(222) .setMaxOpenSortFiles(333) .setWriteValidationThreads(11) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java index cea628ec710b..5d6b63f25287 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java @@ -760,6 +760,12 @@ public void testCreatePartitionedBucketedTableAsFewRows() } private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveStorageFormat storageFormat) + { + 
testCreatePartitionedBucketedTableAsFewRows(session, storageFormat, true); + testCreatePartitionedBucketedTableAsFewRows(session, storageFormat, false); + } + + private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveStorageFormat storageFormat, boolean createEmpty) { String tableName = "test_create_partitioned_bucketed_table_as_few_rows"; @@ -782,7 +788,9 @@ private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveSt assertUpdate( // make sure that we will get one file per bucket regardless of writer count configured - getParallelWriteSession(), + Session.builder(getParallelWriteSession()) + .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty)) + .build(), createTable, 3); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java index 834bfb4c2d0d..936b2236ca71 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java @@ -242,7 +242,6 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio SCHEMA_NAME, TABLE_NAME, getColumnHandles(), - "test", new HivePageSinkMetadata(new SchemaTableName(SCHEMA_NAME, TABLE_NAME), metastore.getTable(SCHEMA_NAME, TABLE_NAME), ImmutableMap.of()), locationHandle, config.getHiveStorageFormat(), diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java new file mode 100644 index 000000000000..65b230c76e91 --- /dev/null +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.hive; + +import org.testng.annotations.Test; + +import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName; +import static org.apache.hadoop.hive.ql.exec.Utilities.getBucketIdFromFile; +import static org.testng.Assert.assertEquals; + +public class TestHiveWriterFactory +{ + @Test + public void testComputeBucketedFileName() + { + String name = computeBucketedFileName("20180102_030405_00641_x1y2z", 1234); + assertEquals(name, "001234_0_20180102_030405_00641_x1y2z"); + assertEquals(getBucketIdFromFile(name), 1234); + } +} diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveBucketedTables.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveBucketedTables.java index 79082e0c5fe1..48fc3d46f5b4 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveBucketedTables.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveBucketedTables.java @@ -69,10 +69,30 @@ public void testIgnorePartitionBucketingIfNotBucketed() onHive().executeQuery(format("ALTER TABLE %s NOT CLUSTERED", tableName)); + assertThat(query(format("SELECT count(DISTINCT n_nationkey), count(*) FROM %s", tableName))) + .hasRowsCount(1) + .contains(row(25, 50)); + assertThat(query(format("SELECT count(*) FROM %s WHERE n_nationkey = 1", tableName))) .containsExactly(row(2)); } + @Test(groups = {BIG_QUERY}) + public void testAllowMultipleFilesPerBucket() + { + String tableName = 
mutableTablesState().get(BUCKETED_PARTITIONED_NATION).getNameInDatabase(); + for (int i = 0; i < 3; i++) { + populateHivePartitionedTable(tableName, NATION.getName(), "part_key = 'insert'"); + } + + assertThat(query(format("SELECT count(DISTINCT n_nationkey), count(*) FROM %s", tableName))) + .hasRowsCount(1) + .contains(row(25, 75)); + + assertThat(query(format("SELECT count(*) FROM %s WHERE n_nationkey = 1", tableName))) + .containsExactly(row(3)); + } + private static void populateHivePartitionedTable(String destination, String source, String partition) { String queryStatement = format("INSERT INTO TABLE %s PARTITION (%s) SELECT * FROM %s", destination, partition, source);