Skip to content

PARQUET-2366: Optimize random seek during rewriting #1174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.rewrite;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Supplies the file indexes ({@link ColumnIndex}, {@link OffsetIndex} and {@link BloomFilter}) of column chunks
 * read through a {@link ParquetFileReader}.
 * <p>
 * When prefetching is disabled every lookup is delegated straight to the reader. When it is enabled,
 * {@link #setCurrentBlockMetadata(BlockMetaData)} reads the indexes of all selected columns of a block up front
 * (all column indexes, then all offset indexes, then all bloom filters) and the getters serve them from memory.
 * A cached entry is handed out once and then dropped from the cache.
 * <p>
 * The class does no synchronization of its own.
 */
class IndexCacher {
  private final ParquetFileReader fileReader;
  private final Set<ColumnPath> columnPathSet;
  private final boolean prefetchBlockAllIndexes;

  // Only used when prefetchBlockAllIndexes is true. They are null until setCurrentBlockMetadata() has
  // completed successfully, and null again after free().
  private Map<ColumnPath, ColumnIndex> columnIndexCache;
  private Map<ColumnPath, OffsetIndex> offsetIndexCache;
  private Map<ColumnPath, BloomFilter> bloomIndexCache;

  /**
   * @param fileReader              reader used to load the indexes
   * @param columnPathSet           paths of the columns whose indexes are prefetched
   * @param prefetchBlockAllIndexes whether to prefetch the indexes of a whole block at once
   */
  IndexCacher(
      ParquetFileReader fileReader,
      Set<ColumnPath> columnPathSet,
      boolean prefetchBlockAllIndexes) {
    this.fileReader = fileReader;
    this.columnPathSet = columnPathSet;
    this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
  }

  /**
   * Makes the given block the current one. With prefetching enabled this discards the previously cached
   * indexes and loads the indexes of every selected column of the block; otherwise it does nothing.
   *
   * @param blockMetaData metadata of the block about to be processed
   * @throws IOException if reading any index fails; the caches are then left empty
   */
  void setCurrentBlockMetadata(BlockMetaData blockMetaData) throws IOException {
    if (!prefetchBlockAllIndexes) {
      return;
    }

    free();

    // Read each kind of index for all columns before moving to the next kind (same order as before).
    // Publish the maps only once everything has been read, so a failure cannot leave a partial cache behind.
    Map<ColumnPath, ColumnIndex> columnIndexes = readAll(blockMetaData, fileReader::readColumnIndex);
    Map<ColumnPath, OffsetIndex> offsetIndexes = readAll(blockMetaData, fileReader::readOffsetIndex);
    Map<ColumnPath, BloomFilter> bloomFilters = readAll(blockMetaData, fileReader::readBloomFilter);

    this.columnIndexCache = columnIndexes;
    this.offsetIndexCache = offsetIndexes;
    this.bloomIndexCache = bloomFilters;
  }

  /**
   * Returns the column index of the chunk.
   *
   * @param chunk the column chunk to look up
   * @return whatever the reader returned for this chunk (may be {@code null})
   * @throws IOException           if the index has to be read and reading fails
   * @throws IllegalStateException if prefetching is enabled but no block is current
   */
  ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
    if (!prefetchBlockAllIndexes) {
      return fileReader.readColumnIndex(chunk);
    }

    return take(columnIndexCache, chunk, fileReader::readColumnIndex);
  }

  /**
   * Returns the offset index of the chunk.
   *
   * @param chunk the column chunk to look up
   * @return whatever the reader returned for this chunk (may be {@code null})
   * @throws IOException           if the index has to be read and reading fails
   * @throws IllegalStateException if prefetching is enabled but no block is current
   */
  OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
    if (!prefetchBlockAllIndexes) {
      return fileReader.readOffsetIndex(chunk);
    }

    return take(offsetIndexCache, chunk, fileReader::readOffsetIndex);
  }

  /**
   * Returns the bloom filter of the chunk.
   *
   * @param chunk the column chunk to look up
   * @return whatever the reader returned for this chunk (may be {@code null})
   * @throws IOException           if the filter has to be read and reading fails
   * @throws IllegalStateException if prefetching is enabled but no block is current
   */
  BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
    if (!prefetchBlockAllIndexes) {
      return fileReader.readBloomFilter(chunk);
    }

    return take(bloomIndexCache, chunk, fileReader::readBloomFilter);
  }

  /**
   * Drops all cached indexes. Safe to call repeatedly and when nothing was ever cached.
   */
  void free() {
    columnIndexCache = null;
    offsetIndexCache = null;
    bloomIndexCache = null;
  }

  /** Reads one kind of index for a single column chunk. */
  @FunctionalInterface
  private interface ChunkIndexReader<T> {
    T read(ColumnChunkMetaData chunk) throws IOException;
  }

  /** Loads one kind of index for every column of the block whose path is in {@code columnPathSet}. */
  private <T> Map<ColumnPath, T> readAll(BlockMetaData blockMetaData, ChunkIndexReader<T> indexReader)
      throws IOException {
    Map<ColumnPath, T> indexes = new HashMap<>(columnPathSet.size());
    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
      ColumnPath path = chunk.getPath();
      if (columnPathSet.contains(path)) {
        indexes.put(path, indexReader.read(chunk));
      }
    }

    return indexes;
  }

  /** Hands out (and forgets) the cached entry for the chunk, reading it directly when it is not cached. */
  private <T> T take(Map<ColumnPath, T> cache, ColumnChunkMetaData chunk, ChunkIndexReader<T> indexReader)
      throws IOException {
    if (cache == null) {
      throw new IllegalStateException(
          "No indexes are prefetched; setCurrentBlockMetadata must be called before reading indexes");
    }

    ColumnPath path = chunk.getPath();
    // containsKey rather than a null check on the value: null is a legitimate cached result.
    if (cache.containsKey(path)) {
      return cache.remove(path);
    }

    // Not prefetched (column outside columnPathSet) or already handed out: read it directly so that
    // prefetch mode returns the same result as non-prefetch mode instead of a silent null.
    return indexReader.read(chunk);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,29 @@ public class ParquetRewriter implements Closeable {
private final byte[] pageBuffer = new byte[pageBufferSize];
// Configurations for the new file
private CompressionCodecName newCodecName = null;
private List<String> pruneColumns = null;
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
private Map<String, String> extraMetaData = new HashMap<>();
private final Map<String, String> extraMetaData = new HashMap<>();
// Writer to rewrite the input files
private ParquetFileWriter writer;
private final ParquetFileWriter writer;
// Number of blocks written which is used to keep track of the actual row group ordinal
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
// Schema of input files (should be the same) and to write to the output file
private MessageType schema = null;
private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
// The reader for the current input file
private TransParquetFileReader reader = null;
// The metadata of current reader being processed
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
// Unique created_by information from all input files
private Set<String> allOriginalCreatedBys = new HashSet<>();
private final Set<String> allOriginalCreatedBys = new HashSet<>();
// Whether to prefetch all indexes of a block before processing it
private final boolean prefetchBlockAllIndexes;

public ParquetRewriter(RewriteOptions options) throws IOException {
Configuration conf = options.getConf();
Expand All @@ -129,8 +131,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
initNextReader();

newCodecName = options.getNewCodecName();
pruneColumns = options.getPruneColumns();

List<String> pruneColumns = options.getPruneColumns();
// Prune columns if specified
if (pruneColumns != null && !pruneColumns.isEmpty()) {
List<String> paths = new ArrayList<>();
Expand All @@ -145,6 +146,9 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
schema = pruneColumnsInSchema(schema, prunePaths);
}

this.descriptorsMap =
schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));

if (options.getMaskColumns() != null) {
this.maskColumns = new HashMap<>();
for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
Expand All @@ -157,6 +161,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
this.encryptMode = true;
}

this.prefetchBlockAllIndexes = options.prefetchBlockAllIndexes();

ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
Expand All @@ -178,6 +184,8 @@ public ParquetRewriter(TransParquetFileReader reader,
this.writer = writer;
this.meta = meta;
this.schema = schema;
this.descriptorsMap =
schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
this.newCodecName = codecName;
originalCreatedBy = originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
Expand All @@ -188,6 +196,7 @@ public ParquetRewriter(TransParquetFileReader reader,
this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
}
}
this.prefetchBlockAllIndexes = false;
}

// Open all input files to validate their schemas are compatible to merge
Expand Down Expand Up @@ -247,24 +256,24 @@ public void close() throws IOException {

public void processBlocks() throws IOException {
while (reader != null) {
processBlocksFromReader();
IndexCacher indexCacher = new IndexCacher(reader, descriptorsMap.keySet(), prefetchBlockAllIndexes);
processBlocksFromReader(indexCacher);
indexCacher.free();
initNextReader();
}
}

private void processBlocksFromReader() throws IOException {
private void processBlocksFromReader(IndexCacher indexCacher) throws IOException {
PageReadStore store = reader.readNextRowGroup();
ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));

int blockId = 0;
while (store != null) {
writer.startBlock(store.getRowCount());

BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
indexCacher.setCurrentBlockMetadata(blockMetaData);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
ColumnChunkMetaData chunk = columnsInOrder.get(i);
ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
Expand Down Expand Up @@ -314,13 +323,20 @@ private void processBlocksFromReader() throws IOException {

// Translate compression and/or encryption
writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
processChunk(
chunk,
newCodecName,
columnChunkEncryptorRunTime,
encryptColumn,
indexCacher.getBloomFilter(chunk),
indexCacher.getColumnIndex(chunk),
indexCacher.getOffsetIndex(chunk));
writer.endColumn();
} else {
// Nothing changed, simply copy the binary data.
BloomFilter bloomFilter = reader.readBloomFilter(chunk);
ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
BloomFilter bloomFilter = indexCacher.getBloomFilter(chunk);
ColumnIndex columnIndex = indexCacher.getColumnIndex(chunk);
OffsetIndex offsetIndex = indexCacher.getOffsetIndex(chunk);
writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
}

Expand All @@ -338,7 +354,10 @@ private void processBlocksFromReader() throws IOException {
private void processChunk(ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
boolean encryptColumn) throws IOException {
boolean encryptColumn,
BloomFilter bloomFilter,
ColumnIndex columnIndex,
OffsetIndex offsetIndex) throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
CompressionCodecFactory.BytesInputCompressor compressor = null;
Expand All @@ -364,9 +383,6 @@ private void processChunk(ColumnChunkMetaData chunk,
dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
}

ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
BloomFilter bloomFilter = reader.readBloomFilter(chunk);
if (bloomFilter != null) {
writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
*/
public class RewriteOptions {

final Configuration conf;
final List<Path> inputFiles;
final Path outputFile;
final List<String> pruneColumns;
final CompressionCodecName newCodecName;
final Map<String, MaskMode> maskColumns;
final List<String> encryptColumns;
final FileEncryptionProperties fileEncryptionProperties;
private final Configuration conf;
private final List<Path> inputFiles;
private final Path outputFile;
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
private final Map<String, MaskMode> maskColumns;
private final List<String> encryptColumns;
private final FileEncryptionProperties fileEncryptionProperties;
private final boolean prefetchBlockAllIndexes;

private RewriteOptions(Configuration conf,
List<Path> inputFiles,
Expand All @@ -49,7 +50,8 @@ private RewriteOptions(Configuration conf,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties) {
FileEncryptionProperties fileEncryptionProperties,
boolean prefetchBlockAllIndexes) {
this.conf = conf;
this.inputFiles = inputFiles;
this.outputFile = outputFile;
Expand All @@ -58,6 +60,7 @@ private RewriteOptions(Configuration conf,
this.maskColumns = maskColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
}

public Configuration getConf() {
Expand Down Expand Up @@ -92,16 +95,21 @@ public FileEncryptionProperties getFileEncryptionProperties() {
return fileEncryptionProperties;
}

/**
* Tells whether the indexes of a block should be prefetched into a cache while rewriting.
*
* @return the {@code prefetchBlockAllIndexes} flag these options were created with
*/
public boolean prefetchBlockAllIndexes() {
return prefetchBlockAllIndexes;
}

// Builder to create a RewriteOptions.
public static class Builder {
private Configuration conf;
private List<Path> inputFiles;
private Path outputFile;
private final Configuration conf;
private final List<Path> inputFiles;
private final Path outputFile;
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private boolean prefetchBlockAllIndexes;

/**
* Create a builder to create a RewriteOptions.
Expand Down Expand Up @@ -213,6 +221,19 @@ public Builder addInputFile(Path path) {
return this;
}

/**
* Sets whether to prefetch the block indexes into a cache.
* <p>
* Prefetching can reduce random seeks while rewriting. It is disabled by default.
*
* @param prefetchBlockAllIndexes {@code true} to enable prefetching, {@code false} to disable it
* @return self
*/
public Builder prefetchBlockAllIndex(boolean prefetchBlockAllIndexes) {
this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
return this;
}

/**
* Build the RewriteOptions.
*
Expand Down Expand Up @@ -255,7 +276,8 @@ public RewriteOptions build() {
newCodecName,
maskColumns,
encryptColumns,
fileEncryptionProperties);
fileEncryptionProperties,
prefetchBlockAllIndexes);
}
}

Expand Down
Loading