
Commit d8a92cb

PARQUET-1381: Add merge blocks command to parquet-tools
1 parent d692ce3 commit d8a92cb

7 files changed, +644 -24 lines changed

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
     return newMemColumnReader(path, pageReadStore.getPageReader(path));
   }
 
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }

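The visibility change above lets callers outside the package build a ColumnReader for a single column chunk, which the new merge path in ParquetFileWriter relies on. A minimal usage sketch, not part of this commit; it assumes a schema, a createdBy string, a ColumnDescriptor and a PageReader for one chunk are already in scope, mirroring how merge() below uses the method:

// Sketch only: decode one column chunk with the now-public newMemColumnReader.
ColumnReadStoreImpl readStore =
    new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
ColumnReader columnReader = readStore.newMemColumnReader(columnDescriptor, pageReader);
for (long i = 0; i < columnReader.getTotalValueCount(); i++) {
  // read the current triplet (repetition level, definition level, value), then advance
  columnReader.consume();
}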
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java

Lines changed: 5 additions & 0 deletions
@@ -262,4 +262,9 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
+  public void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
+    ColumnChunkPageWriter pageWriter = writers.get(path);
+    pageWriter.writeToFileWriter(writer);
+  }
+
 }

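The new overload flushes a single column's pages instead of the whole store, so a merged row group can be written to the target file one column chunk at a time. A rough sketch of the intended call sequence, assuming compressor, schema, allocator, path and the target writer are in scope; this mirrors what merge() further below does:

// Sketch only: buffer pages for one column, then flush just that column chunk.
ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
ColumnWriteStoreV1 writeStore = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
ColumnWriter columnWriter = writeStore.getColumnWriter(path);
// ... write the column's values through columnWriter ...
writeStore.flush();                    // materialize buffered values as pages
store.flushToFileWriter(path, writer); // write only this column's chunk to the file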
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 72 additions & 21 deletions
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1160,27 +1162,8 @@ public void addChunk(ChunkDescriptor descriptor) {
    * @throws IOException if there is an error while reading from the stream
    */
   public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-    List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-    f.seek(offset);
-
-    int fullAllocations = length / options.getMaxAllocationSize();
-    int lastAllocationSize = length % options.getMaxAllocationSize();
-
-    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-    for (int i = 0; i < fullAllocations; i += 1) {
-      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-    }
-
-    if (lastAllocationSize > 0) {
-      buffers.add(options.getAllocator().allocate(lastAllocationSize));
-    }
-
-    for (ByteBuffer buffer : buffers) {
-      f.readFully(buffer);
-      buffer.flip();
-    }
+    List<Chunk> result = new ArrayList<>(chunks.size());
+    List<ByteBuffer> buffers = readBlocks(f, offset, length);
 
     // report in a counter the data we just scanned
     BenchmarkCounter.incrementBytesRead(length);
@@ -1206,4 +1189,72 @@ public long endPos() {
 
   }
 
+  /**
+   * @param f file to read the blocks from
+   * @return the ByteBuffer blocks
+   * @throws IOException if there is an error while reading from the stream
+   */
+  public List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+    f.seek(offset);
+
+    int fullAllocations = length / options.getMaxAllocationSize();
+    int lastAllocationSize = length % options.getMaxAllocationSize();
+
+    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+    for (int i = 0; i < fullAllocations; i++) {
+      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+    }
+
+    if (lastAllocationSize > 0) {
+      buffers.add(options.getAllocator().allocate(lastAllocationSize));
+    }
+
+    for (ByteBuffer buffer : buffers) {
+      f.readFully(buffer);
+      buffer.flip();
+    }
+    return buffers;
+  }
+
+  public Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) throws IOException {
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
+
+    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
+        .map(chunk -> readChunk(f, chunk));
+  }
+
+  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
+    try {
+      List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
+      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f);
+      return chunk.readAllPages();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      if (Arrays.equals(column.getPath().toArray(), path)) {
+        return Optional.of(column);
+      }
+    }
+    return Optional.empty();
+  }
+
+  public int blocksCount() {
+    return blocks.size();
+  }
+
+  public BlockMetaData getBlockMetaData(int blockIndex) {
+    return blocks.get(blockIndex);
+  }
+
 }

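Taken together, readColumnInBlock, blocksCount and getBlockMetaData expose row groups and individual column chunks to callers, which is what the merge logic iterates over. A hedged sketch, not from the commit, of walking every column chunk of a file through the new methods; imports are assumed to match the ones added above:

// Sketch only: visit each column chunk of each row group.
static void listColumnChunks(InputFile inputFile) throws IOException {
  try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
    MessageType schema = reader.getFileMetaData().getSchema();
    for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
      long rows = reader.getBlockMetaData(blockIndex).getRowCount();
      for (ColumnDescriptor column : schema.getColumns()) {
        Optional<PageReader> pages = reader.readColumnInBlock(blockIndex, column);
        // empty when this input file's schema does not contain the column
        System.out.println(column + " in block " + blockIndex + " (" + rows + " rows): " + pages.isPresent());
      }
    }
  }
}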
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 112 additions & 0 deletions
@@ -25,12 +25,14 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,13 +43,22 @@
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -57,6 +68,7 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
@@ -519,6 +531,106 @@ public void appendFile(InputFile file) throws IOException {
     ParquetFileReader.open(file).appendTo(this);
   }
 
+  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
+    List<ParquetFileReader> readers = getReaders(inputFiles);
+    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
+    this.start();
+    List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
+    for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
+      for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
+        ColumnDescriptor path = schema.getColumns().get(columnIndex);
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
+        ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
+        for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
+          ParquetFileReader parquetFileReader = smallBlock.getReader();
+          try {
+            Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
+            ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
+            if (columnChunkPageReader.isPresent()) {
+              ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
+              for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+                consumeTriplet(columnWriter, columnReader);
+              }
+            } else {
+              MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
+              String[] parentPath = getExisingParentPath(path, inputFileSchema);
+              int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
+              int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
+              for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
+                columnWriter.writeNull(rep, def);
+              }
+            }
+          } catch (Exception e) {
+            LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
+          }
+        }
+        if (columnIndex == 0) {
+          this.startBlock(smallBlocks.getRowCount());
+        }
+        columnWriteStoreV1.flush();
+        store.flushToFileWriter(path, this);
+      }
+      this.endBlock();
+    }
+    this.end(new HashMap<>());
+
+    BlocksCombiner.closeReaders(readers);
+    return 0;
+  }
+
+  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
+    List<String> parentPath = Arrays.asList(path.getPath());
+    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
+      parentPath = parentPath.subList(0, parentPath.size() - 1);
+    }
+    return parentPath.toArray(new String[parentPath.size()]);
+  }
+
+  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
+    List<ParquetFileReader> readers = new ArrayList<>();
+    for (InputFile inputFile : inputFiles) {
+      readers.add(ParquetFileReader.open(inputFile));
+    }
+    return readers;
+  }
+
+  private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+    int definitionLevel = columnReader.getCurrentDefinitionLevel();
+    int repetitionLevel = columnReader.getCurrentRepetitionLevel();
+    ColumnDescriptor column = columnReader.getDescriptor();
+    PrimitiveType type = column.getPrimitiveType();
+    if (definitionLevel < column.getMaxDefinitionLevel()) {
+      columnWriter.writeNull(repetitionLevel, definitionLevel);
+    } else {
+      switch (type.getPrimitiveTypeName()) {
+        case INT32:
+          columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
+          break;
+        case INT64:
+          columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
+          break;
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+        case INT96:
+          columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
+          break;
+        case BOOLEAN:
+          columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
+          break;
+        case FLOAT:
+          columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
+          break;
+        case DOUBLE:
+          columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
          break;
+        default:
+          throw new IllegalArgumentException("Unknown primitive type " + type);
+      }
+    }
+    columnReader.consume();
+  }
+
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
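A sketch of how a caller, such as the new parquet-tools command (not shown in this excerpt), might drive merge(). The writer construction, the codec choice and the 128 MB target row-group size are illustrative assumptions, not taken from the commit; merge() itself calls start(), startBlock/endBlock and end(), so the caller only sets up the writer and the inputs:

// Sketch only: merge two small files into one file with larger row groups.
// Assumes "schema" is the merged MessageType and "conf" a Hadoop Configuration.
List<InputFile> inputs = Arrays.asList(
    HadoopInputFile.fromPath(new Path("part-00000.parquet"), conf),
    HadoopInputFile.fromPath(new Path("part-00001.parquet"), conf));
ParquetFileWriter writer = new ParquetFileWriter(
    HadoopOutputFile.fromPath(new Path("merged.parquet"), conf),
    schema, ParquetFileWriter.Mode.CREATE,
    ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
CodecFactory.BytesCompressor compressor =
    new CodecFactory(conf, ParquetWriter.DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
writer.merge(inputs, compressor, "parquet-tools merge", 128 * 1024 * 1024L);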
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.unmodifiableList;
+
+public class BlocksCombiner {
+
+  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
+    List<SmallBlocksUnion> blocks = new ArrayList<>();
+    long largeBlockSize = 0;
+    long largeBlockRecords = 0;
+    List<SmallBlock> smallBlocks = new ArrayList<>();
+    for (ParquetFileReader reader : readers) {
+      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
+        BlockMetaData block = reader.getBlockMetaData(blockIndex);
+        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
+          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+          smallBlocks = new ArrayList<>();
+          largeBlockSize = 0;
+          largeBlockRecords = 0;
+        }
+        largeBlockSize += block.getTotalByteSize();
+        largeBlockRecords += block.getRowCount();
+        smallBlocks.add(new SmallBlock(reader, blockIndex));
+      }
+    }
+    if (!smallBlocks.isEmpty()) {
+      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+    }
+    return unmodifiableList(blocks);
+  }
+
+  public static void closeReaders(List<ParquetFileReader> readers) {
+    readers.forEach(r -> {
+      try {
+        r.close();
+      } catch (IOException e) {
+      }
+    });
+  }
+
+  public static class SmallBlocksUnion {
+    private final List<SmallBlock> blocks;
+    private final long rowCount;
+
+    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
+      this.blocks = blocks;
+      this.rowCount = rowCount;
+    }
+
+    public List<SmallBlock> getBlocks() {
+      return blocks;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+  }
+
+  public static class SmallBlock {
+    private final ParquetFileReader reader;
+    private final int blockIndex;
+
+    public SmallBlock(ParquetFileReader reader, int blockIndex) {
+      this.reader = reader;
+      this.blockIndex = blockIndex;
+    }
+
+    public ParquetFileReader getReader() {
+      return reader;
+    }
+
+    public int getBlockIndex() {
+      return blockIndex;
+    }
+  }
+}

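BlocksCombiner groups consecutive source row groups into unions whose combined byte size stays at or under maxBlockSize; each union later becomes one row group in the merged file. A small hedged example of the grouping behaviour (getOpenReaders is a hypothetical helper, and the sizes in the comment are illustrative):

// Sketch only: with row groups of roughly 40 MB, 50 MB and 70 MB and a 100 MB limit,
// the first union holds the 40 MB and 50 MB groups; the 70 MB group starts a new union.
List<ParquetFileReader> readers = getOpenReaders(); // hypothetical helper returning open readers
List<BlocksCombiner.SmallBlocksUnion> unions =
    BlocksCombiner.combineLargeBlocks(readers, 100L * 1024 * 1024);
for (BlocksCombiner.SmallBlocksUnion union : unions) {
  System.out.println(union.getBlocks().size() + " source row groups, " + union.getRowCount() + " rows");
}
BlocksCombiner.closeReaders(readers);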