Commit 9f3a815 (1 parent: 45e3ce5)

PARQUET-1381 Add merge blocks command to parquet-tools

File tree: 7 files changed, +391 −24 lines


parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java

Lines changed: 1 addition & 1 deletion

@@ -75,7 +75,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
     return newMemColumnReader(path, pageReadStore.getPageReader(path));
   }
 
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
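
Widening newMemColumnReader from private to public is what lets the new merge path in ParquetFileWriter (further down in this commit) drain a single column chunk through the column API. Below is a minimal sketch of that usage, assuming a ColumnReadStoreImpl, ColumnDescriptor, PageReader and ColumnWriter are already in hand; the helper class and the inline type dispatch are illustrative only — the commit itself delegates this step to ParquetTripletUtils.consumeTriplet, a utility added elsewhere in the commit but not shown in this excerpt.

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReader;

// Illustrative helper (not part of the commit): copy one column chunk value by value.
final class ColumnChunkCopy {
  static void copy(ColumnReadStoreImpl readStore, ColumnDescriptor path,
                   PageReader pages, ColumnWriter writer) {
    // newMemColumnReader is now public, so the copy can be driven from another package
    ColumnReader reader = readStore.newMemColumnReader(path, pages);
    for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
      int rl = reader.getCurrentRepetitionLevel();
      int dl = reader.getCurrentDefinitionLevel();
      if (dl < path.getMaxDefinitionLevel()) {
        writer.writeNull(rl, dl);                        // value is null at this definition level
      } else {
        switch (path.getType()) {                        // dispatch on the column's primitive type
          case INT32:   writer.write(reader.getInteger(), rl, dl); break;
          case INT64:   writer.write(reader.getLong(), rl, dl);    break;
          case FLOAT:   writer.write(reader.getFloat(), rl, dl);   break;
          case DOUBLE:  writer.write(reader.getDouble(), rl, dl);  break;
          case BOOLEAN: writer.write(reader.getBoolean(), rl, dl); break;
          default:      writer.write(reader.getBinary(), rl, dl);  // BINARY, FIXED_LEN_BYTE_ARRAY, INT96
        }
      }
      reader.consume();                                  // advance to the next triplet
    }
  }
}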

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java

Lines changed: 5 additions & 0 deletions

@@ -262,4 +262,9 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
+  public void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
+    ColumnChunkPageWriter pageWriter = writers.get(path);
+    pageWriter.writeToFileWriter(writer);
+  }
+
 }
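
The existing flushToFileWriter(ParquetFileWriter) flushes every buffered column; the new overload flushes only the chunk for one ColumnDescriptor, which is what the column-at-a-time merge loop needs. A rough sketch of that flow follows, assuming the caller has already opened the file writer and called startBlock (and calls endBlock afterwards); the helper name and the all-null payload are placeholders, and the sketch sits in the org.apache.parquet.hadoop package because ColumnChunkPageWriteStore is package-local.

package org.apache.parquet.hadoop; // ColumnChunkPageWriteStore is package-local, so this sketch lives beside it

import java.io.IOException;

import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.schema.MessageType;

// Illustrative helper (not part of the commit): buffer values for one column, then hand only that
// column's pages to the file writer through the overload added above.
final class SingleColumnFlush {
  static void writeNullColumn(MessageType schema, ColumnDescriptor path, long rowCount,
                              CodecFactory.BytesCompressor compressor,
                              ParquetFileWriter fileWriter) throws IOException {
    ColumnChunkPageWriteStore pageStore =
        new ColumnChunkPageWriteStore(compressor, schema, new HeapByteBufferAllocator());
    ColumnWriteStoreV1 writeStore = new ColumnWriteStoreV1(pageStore, ParquetProperties.builder().build());
    ColumnWriter columnWriter = writeStore.getColumnWriter(path);
    for (long row = 0; row < rowCount; row++) {
      columnWriter.writeNull(0, 0);                // placeholder payload: a top-level null per row
    }
    writeStore.flush();                            // materialize the buffered values into pages
    pageStore.flushToFileWriter(path, fileWriter); // new overload: write just this column's chunk
  }
}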

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 72 additions & 21 deletions

@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1160,27 +1162,8 @@ public void addChunk(ChunkDescriptor descriptor) {
      * @throws IOException if there is an error while reading from the stream
      */
     public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-      f.seek(offset);
-
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
-
-      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-      for (int i = 0; i < fullAllocations; i += 1) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-      }
-
-      if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
-      }
-
-      for (ByteBuffer buffer : buffers) {
-        f.readFully(buffer);
-        buffer.flip();
-      }
+      List<Chunk> result = new ArrayList<>(chunks.size());
+      List<ByteBuffer> buffers = readBlocks(f, offset, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1206,4 +1189,72 @@ public long endPos() {
 
   }
 
+  /**
+   * @param f file to read the blocks from
+   * @return the ByteBuffer blocks
+   * @throws IOException if there is an error while reading from the stream
+   */
+  public List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+    f.seek(offset);
+
+    int fullAllocations = length / options.getMaxAllocationSize();
+    int lastAllocationSize = length % options.getMaxAllocationSize();
+
+    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+    for (int i = 0; i < fullAllocations; i++) {
+      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+    }
+
+    if (lastAllocationSize > 0) {
+      buffers.add(options.getAllocator().allocate(lastAllocationSize));
+    }
+
+    for (ByteBuffer buffer : buffers) {
+      f.readFully(buffer);
+      buffer.flip();
+    }
+    return buffers;
+  }
+
+  public Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) throws IOException {
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
+
+    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
+        .map(chunk -> readChunk(f, chunk));
+  }
+
+  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
+    try {
+      List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
+      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f);
+      return chunk.readAllPages();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      if (Arrays.equals(column.getPath().toArray(), path)) {
+        return Optional.of(column);
+      }
+    }
+    return Optional.empty();
+  }
+
+  public int blocksCount() {
+    return blocks.size();
+  }
+
+  public BlockMetaData getBlockMetaData(int blockIndex) {
+    return blocks.get(blockIndex);
+  }
+
 }
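
With blocksCount, getBlockMetaData and readColumnInBlock public, a caller can walk the row groups of a file and pull one column chunk at a time instead of materializing whole row groups. A small usage sketch, assuming an already-open ParquetFileReader; the class and method names are illustrative only.

import java.io.IOException;
import java.util.Optional;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.hadoop.ParquetFileReader;

// Illustrative helper (not part of the commit): count the values of one column across all row groups.
final class ColumnChunkScan {
  static long totalValueCount(ParquetFileReader reader, ColumnDescriptor column) throws IOException {
    long total = 0;
    for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
      // an empty Optional means the input file's schema does not contain this column
      Optional<PageReader> pages = reader.readColumnInBlock(blockIndex, column);
      if (pages.isPresent()) {
        total += pages.get().getTotalValueCount();
      }
    }
    return total;
  }
}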

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 77 additions & 0 deletions

@@ -21,16 +21,19 @@
 import static org.apache.parquet.format.Util.writeFileMetaData;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+import static org.apache.parquet.hadoop.util.ParquetTripletUtils.consumeTriplet;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,13 +44,22 @@
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -57,6 +69,7 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
@@ -519,6 +532,70 @@ public void appendFile(InputFile file) throws IOException {
     ParquetFileReader.open(file).appendTo(this);
   }
 
+  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
+    List<ParquetFileReader> readers = getReaders(inputFiles);
+    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
+    this.start();
+    List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
+    for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
+      for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
+        ColumnDescriptor path = schema.getColumns().get(columnIndex);
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
+        ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
+        for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
+          ParquetFileReader parquetFileReader = smallBlock.getReader();
+          try {
+            Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
+            ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
+            if (columnChunkPageReader.isPresent()) {
+              ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
+              for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+                consumeTriplet(columnWriter, columnReader);
+              }
+            } else {
+              MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
+              String[] parentPath = getExisingParentPath(path, inputFileSchema);
+              int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
+              int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
+              for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
+                columnWriter.writeNull(rep, def);
+              }
+            }
+          } catch (Exception e) {
+            LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
+          }
+        }
+        if (columnIndex == 0) {
+          this.startBlock(smallBlocks.getRowCount());
+        }
+        columnWriteStoreV1.flush();
+        store.flushToFileWriter(path, this);
+      }
+      this.endBlock();
+    }
+    this.end(new HashMap<>());
+
+    BlocksCombiner.closeReaders(readers);
+    return 0;
+  }
+
+  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
+    List<String> parentPath = Arrays.asList(path.getPath());
+    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
+      parentPath = parentPath.subList(0, parentPath.size() - 1);
+    }
+    return parentPath.toArray(new String[parentPath.size()]);
+  }
+
+  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
+    List<ParquetFileReader> readers = new ArrayList<>();
+    for (InputFile inputFile : inputFiles) {
+      readers.add(ParquetFileReader.open(inputFile));
+    }
+    return readers;
+  }
+
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
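
For orientation, a hedged end-to-end sketch of driving the new merge(...) entry point from Hadoop paths. The ParquetFileWriter constructor, the CodecFactory setup, the createdBy string and the 128 MB target block size are assumptions made for illustration; the intended caller is the parquet-tools merge command added by this commit (not shown in this excerpt), and schema must be the schema of the merged output.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;

// Illustrative driver (not part of the commit): merge several small files into one output file.
final class MergeDriver {
  static void mergeFiles(Configuration conf, MessageType schema, List<Path> inputs, Path output) throws IOException {
    List<InputFile> inputFiles = new ArrayList<>();
    for (Path input : inputs) {
      inputFiles.add(HadoopInputFile.fromPath(input, conf));
    }
    ParquetFileWriter writer = new ParquetFileWriter(
        HadoopOutputFile.fromPath(output, conf), schema, ParquetFileWriter.Mode.CREATE,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
    // assumed codec setup: a CodecFactory supplies the BytesCompressor that merge() expects
    CodecFactory.BytesCompressor compressor =
        new CodecFactory(conf, ParquetProperties.DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
    // target ~128 MB merged row groups; merge() starts and ends the output file itself
    writer.merge(inputFiles, compressor, "parquet-tools merge (sketch)", 128L * 1024 * 1024);
  }
}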
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java (new file)

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.unmodifiableList;
+
+public class BlocksCombiner {
+
+  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
+    List<SmallBlocksUnion> blocks = new ArrayList<>();
+    long largeBlockSize = 0;
+    long largeBlockRecords = 0;
+    List<SmallBlock> smallBlocks = new ArrayList<>();
+    for (ParquetFileReader reader : readers) {
+      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
+        BlockMetaData block = reader.getBlockMetaData(blockIndex);
+        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
+          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+          smallBlocks = new ArrayList<>();
+          largeBlockSize = 0;
+          largeBlockRecords = 0;
+        }
+        largeBlockSize += block.getTotalByteSize();
+        largeBlockRecords += block.getRowCount();
+        smallBlocks.add(new SmallBlock(reader, blockIndex));
+      }
+    }
+    if (!smallBlocks.isEmpty()) {
+      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+    }
+    return unmodifiableList(blocks);
+  }
+
+  public static void closeReaders(List<ParquetFileReader> readers) {
+    readers.forEach(r -> {
+      try {
+        r.close();
+      } catch (IOException e) {
+      }
+    });
+  }
+
+  public static class SmallBlocksUnion {
+    private final List<SmallBlock> blocks;
+    private final long rowCount;
+
+    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
+      this.blocks = blocks;
+      this.rowCount = rowCount;
+    }
+
+    public List<SmallBlock> getBlocks() {
+      return blocks;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+  }
+
+  public static class SmallBlock {
+    private final ParquetFileReader reader;
+    private final int blockIndex;
+
+    public SmallBlock(ParquetFileReader reader, int blockIndex) {
+      this.reader = reader;
+      this.blockIndex = blockIndex;
+    }
+
+    public ParquetFileReader getReader() {
+      return reader;
+    }
+
+    public int getBlockIndex() {
+      return blockIndex;
+    }
+  }
+}
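
combineLargeBlocks packs input row groups in the order the readers present them: blocks are appended to the current union until adding the next one would push the union past maxBlockSize, at which point the union is closed and a new one is started (a single block larger than maxBlockSize still becomes its own union). A small sketch that prints the resulting packing, assuming a list of already-open readers; names are illustrative only.

import java.util.List;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.BlocksCombiner;

// Illustrative helper (not part of the commit): preview how input row groups would be packed.
final class MergePlanPreview {
  static void print(List<ParquetFileReader> readers, long maxBlockSize) {
    List<BlocksCombiner.SmallBlocksUnion> plan = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
    for (int i = 0; i < plan.size(); i++) {
      BlocksCombiner.SmallBlocksUnion union = plan.get(i);
      System.out.printf("output row group %d: %d input blocks, %d rows%n",
          i, union.getBlocks().size(), union.getRowCount());
    }
  }
}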
