
Commit ab42fe5

Revert "PARQUET-1381: Add merge blocks command to parquet-tools (#512)" (#621)
This reverts commit 863a081. The design of this feature has conceptual problems and the implementation also works incorrectly. See PARQUET-1381 for more details.
1 parent f799893 commit ab42fe5

File tree

7 files changed: +24 −659 lines changed


parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
     }
   }
 
-  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java

Lines changed: 0 additions & 5 deletions
@@ -292,9 +292,4 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
-  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
-    ColumnChunkPageWriter pageWriter = writers.get(path);
-    pageWriter.writeToFileWriter(writer);
-  }
-
 }

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 21 additions & 71 deletions
@@ -42,7 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -67,7 +66,6 @@
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -1408,7 +1406,27 @@ public void addChunk(ChunkDescriptor descriptor) {
      * @throws IOException if there is an error while reading from the stream
      */
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
-      List<ByteBuffer> buffers = readBlocks(f, offset, length);
+      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+      f.seek(offset);
+
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1428,72 +1446,4 @@ public long endPos() {
 
   }
 
-  /**
-   * @param f file to read the blocks from
-   * @return the ByteBuffer blocks
-   * @throws IOException if there is an error while reading from the stream
-   */
-  List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
-    f.seek(offset);
-
-    int fullAllocations = length / options.getMaxAllocationSize();
-    int lastAllocationSize = length % options.getMaxAllocationSize();
-
-    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-    for (int i = 0; i < fullAllocations; i++) {
-      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-    }
-
-    if (lastAllocationSize > 0) {
-      buffers.add(options.getAllocator().allocate(lastAllocationSize));
-    }
-
-    for (ByteBuffer buffer : buffers) {
-      f.readFully(buffer);
-      buffer.flip();
-    }
-    return buffers;
-  }
-
-  Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
-    BlockMetaData block = blocks.get(blockIndex);
-    if (block.getRowCount() == 0) {
-      throw new RuntimeException("Illegal row group of 0 rows");
-    }
-    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
-
-    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
-        .map(chunk -> readChunk(f, chunk));
-  }
-
-  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
-    try {
-      List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
-      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f, null);
-      return chunk.readAllPages();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
-    for (ColumnChunkMetaData column : block.getColumns()) {
-      if (Arrays.equals(column.getPath().toArray(), path)) {
-        return Optional.of(column);
-      }
-    }
-    return Optional.empty();
-  }
-
-  public int blocksCount() {
-    return blocks.size();
-  }
-
-  public BlockMetaData getBlockMetaData(int blockIndex) {
-    return blocks.get(blockIndex);
-  }
-
 }
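Note: the restored readAll above folds the block read back inline. It splits the requested length into buffers of at most maxAllocationSize bytes, allocates each through the configured allocator, fills them from the stream, and flips them for reading. The following is a minimal standalone sketch of that chunking pattern; it uses plain java.nio types instead of Parquet's SeekableInputStream and ByteBufferAllocator, and the ChunkedReadSketch class, the chunkedRead method, and the 8 MB cap are illustrative assumptions, not Parquet API.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class ChunkedReadSketch {

  // Illustrative stand-in for options.getMaxAllocationSize(); the real default may differ.
  private static final int MAX_ALLOCATION_SIZE = 8 * 1024 * 1024;

  /**
   * Reads `length` bytes starting at `offset`, split across buffers of at most
   * MAX_ALLOCATION_SIZE bytes, mirroring the full/last-allocation split in readAll.
   */
  static List<ByteBuffer> chunkedRead(FileChannel channel, long offset, int length) throws IOException {
    int fullAllocations = length / MAX_ALLOCATION_SIZE;
    int lastAllocationSize = length % MAX_ALLOCATION_SIZE;
    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);

    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
    for (int i = 0; i < fullAllocations; i++) {
      buffers.add(ByteBuffer.allocate(MAX_ALLOCATION_SIZE));
    }
    if (lastAllocationSize > 0) {
      buffers.add(ByteBuffer.allocate(lastAllocationSize));
    }

    long position = offset;
    for (ByteBuffer buffer : buffers) {
      while (buffer.hasRemaining()) {   // readFully semantics: loop until this buffer is full
        int read = channel.read(buffer, position);
        if (read < 0) {
          throw new IOException("Unexpected end of file at position " + position);
        }
        position += read;
      }
      buffer.flip();                    // make the buffer readable, as readAll does
    }
    return buffers;
  }

  public static void main(String[] args) throws IOException {
    Path file = Paths.get(args[0]);
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      int length = (int) Math.min(channel.size(), 20L * 1024 * 1024);
      List<ByteBuffer> buffers = chunkedRead(channel, 0, length);
      System.out.println("read " + length + " bytes into " + buffers.size() + " buffer(s)");
    }
  }
}

Splitting the read into several bounded buffers keeps any single allocation below the configured cap even when a column chunk is much larger than that cap.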

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 0 additions & 123 deletions
@@ -26,15 +26,12 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,23 +42,14 @@
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.Util;
@@ -72,7 +60,6 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -669,116 +656,6 @@ public void appendFile(InputFile file) throws IOException {
     }
   }
 
-  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
-    List<ParquetFileReader> readers = getReaders(inputFiles);
-    try {
-      ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-      ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
-      this.start();
-      List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
-      for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
-        for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
-          ColumnDescriptor path = schema.getColumns().get(columnIndex);
-          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
-          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(schema, store, ParquetProperties.builder().build());
-          for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
-            ParquetFileReader parquetFileReader = smallBlock.getReader();
-            try {
-              Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
-              ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
-              if (columnChunkPageReader.isPresent()) {
-                ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
-                for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
-                  consumeTriplet(columnWriteStoreV1, columnWriter, columnReader);
-                }
-              } else {
-                MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
-                String[] parentPath = getExisingParentPath(path, inputFileSchema);
-                int def = parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
-                int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
-                for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
-                  columnWriter.writeNull(rep, def);
-                  if (def == 0) {
-                    // V1 pages also respect record boundaries so we have to mark them
-                    columnWriteStoreV1.endRecord();
-                  }
-                }
-              }
-            } catch (Exception e) {
-              LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
-            }
-          }
-          if (columnIndex == 0) {
-            this.startBlock(smallBlocks.getRowCount());
-          }
-          columnWriteStoreV1.flush();
-          store.flushToFileWriter(path, this);
-        }
-        this.endBlock();
-      }
-      this.end(Collections.emptyMap());
-    } finally {
-      BlocksCombiner.closeReaders(readers);
-    }
-    return 0;
-  }
-
-  private String[] getExisingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
-    List<String> parentPath = Arrays.asList(path.getPath());
-    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
-      parentPath = parentPath.subList(0, parentPath.size() - 1);
-    }
-    return parentPath.toArray(new String[parentPath.size()]);
-  }
-
-  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
-    List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
-    for (InputFile inputFile : inputFiles) {
-      readers.add(ParquetFileReader.open(inputFile));
-    }
-    return readers;
-  }
-
-  private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter columnWriter, ColumnReader columnReader) {
-    int definitionLevel = columnReader.getCurrentDefinitionLevel();
-    int repetitionLevel = columnReader.getCurrentRepetitionLevel();
-    ColumnDescriptor column = columnReader.getDescriptor();
-    PrimitiveType type = column.getPrimitiveType();
-    if (definitionLevel < column.getMaxDefinitionLevel()) {
-      columnWriter.writeNull(repetitionLevel, definitionLevel);
-    } else {
-      switch (type.getPrimitiveTypeName()) {
-      case INT32:
-        columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
-        break;
-      case INT64:
-        columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
-        break;
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-      case INT96:
-        columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
-        break;
-      case BOOLEAN:
-        columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
-        break;
-      case FLOAT:
-        columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
-        break;
-      case DOUBLE:
-        columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown primitive type " + type);
-      }
-    }
-    columnReader.consume();
-    if (repetitionLevel == 0) {
-      // V1 pages also respect record boundaries so we have to mark them
-      columnWriteStore.endRecord();
-    }
-  }
-
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
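Note: the reverted merge() above copied each column value as a (repetition level, definition level, value) triplet from a ColumnReader into a ColumnWriter, writing a null whenever the definition level was below the column's maximum and calling endRecord() after each value whose repetition level is 0, since Data Page V1 boundaries must line up with record boundaries. The sketch below restates that copy loop against simplified interfaces; TripletReader, TripletWriter, and copyColumn are illustrative stand-ins rather than Parquet API, and only the INT64 branch of the type switch is shown.

// Illustrative stand-ins for Parquet's ColumnReader/ColumnWriter; not Parquet API.
interface TripletReader {
  long getTotalValueCount();
  int getCurrentRepetitionLevel();
  int getCurrentDefinitionLevel();
  int getMaxDefinitionLevel();
  long getLong();      // the real API has one accessor per primitive type
  void consume();      // advance to the next (repetition, definition, value) triplet
}

interface TripletWriter {
  void writeNull(int repetitionLevel, int definitionLevel);
  void write(long value, int repetitionLevel, int definitionLevel);
  void endRecord();    // V1 pages respect record boundaries, so records must be marked
}

final class TripletCopySketch {
  private TripletCopySketch() {}

  /** Copies every value of one column from reader to writer, triplet by triplet. */
  static void copyColumn(TripletReader reader, TripletWriter writer) {
    for (long i = 0; i < reader.getTotalValueCount(); i++) {
      int definitionLevel = reader.getCurrentDefinitionLevel();
      int repetitionLevel = reader.getCurrentRepetitionLevel();
      if (definitionLevel < reader.getMaxDefinitionLevel()) {
        // A definition level below the maximum means the value is null at some nesting level.
        writer.writeNull(repetitionLevel, definitionLevel);
      } else {
        // The reverted code switched on the primitive type; only the INT64 case is shown here.
        writer.write(reader.getLong(), repetitionLevel, definitionLevel);
      }
      reader.consume();
      if (repetitionLevel == 0) {
        // Mirrors how the reverted code marked record boundaries: endRecord() after
        // each value whose repetition level is 0.
        writer.endRecord();
      }
    }
  }
}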

0 commit comments