Skip to content

Commit e7db9e2

Browse files
gszadovszky authored and zivanfi committed
PARQUET-1201: Column indexes (#527)
This is a squashed feature branch merge including the changes listed below. The detailed history can be found in the 'column-indexes' branch. * PARQUET-1211: Column indexes: read/write API (#456) * PARQUET-1212: Column indexes: Show indexes in tools (#479) * PARQUET-1213: Column indexes: Limit index size (#480) * PARQUET-1214: Column indexes: Truncate min/max values (#481) * PARQUET-1364: Invalid row indexes for pages starting with nulls (#507) * PARQUET-1310: Column indexes: Filtering (#509) * PARQUET-1386: Fix issues of NaN and +-0.0 in case of float/double column indexes (#515) * PARQUET-1389: Improve value skipping at page synchronization (#514) * PARQUET-1381: Fix missing endRecord after merging columnIndex
1 parent cded3e5 commit e7db9e2

File tree

83 files changed

+10429
-1560
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+10429
-1560
lines changed

parquet-cli/src/main/java/org/apache/parquet/cli/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.parquet.cli.commands.ConvertCommand;
3333
import org.apache.parquet.cli.commands.ParquetMetadataCommand;
3434
import org.apache.parquet.cli.commands.SchemaCommand;
35+
import org.apache.parquet.cli.commands.ShowColumnIndexCommand;
3536
import org.apache.parquet.cli.commands.ShowDictionaryCommand;
3637
import org.apache.parquet.cli.commands.ShowPagesCommand;
3738
import org.apache.parquet.cli.commands.ToAvroCommand;
@@ -87,6 +88,7 @@ public class Main extends Configured implements Tool {
8788
jc.addCommand("to-avro", new ToAvroCommand(console));
8889
jc.addCommand("cat", new CatCommand(console, 0));
8990
jc.addCommand("head", new CatCommand(console, 10));
91+
jc.addCommand("column-index", new ShowColumnIndexCommand(console));
9092
}
9193

9294
@Override
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.cli.commands;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
import org.slf4j.Logger;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * parquet-cli command to print column and offset indexes.
 */
@Parameters(commandDescription = "Prints the column and offset indexes of a Parquet file")
public class ShowColumnIndexCommand extends BaseCommand {
  public ShowColumnIndexCommand(Logger console) {
    super(console);
  }

  // Positional argument: exactly one Parquet file path is expected (validated in run()).
  @Parameter(description = "<parquet path>")
  List<String> files;

  // Renamed from "ColumnPaths" to follow Java lowerCamelCase field naming;
  // JCommander binds via the annotation, so the CLI option is unaffected.
  @Parameter(names = { "-c", "--column" }, description = "Shows the column/offset indexes for the given column only")
  List<String> columnPaths;

  @Parameter(names = { "-r",
      "--row-group" }, description = "Shows the column/offset indexes for the given row-groups only; "
          + "row-groups are referenced by their indexes from 0")
  List<String> rowGroupIndexes;

  @Parameter(names = { "-i", "--column-index" }, description = "Shows the column indexes; "
      + "active by default unless -o is used")
  boolean showColumnIndex;

  @Parameter(names = { "-o", "--offset-index" }, description = "Shows the offset indexes; "
      + "active by default unless -i is used")
  boolean showOffsetIndex;

  @Override
  public List<String> getExamples() {
    return Lists.newArrayList(
        "# Show only column indexes for column 'col' from a Parquet file",
        "-c col -i sample.parquet");
  }

  /**
   * Reads the footer of the specified Parquet file and prints the column and/or
   * offset indexes of the selected row-groups/columns to the console.
   *
   * @return {@code 0} on success
   * @throws IOException if the file cannot be opened or the indexes cannot be read
   */
  @Override
  public int run() throws IOException {
    Preconditions.checkArgument(files != null && files.size() >= 1,
        "A Parquet file is required.");
    Preconditions.checkArgument(files.size() == 1,
        "Cannot process multiple Parquet files.");

    InputFile in = HadoopInputFile.fromPath(qualifiedPath(files.get(0)), getConf());
    // Neither -i nor -o specified: show both kinds of indexes by default.
    if (!showColumnIndex && !showOffsetIndex) {
      showColumnIndex = true;
      showOffsetIndex = true;
    }

    Set<String> rowGroupIndexSet = new HashSet<>();
    if (rowGroupIndexes != null) {
      rowGroupIndexSet.addAll(rowGroupIndexes);
    }

    try (ParquetFileReader reader = ParquetFileReader.open(in)) {
      boolean firstBlock = true;
      int rowGroupIndex = 0;
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        // Skip row-groups not listed via -r (an empty set means "show all").
        if (!rowGroupIndexSet.isEmpty() && !rowGroupIndexSet.contains(Integer.toString(rowGroupIndex))) {
          ++rowGroupIndex;
          continue;
        }
        if (!firstBlock) {
          console.info("");
        }
        firstBlock = false;
        console.info("row-group {}:", rowGroupIndex);
        for (ColumnChunkMetaData column : getColumns(block)) {
          String path = column.getPath().toDotString();
          if (showColumnIndex) {
            console.info("column index for column {}:", path);
            ColumnIndex columnIndex = reader.readColumnIndex(column);
            if (columnIndex == null) {
              console.info("NONE");
            } else {
              console.info(columnIndex.toString());
            }
          }
          if (showOffsetIndex) {
            console.info("offset index for column {}:", path);
            OffsetIndex offsetIndex = reader.readOffsetIndex(column);
            if (offsetIndex == null) {
              console.info("NONE");
            } else {
              console.info(offsetIndex.toString());
            }
          }
        }
        ++rowGroupIndex;
      }
    }
    return 0;
  }

  /**
   * Returns the column chunks of {@code block} to be printed: all of them when no
   * {@code -c} option was given, otherwise only the requested columns, in the
   * order they were specified. Paths that do not match any column are silently
   * skipped.
   */
  private List<ColumnChunkMetaData> getColumns(BlockMetaData block) {
    List<ColumnChunkMetaData> columns = block.getColumns();
    if (columnPaths == null || columnPaths.isEmpty()) {
      return columns;
    }
    Map<String, ColumnChunkMetaData> pathMap = new HashMap<>();
    for (ColumnChunkMetaData column : columns) {
      pathMap.put(column.getPath().toDotString(), column);
    }

    List<ColumnChunkMetaData> filtered = new ArrayList<>();
    for (String path : columnPaths) {
      ColumnChunkMetaData column = pathMap.get(path);
      if (column != null) {
        filtered.add(column);
      }
    }
    return filtered;
  }
}

parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ public interface ColumnReader {
4141

4242
/**
4343
* @return the totalCount of values to be consumed
44+
* @deprecated will be removed in 2.0.0; Total values might not be able to be counted before reading the values (e.g.
45+
* in case of column index based filtering)
4446
*/
47+
@Deprecated
4548
long getTotalValueCount();
4649

4750
/**

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class ParquetProperties {
4747
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
4848
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
4949
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
50+
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
5051

5152
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
5253

@@ -83,10 +84,11 @@ public static WriterVersion fromString(String name) {
8384
private final boolean estimateNextSizeCheck;
8485
private final ByteBufferAllocator allocator;
8586
private final ValuesWriterFactory valuesWriterFactory;
87+
private final int columnIndexTruncateLength;
8688

8789
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
8890
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
89-
ValuesWriterFactory writerFactory) {
91+
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
9092
this.pageSizeThreshold = pageSize;
9193
this.initialSlabSize = CapacityByteArrayOutputStream
9294
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -99,6 +101,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
99101
this.allocator = allocator;
100102

101103
this.valuesWriterFactory = writerFactory;
104+
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
102105
}
103106

104107
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -163,7 +166,7 @@ public ColumnWriteStore newColumnWriteStore(MessageType schema,
163166
PageWriteStore pageStore) {
164167
switch (writerVersion) {
165168
case PARQUET_1_0:
166-
return new ColumnWriteStoreV1(pageStore, this);
169+
return new ColumnWriteStoreV1(schema, pageStore, this);
167170
case PARQUET_2_0:
168171
return new ColumnWriteStoreV2(schema, pageStore, this);
169172
default:
@@ -183,6 +186,10 @@ public ValuesWriterFactory getValuesWriterFactory() {
183186
return valuesWriterFactory;
184187
}
185188

189+
public int getColumnIndexTruncateLength() {
190+
return columnIndexTruncateLength;
191+
}
192+
186193
public boolean estimateNextSizeCheck() {
187194
return estimateNextSizeCheck;
188195
}
@@ -205,6 +212,7 @@ public static class Builder {
205212
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
206213
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
207214
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
215+
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
208216

209217
private Builder() {
210218
}
@@ -299,11 +307,17 @@ public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
299307
return this;
300308
}
301309

310+
public Builder withColumnIndexTruncateLength(int length) {
311+
Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative) : %s", length);
312+
this.columnIndexTruncateLength = length;
313+
return this;
314+
}
315+
302316
public ParquetProperties build() {
303317
ParquetProperties properties =
304318
new ParquetProperties(writerVersion, pageSize, dictPageSize,
305319
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
306-
estimateNextSizeCheck, allocator, valuesWriterFactory);
320+
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
307321
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
308322
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
309323
// we'd like to decouple that and won't need to pass an object to properties and then pass the

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.parquet.column.impl;
2020

21+
import java.util.Optional;
22+
import java.util.PrimitiveIterator;
23+
2124
import org.apache.parquet.VersionParser;
2225
import org.apache.parquet.VersionParser.ParsedVersion;
2326
import org.apache.parquet.VersionParser.VersionParseException;
@@ -72,7 +75,14 @@ public ColumnReadStoreImpl(PageReadStore pageReadStore,
7275

7376
@Override
7477
public ColumnReader getColumnReader(ColumnDescriptor path) {
75-
return newMemColumnReader(path, pageReadStore.getPageReader(path));
78+
PrimitiveConverter converter = getPrimitiveConverter(path);
79+
PageReader pageReader = pageReadStore.getPageReader(path);
80+
Optional<PrimitiveIterator.OfLong> rowIndexes = pageReadStore.getRowIndexes();
81+
if (rowIndexes.isPresent()) {
82+
return new SynchronizingColumnReader(path, pageReader, converter, writerVersion, rowIndexes.get());
83+
} else {
84+
return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
85+
}
7686
}
7787

7888
public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {

0 commit comments

Comments (0)