Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public enum HiveErrorCode
HIVE_INVALID_ENCRYPTION_METADATA(42, EXTERNAL),
HIVE_UNSUPPORTED_ENCRYPTION_OPERATION(43, USER_ERROR),
MALFORMED_HIVE_FILE_STATISTICS(44, INTERNAL_ERROR),
HIVE_INVALID_FILE_NAMES(45, INTERNAL_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ private void doCreateEmptyPartition(ConnectorSession session, String schema, Str
ImmutableList.of(),
0,
0,
0)));
0,
writeInfo.getWritePath().getName().matches("\\d+"))));

hiveMetadata.finishInsert(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.github.luben.zstd.Zstd;
import com.google.common.base.Joiner;
Expand All @@ -31,11 +32,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.LongStream;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.HiveErrorCode.MALFORMED_HIVE_FILE_STATISTICS;
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
Expand Down Expand Up @@ -112,23 +115,57 @@ public static Map<String, String> updatePartitionMetadataWithFileNamesAndSizes(P
ImmutableMap.Builder<String, String> partitionMetadata = ImmutableMap.builder();
List<FileWriteInfo> fileWriteInfos = new ArrayList<>(partitionUpdate.getFileWriteInfos());

if (!partitionUpdate.containsNumberedFileNames()) {
// Filenames starting with ".tmp.presto" will be renamed in TableFinishOperator. So it doesn't make sense to store the filenames in manifest
return metadata;
}

// Sort the file infos based on fileName
fileWriteInfos.sort(Comparator.comparing(info -> Integer.valueOf(info.getWriteFileName())));

List<String> fileNames = fileWriteInfos.stream().map(FileWriteInfo::getWriteFileName).collect(toImmutableList());
List<Long> fileSizes = fileWriteInfos.stream().map(FileWriteInfo::getFileSize).filter(Optional::isPresent).map(Optional::get).collect(toImmutableList());

if (fileSizes.size() < fileNames.size()) {
if (fileSizes.isEmpty()) {
// These files may not have been written by OrcFileWriter. So file sizes not available.
return metadata;
}
throw new PrestoException(
MALFORMED_HIVE_FILE_STATISTICS,
format(
"During manifest creation for partition= %s, filename count= %s is not equal to filesizes count= %s",
partitionUpdate.getName(),
fileNames.size(),
fileSizes.size()));
}

// Compress the file names into a consolidated string
String fileNames = compressFileNames(fileWriteInfos.stream().map(FileWriteInfo::getWriteFileName).collect(toImmutableList()));
partitionMetadata.put(FILE_NAMES, compressFileNames(fileNames));

// Compress the file sizes
String fileSizes = compressFileSizes(fileWriteInfos.stream().map(FileWriteInfo::getFileSize).map(Optional::get).collect(toImmutableList()));
partitionMetadata.put(FILE_SIZES, compressFileSizes(fileSizes));

partitionMetadata.put(FILE_NAMES, fileNames);
partitionMetadata.put(FILE_SIZES, fileSizes);
partitionMetadata.put(MANIFEST_VERSION, VERSION_1);
partitionMetadata.putAll(metadata);

return partitionMetadata.build();
}

/**
 * Measures the manifest for a partition update as the combined length of its
 * compressed file-name string and compressed file-size string.
 *
 * <p>A value is produced only when file renaming is enabled for the session and the
 * update reports numbered file names. When {@code parameters} already carries a
 * {@code MANIFEST_VERSION} entry, the lengths of the stored {@code FILE_NAMES} and
 * {@code FILE_SIZES} values are summed; otherwise the file names and the file sizes
 * that are present are compressed here solely to measure the resulting strings.
 *
 * @param session session consulted for the file renaming property
 * @param partitionUpdate update whose file write infos are measured
 * @param parameters partition parameters that may already contain the manifest entries
 * @return the summed string lengths, or an empty value when the conditions above do not hold
 */
public static OptionalLong getManifestSizeInBytes(ConnectorSession session, PartitionUpdate partitionUpdate, Map<String, String> parameters)
{
    // Nothing to measure unless renaming is on and the files are numbered.
    if (!isFileRenamingEnabled(session) || !partitionUpdate.containsNumberedFileNames()) {
        return OptionalLong.empty();
    }

    // The manifest has already been written into the parameters: reuse the stored strings.
    if (parameters.containsKey(MANIFEST_VERSION)) {
        int storedNamesLength = parameters.get(FILE_NAMES).length();
        int storedSizesLength = parameters.get(FILE_SIZES).length();
        return OptionalLong.of(storedNamesLength + storedSizesLength);
    }

    // No stored manifest: compress names and available sizes to obtain their lengths.
    List<FileWriteInfo> writeInfos = partitionUpdate.getFileWriteInfos();

    List<String> writeFileNames = writeInfos.stream()
            .map(FileWriteInfo::getWriteFileName)
            .collect(toImmutableList());
    int compressedNamesLength = compressFileNames(writeFileNames).length();

    List<Long> knownFileSizes = writeInfos.stream()
            .map(FileWriteInfo::getFileSize)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(toImmutableList());
    int compressedSizesLength = compressFileSizes(knownFileSizes).length();

    return OptionalLong.of(compressedNamesLength + compressedSizesLength);
}

static String compressFileNames(List<String> fileNames)
{
if (fileNames.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.Domain;
Expand Down Expand Up @@ -182,7 +181,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_ENCRYPTION_OPERATION;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveManifestUtils.createPartitionManifest;
import static com.facebook.presto.hive.HiveManifestUtils.getManifestSizeInBytes;
import static com.facebook.presto.hive.HiveManifestUtils.updatePartitionMetadataWithFileNamesAndSizes;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.HivePartitioningHandle.createHiveCompatiblePartitioningHandle;
Expand All @@ -199,8 +198,10 @@
import static com.facebook.presto.hive.HiveSessionProperties.getVirtualBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.isBucketExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.isPreferManifestsToListFiles;
import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
Expand Down Expand Up @@ -1625,7 +1626,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
}
for (PartitionUpdate update : partitionUpdates) {
Map<String, String> partitionParameters = partitionEncryptionParameters;
if (HiveSessionProperties.isFileRenamingEnabled(session)) {
if (isPreferManifestsToListFiles(session) && isFileRenamingEnabled(session)) {
// Store list of file names and sizes in partition metadata when prefer_manifests_to_list_files and file_renaming_enabled are set to true
partitionParameters = updatePartitionMetadataWithFileNamesAndSizes(update, partitionParameters);
}
Partition partition = partitionObjectBuilder.buildPartitionObject(session, table, update, prestoVersion, partitionParameters);
Expand Down Expand Up @@ -1684,11 +1686,12 @@ private ImmutableList<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
locationHandle.getWritePath(),
locationHandle.getTargetPath(),
fileNamesForMissingBuckets.stream()
.map(fileName -> new FileWriteInfo(fileName, fileName, Optional.empty()))
.map(fileName -> new FileWriteInfo(fileName, fileName, Optional.of(0L)))
.collect(toImmutableList()),
0,
0,
0));
0,
isFileRenamingEnabled(session)));
}

ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
Expand All @@ -1708,11 +1711,12 @@ private ImmutableList<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
partitionUpdate.getWritePath(),
partitionUpdate.getTargetPath(),
fileNamesForMissingBuckets.stream()
.map(fileName -> new FileWriteInfo(fileName, fileName, Optional.empty()))
.map(fileName -> new FileWriteInfo(fileName, fileName, Optional.of(0L)))
.collect(toImmutableList()),
0,
0,
0));
0,
isFileRenamingEnabled(session)));
}
return partitionUpdatesForMissingBucketsBuilder.build();
}
Expand All @@ -1732,7 +1736,7 @@ private List<String> computeFileNamesForMissingBuckets(
String fileExtension = getFileExtension(fromHiveStorageFormat(storageFormat), compressionCodec);
ImmutableList.Builder<String> missingFileNamesBuilder = ImmutableList.builder();
for (int i = 0; i < bucketCount; i++) {
String targetFileName = HiveSessionProperties.isFileRenamingEnabled(session) ? String.valueOf(i) : HiveWriterFactory.computeBucketedFileName(filePrefix, i) + fileExtension;
String targetFileName = isFileRenamingEnabled(session) ? String.valueOf(i) : HiveWriterFactory.computeBucketedFileName(filePrefix, i) + fileExtension;
if (!existingFileNames.contains(targetFileName)) {
missingFileNamesBuilder.add(targetFileName);
}
Expand Down Expand Up @@ -1933,15 +1937,13 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
.map(encryptionInfo -> encryptionInfo.getDwrfEncryptionMetadata().map(DwrfEncryptionMetadata::getExtraMetadata).orElseGet(ImmutableMap::of))
.orElseGet(ImmutableMap::of);

if (HiveSessionProperties.isPreferManifestsToListFiles(session)) {
// Store list of file names and sizes in partition metadata when prefer_manifests_to_list_files property is set
if (isPreferManifestsToListFiles(session) && isFileRenamingEnabled(session)) {
// Store list of file names and sizes in partition metadata when prefer_manifests_to_list_files and file_renaming_enabled are set to true
extraPartitionMetadata = updatePartitionMetadataWithFileNamesAndSizes(partitionUpdate, extraPartitionMetadata);
}

// TODO: Put the manifest blob in partition parameters
// Track the manifest blob size
Optional<Page> manifestBlob = createPartitionManifest(partitionUpdate);
manifestBlob.ifPresent(manifest -> hivePartitionStats.addManifestSizeInBytes(manifest.getRetainedSizeInBytes()));
getManifestSizeInBytes(session, partitionUpdate, extraPartitionMetadata).ifPresent(hivePartitionStats::addManifestSizeInBytes);

// insert into new partition or overwrite existing partition
Partition partition = partitionObjectBuilder.buildPartitionObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -191,7 +192,7 @@ public HivePageSink(

this.session = requireNonNull(session, "session is null");
this.hiveMetadataUpdater = requireNonNull(hiveMetadataUpdater, "hiveMetadataUpdater is null");
this.fileRenamingEnabled = HiveSessionProperties.isFileRenamingEnabled(session);
this.fileRenamingEnabled = isFileRenamingEnabled(session);
}

@Override
Expand Down Expand Up @@ -418,7 +419,8 @@ private void updateFileInfo(List<Slice> partitionUpdatesWithRenamedFileNames, Se
ImmutableList.of(fileInfoWithRenamedFileName),
partitionUpdate.getRowCount(),
partitionUpdate.getInMemoryDataSizeInBytes(),
partitionUpdate.getOnDiskDataSizeInBytes());
partitionUpdate.getOnDiskDataSizeInBytes(),
true);
partitionUpdatesWithRenamedFileNames.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdateWithRenamedFileName)));

hiveMetadataUpdater.removeResultFuture(writerIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public PartitionUpdate getPartitionUpdate()
ImmutableList.of(new FileWriteInfo(fileWriteInfo.getWriteFileName(), fileWriteInfo.getTargetFileName(), fileStatistics.map(statisticsPage -> getFileSize(statisticsPage, 0)))),
rowCount,
inputSizeInBytes,
fileWriter.getWrittenBytes());
fileWriter.getWrittenBytes(),
fileWriteInfo.getWriteFileName().matches("\\d+"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveSessionProperties.getSortedWriteTempPathSubdirectoryCount;
import static com.facebook.presto.hive.HiveSessionProperties.isFailFastOnInsertIntoImmutablePartitionsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveType.toHiveTypes;
import static com.facebook.presto.hive.HiveWriteUtils.checkPartitionIsWritable;
Expand Down Expand Up @@ -333,7 +334,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
String targetFileName;
if (bucketNumber.isPresent()) {
// Use the bucket number for file name when fileRenaming is enabled
targetFileName = HiveSessionProperties.isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(filePrefix, bucketNumber.getAsInt()) + extension;
targetFileName = isFileRenamingEnabled(session) ? String.valueOf(bucketNumber.getAsInt()) : computeBucketedFileName(filePrefix, bucketNumber.getAsInt()) + extension;
}
else {
targetFileName = filePrefix + "_" + randomUUID() + extension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class PartitionUpdate
private final long rowCount;
private final long inMemoryDataSizeInBytes;
private final long onDiskDataSizeInBytes;
private final boolean containsNumberedFileNames;

@JsonCreator
public PartitionUpdate(
Expand All @@ -51,7 +52,8 @@ public PartitionUpdate(
@JsonProperty("fileWriteInfos") List<FileWriteInfo> fileWriteInfos,
@JsonProperty("rowCount") long rowCount,
@JsonProperty("inMemoryDataSizeInBytes") long inMemoryDataSizeInBytes,
@JsonProperty("onDiskDataSizeInBytes") long onDiskDataSizeInBytes)
@JsonProperty("onDiskDataSizeInBytes") long onDiskDataSizeInBytes,
@JsonProperty("containsNumberedFileNames") boolean containsNumberedFileNames)
{
this(
name,
Expand All @@ -61,7 +63,8 @@ public PartitionUpdate(
fileWriteInfos,
rowCount,
inMemoryDataSizeInBytes,
onDiskDataSizeInBytes);
onDiskDataSizeInBytes,
containsNumberedFileNames);
}

public PartitionUpdate(
Expand All @@ -72,7 +75,8 @@ public PartitionUpdate(
List<FileWriteInfo> fileWriteInfos,
long rowCount,
long inMemoryDataSizeInBytes,
long onDiskDataSizeInBytes)
long onDiskDataSizeInBytes,
boolean containsNumberedFileNames)
{
this.name = requireNonNull(name, "name is null");
this.updateMode = requireNonNull(updateMode, "updateMode is null");
Expand All @@ -85,6 +89,7 @@ public PartitionUpdate(
this.inMemoryDataSizeInBytes = inMemoryDataSizeInBytes;
checkArgument(onDiskDataSizeInBytes >= 0, "onDiskDataSizeInBytes is negative: %d", onDiskDataSizeInBytes);
this.onDiskDataSizeInBytes = onDiskDataSizeInBytes;
this.containsNumberedFileNames = containsNumberedFileNames;
}

@JsonProperty
Expand Down Expand Up @@ -145,6 +150,12 @@ public long getOnDiskDataSizeInBytes()
return onDiskDataSizeInBytes;
}

/**
 * Returns the {@code containsNumberedFileNames} flag supplied to the constructor.
 * Exposed as a JSON property so the flag is carried along when this update is serialized.
 *
 * @return the stored flag value
 */
@JsonProperty
public boolean containsNumberedFileNames()
{
    return this.containsNumberedFileNames;
}

@Override
public String toString()
{
Expand All @@ -157,6 +168,7 @@ public String toString()
.add("rowCount", rowCount)
.add("inMemoryDataSizeInBytes", inMemoryDataSizeInBytes)
.add("onDiskDataSizeInBytes", onDiskDataSizeInBytes)
.add("containsNumberedFileNames", containsNumberedFileNames)
.toString();
}

Expand All @@ -175,6 +187,7 @@ public static List<PartitionUpdate> mergePartitionUpdates(Iterable<PartitionUpda
long totalRowCount = 0;
long totalInMemoryDataSizeInBytes = 0;
long totalOnDiskDataSizeInBytes = 0;
boolean containsNumberedFileNames = true;
for (PartitionUpdate partition : partitionGroup) {
// verify partitions have the same new flag, write path and target path
// this shouldn't happen but could if another user added a partition during the write
Expand All @@ -187,6 +200,7 @@ public static List<PartitionUpdate> mergePartitionUpdates(Iterable<PartitionUpda
totalRowCount += partition.getRowCount();
totalInMemoryDataSizeInBytes += partition.getInMemoryDataSizeInBytes();
totalOnDiskDataSizeInBytes += partition.getOnDiskDataSizeInBytes();
containsNumberedFileNames &= partition.containsNumberedFileNames();
}

partitionUpdates.add(new PartitionUpdate(firstPartition.getName(),
Expand All @@ -196,7 +210,8 @@ public static List<PartitionUpdate> mergePartitionUpdates(Iterable<PartitionUpda
allFileWriterInfos.build(),
totalRowCount,
totalInMemoryDataSizeInBytes,
totalOnDiskDataSizeInBytes));
totalOnDiskDataSizeInBytes,
containsNumberedFileNames));
}
return partitionUpdates.build();
}
Expand Down
Loading