Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
return newMemColumnReader(path, pageReadStore.getPageReader(path));
}

private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
/**
 * Builds a {@link ColumnReaderImpl} for the given column on top of the supplied page reader.
 *
 * @param path the descriptor of the column to read
 * @param pageReader the source of pages for that column
 * @return a new reader wired to the primitive converter looked up for {@code path}
 */
public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
  return new ColumnReaderImpl(path, pageReader, getPrimitiveConverter(path), writerVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,9 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
}
}

/**
 * Writes the pages held for a single column to the given file writer.
 *
 * @param path the column whose page writer is looked up in {@code writers}
 * @param writer the file writer receiving that column's data
 * @throws IOException if the underlying page writer fails while writing
 */
void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
  writers.get(path).writeToFileWriter(writer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -56,6 +57,7 @@
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
Expand Down Expand Up @@ -1160,27 +1162,8 @@ public void addChunk(ChunkDescriptor descriptor) {
* @throws IOException if there is an error while reading from the stream
*/
public List<Chunk> readAll(SeekableInputStream f) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);

int fullAllocations = length / options.getMaxAllocationSize();
int lastAllocationSize = length % options.getMaxAllocationSize();

int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);

for (int i = 0; i < fullAllocations; i += 1) {
buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
}

if (lastAllocationSize > 0) {
buffers.add(options.getAllocator().allocate(lastAllocationSize));
}

for (ByteBuffer buffer : buffers) {
f.readFully(buffer);
buffer.flip();
}
List<Chunk> result = new ArrayList<>(chunks.size());
List<ByteBuffer> buffers = readBlocks(f, offset, length);

// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
Expand All @@ -1206,4 +1189,72 @@ public long endPos() {

}

/**
 * Seeks to {@code offset} and reads {@code length} bytes into newly allocated buffers.
 * No buffer is larger than {@code options.getMaxAllocationSize()}; a final, smaller
 * buffer holds whatever does not fill a whole allocation.
 *
 * @param f file to read the blocks from
 * @param offset position in {@code f} at which reading starts
 * @param length total number of bytes to read
 * @return the ByteBuffer blocks, each flipped after being filled
 * @throws IOException if there is an error while reading from the stream
 */
List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
  f.seek(offset);

  final int maxAllocationSize = options.getMaxAllocationSize();
  final int wholeAllocations = length / maxAllocationSize;
  final int tailSize = length % maxAllocationSize;

  List<ByteBuffer> buffers = new ArrayList<>(tailSize > 0 ? wholeAllocations + 1 : wholeAllocations);

  // Allocate everything up front, before any byte is read.
  for (int remaining = wholeAllocations; remaining > 0; remaining--) {
    buffers.add(options.getAllocator().allocate(maxAllocationSize));
  }
  if (tailSize > 0) {
    buffers.add(options.getAllocator().allocate(tailSize));
  }

  // Fill the buffers in order and flip each one after it has been filled.
  for (ByteBuffer target : buffers) {
    f.readFully(target);
    target.flip();
  }
  return buffers;
}

/**
 * Reads all pages of one column chunk from the row group at {@code blockIndex}.
 *
 * @param blockIndex index into {@code blocks} of the row group to read from
 * @param columnDescriptor the column to look up in that row group by path
 * @return a page reader for the column chunk, or an empty Optional when the row group
 *         has no column with the descriptor's path
 * @throws RuntimeException if the row group has zero rows, or if reading the chunk fails
 *         with an {@link IOException} (wrapped)
 * @throws ArithmeticException if the chunk's total size does not fit in an {@code int}
 */
Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
  BlockMetaData block = blocks.get(blockIndex);
  if (block.getRowCount() == 0) {
    throw new RuntimeException("Illegal row group of 0 rows");
  }
  Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());

  // Math.toIntExact fails fast on chunks larger than Integer.MAX_VALUE instead of
  // silently truncating the size to a wrong (possibly negative) length.
  return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(),
        Math.toIntExact(column.getTotalSize())))
    .map(chunk -> readChunk(f, chunk));
}

/**
 * Loads the bytes described by {@code descriptor} from {@code f} and reads all pages of the chunk.
 *
 * @param f the stream to read the chunk bytes from
 * @param descriptor supplies the file offset and size of the chunk
 * @return the page reader produced from the chunk's pages
 * @throws RuntimeException wrapping any {@link IOException} raised while reading
 */
private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
  try {
    ByteBufferInputStream chunkBytes =
        ByteBufferInputStream.wrap(readBlocks(f, descriptor.fileOffset, descriptor.size));
    Chunk loaded = new WorkaroundChunk(descriptor, chunkBytes.sliceBuffers(descriptor.size), f);
    return loaded.readAllPages();
  } catch (IOException ioFailure) {
    throw new RuntimeException(ioFailure);
  }
}

/**
 * Looks up the first column chunk of {@code block} whose path equals {@code path} element by element.
 *
 * @param block the row group metadata whose columns are searched
 * @param path the column path to match
 * @return the matching column chunk metadata, or an empty Optional when none matches
 */
private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
  return block.getColumns().stream()
      .filter(candidate -> Arrays.equals(candidate.getPath().toArray(), path))
      .findFirst();
}

/**
 * @return the number of entries in {@code blocks}
 */
public int blocksCount() {
return blocks.size();
}

/**
 * @param blockIndex position of the wanted entry in {@code blocks}
 * @return the block metadata stored at {@code blockIndex}
 */
public BlockMetaData getBlockMetaData(int blockIndex) {
return blocks.get(blockIndex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -41,13 +44,22 @@
import org.apache.parquet.Preconditions;
import org.apache.parquet.Strings;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
Expand All @@ -57,6 +69,7 @@
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.BlocksCombiner;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.InputFile;
Expand Down Expand Up @@ -519,6 +532,108 @@ public void appendFile(InputFile file) throws IOException {
ParquetFileReader.open(file).appendTo(this);
}

/**
 * Merges the row groups of the given input files into this writer.
 *
 * <p>A reader is opened for every input file and the readers' blocks are grouped by
 * {@code BlocksCombiner.combineLargeBlocks(readers, maxBlockSize)}. For each group one
 * block is written: for every column of this writer's schema the values of each small
 * block are copied triplet by triplet; when a small block has no such column, one null
 * is written per row of that block instead. All readers are closed before returning.
 *
 * @param inputFiles the files whose row groups are merged
 * @param compressor the compressor used for the rewritten pages
 * @param createdBy passed to the {@link ColumnReadStoreImpl} used for reading
 * @param maxBlockSize passed to {@code BlocksCombiner.combineLargeBlocks}
 * @return always {@code 0}
 * @throws IOException if opening the inputs or writing the output fails
 */
public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
  List<ParquetFileReader> readers = getReaders(inputFiles);
  try {
    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
    ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
    this.start();
    List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
    // The column list does not change while merging; fetch it once instead of on every iteration.
    List<ColumnDescriptor> columns = schema.getColumns();
    for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
      for (int columnIndex = 0; columnIndex < columns.size(); columnIndex++) {
        ColumnDescriptor path = columns.get(columnIndex);
        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
        ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
        for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
          ParquetFileReader parquetFileReader = smallBlock.getReader();
          try {
            Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
            ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
            if (columnChunkPageReader.isPresent()) {
              ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
              // The count is a long: an int counter would overflow and never reach the bound.
              long valueCount = columnReader.getTotalValueCount();
              for (long i = 0; i < valueCount; i++) {
                consumeTriplet(columnWriter, columnReader);
              }
            } else {
              // Column is absent from this input file: pad with nulls using the levels of
              // the deepest ancestor path that the input file's schema does contain.
              MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
              String[] parentPath = getExisingParentPath(path, inputFileSchema);
              int def = inputFileSchema.getMaxDefinitionLevel(parentPath);
              int rep = inputFileSchema.getMaxRepetitionLevel(parentPath);
              // Row count is a long as well; use a long counter for the same reason as above.
              long rowCount = parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount();
              for (long i = 0; i < rowCount; i++) {
                columnWriter.writeNull(rep, def);
              }
            }
          } catch (Exception e) {
            // NOTE(review): the failure is only logged, so this column may end up with fewer
            // values than smallBlocks.getRowCount() - confirm this best-effort policy is intended.
            LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
          }
        }
        // The block is started once, just before its first column is flushed.
        if (columnIndex == 0) {
          this.startBlock(smallBlocks.getRowCount());
        }
        columnWriteStoreV1.flush();
        store.flushToFileWriter(path, this);
      }
      this.endBlock();
    }
    this.end(Collections.emptyMap());
  } finally {
    BlocksCombiner.closeReaders(readers);
  }
  return 0;
}

/**
 * Finds the longest leading part of the column's path that the given schema contains.
 *
 * @param path the column whose path is shortened from the end until it matches
 * @param inputFileSchema the schema the candidate paths are checked against
 * @return a new array holding the longest contained prefix; empty when no prefix is contained
 */
private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
  String[] fullPath = path.getPath();
  int prefixLength = fullPath.length;
  // Drop trailing segments one at a time until the schema recognises what is left.
  while (prefixLength > 0 && !inputFileSchema.containsPath(Arrays.copyOf(fullPath, prefixLength))) {
    prefixLength--;
  }
  return Arrays.copyOf(fullPath, prefixLength);
}

/**
 * Opens a {@link ParquetFileReader} for every input file, in order.
 *
 * <p>If opening any file fails, the readers opened so far are closed before the failure
 * is rethrown, so a partially built list never leaks open files. Failures while closing
 * are attached to the original exception as suppressed exceptions.
 *
 * @param inputFiles the files to open
 * @return one open reader per input file, in the same order
 * @throws IOException if any of the files cannot be opened
 */
private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
  List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
  try {
    for (InputFile inputFile : inputFiles) {
      readers.add(ParquetFileReader.open(inputFile));
    }
  } catch (IOException | RuntimeException e) {
    // The caller never receives the list on failure, so release what was opened here.
    for (ParquetFileReader opened : readers) {
      try {
        opened.close();
      } catch (Exception closeFailure) {
        e.addSuppressed(closeFailure);
      }
    }
    throw e;
  }
  return readers;
}

/**
 * Copies the reader's current triplet to the writer and then advances the reader.
 *
 * <p>When the current definition level is below the column's maximum, a null is written
 * with the current levels; otherwise the value is read with the accessor matching the
 * column's primitive type and written with the same levels.
 *
 * @param columnWriter destination of the triplet
 * @param columnReader source positioned on the triplet to copy
 * @throws IllegalArgumentException if the column's primitive type is not handled
 */
private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
  final int defLevel = columnReader.getCurrentDefinitionLevel();
  final int repLevel = columnReader.getCurrentRepetitionLevel();
  final ColumnDescriptor descriptor = columnReader.getDescriptor();
  final PrimitiveType primitive = descriptor.getPrimitiveType();

  if (defLevel >= descriptor.getMaxDefinitionLevel()) {
    // A value is present: pick the accessor that matches the physical type.
    switch (primitive.getPrimitiveTypeName()) {
      case BOOLEAN:
        columnWriter.write(columnReader.getBoolean(), repLevel, defLevel);
        break;
      case INT32:
        columnWriter.write(columnReader.getInteger(), repLevel, defLevel);
        break;
      case INT64:
        columnWriter.write(columnReader.getLong(), repLevel, defLevel);
        break;
      case FLOAT:
        columnWriter.write(columnReader.getFloat(), repLevel, defLevel);
        break;
      case DOUBLE:
        columnWriter.write(columnReader.getDouble(), repLevel, defLevel);
        break;
      case INT96:
      case FIXED_LEN_BYTE_ARRAY:
      case BINARY:
        columnWriter.write(columnReader.getBinary(), repLevel, defLevel);
        break;
      default:
        throw new IllegalArgumentException("Unknown primitive type " + primitive);
    }
  } else {
    columnWriter.writeNull(repLevel, defLevel);
  }

  // Move the reader on to the next triplet.
  columnReader.consume();
}

/**
* @param file a file stream to read from
* @param rowGroups row groups to copy
Expand Down
Loading