Skip to content

Commit 6eb42f2

Browse files
James Taylor authored and martint committed
Skip reading Parquet pages using Column Indexes feature of Parquet
* Add tests verifying equivalent results with and without feature enabled
* Fix column synchronization logic
* Take into account empty min/max values
* Fix issues around type casts of numeric values
* Remove ColumnIndexFilterUtils and replace with FilteredOffsetIndex
* Add support for Decimal type
* Remove reference to INT96 since it's not supported for column indexes
1 parent 39100a9 commit 6eb42f2

32 files changed

+3772
-171
lines changed

core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,14 @@ static SortedRangeSet of(Range first, Range... rest)
233233
return copyOf(first.getType(), rangeList);
234234
}
235235

236+
static SortedRangeSet of(List<Range> rangeList)
237+
{
238+
if (rangeList.isEmpty()) {
239+
throw new IllegalArgumentException("cannot use empty rangeList");
240+
}
241+
return copyOf(rangeList.get(0).getType(), rangeList);
242+
}
243+
236244
private static SortedRangeSet of(Type type, Object value)
237245
{
238246
checkNotNaN(type, value);

core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ static ValueSet ofRanges(Range first, Range... rest)
7979
return SortedRangeSet.of(first, rest);
8080
}
8181

82+
static ValueSet ofRanges(List<Range> ranges)
83+
{
84+
return SortedRangeSet.of(ranges);
85+
}
86+
8287
static ValueSet copyOfRanges(Type type, Collection<Range> ranges)
8388
{
8489
return SortedRangeSet.copyOf(type, ranges);

lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,27 @@
1313
*/
1414
package io.trino.parquet;
1515

16+
import java.util.OptionalLong;
17+
1618
public abstract class DataPage
1719
extends Page
1820
{
1921
protected final int valueCount;
22+
private final OptionalLong firstRowIndex;
2023

21-
public DataPage(int uncompressedSize, int valueCount)
24+
public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex)
2225
{
2326
super(uncompressedSize);
2427
this.valueCount = valueCount;
28+
this.firstRowIndex = firstRowIndex;
29+
}
30+
31+
/**
32+
* @return the index of the first row in this page, or an empty {@code OptionalLong} if unset.
33+
*/
34+
public OptionalLong getFirstRowIndex()
35+
{
36+
return firstRowIndex;
2537
}
2638

2739
public int getValueCount()

lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import io.airlift.slice.Slice;
1717

18+
import java.util.OptionalLong;
19+
1820
import static com.google.common.base.MoreObjects.toStringHelper;
1921
import static java.util.Objects.requireNonNull;
2022

@@ -30,11 +32,12 @@ public DataPageV1(
3032
Slice slice,
3133
int valueCount,
3234
int uncompressedSize,
35+
OptionalLong firstRowIndex,
3336
ParquetEncoding repetitionLevelEncoding,
3437
ParquetEncoding definitionLevelEncoding,
3538
ParquetEncoding valuesEncoding)
3639
{
37-
super(uncompressedSize, valueCount);
40+
super(uncompressedSize, valueCount, firstRowIndex);
3841
this.slice = requireNonNull(slice, "slice is null");
3942
this.repetitionLevelEncoding = repetitionLevelEncoding;
4043
this.definitionLevelEncoding = definitionLevelEncoding;

lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import io.airlift.slice.Slice;
1717
import org.apache.parquet.column.statistics.Statistics;
1818

19+
import java.util.OptionalLong;
20+
1921
import static com.google.common.base.MoreObjects.toStringHelper;
2022
import static java.util.Objects.requireNonNull;
2123

@@ -40,10 +42,11 @@ public DataPageV2(
4042
ParquetEncoding dataEncoding,
4143
Slice slice,
4244
int uncompressedSize,
45+
OptionalLong firstRowIndex,
4346
Statistics<?> statistics,
4447
boolean isCompressed)
4548
{
46-
super(uncompressedSize, valueCount);
49+
super(uncompressedSize, valueCount, firstRowIndex);
4750
this.rowCount = rowCount;
4851
this.nullCount = nullCount;
4952
this.repetitionLevels = requireNonNull(repetitionLevels, "repetitionLevels slice is null");

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetDataSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
*/
1414
package io.trino.parquet;
1515

16+
import com.google.common.collect.ListMultimap;
1617
import io.airlift.slice.Slice;
1718

1819
import java.io.Closeable;
1920
import java.io.IOException;
20-
import java.util.Map;
2121

2222
public interface ParquetDataSource
2323
extends Closeable
@@ -34,7 +34,7 @@ public interface ParquetDataSource
3434

3535
Slice readFully(long position, int length);
3636

37-
<K> Map<K, ChunkReader> planRead(Map<K, DiskRange> diskRanges);
37+
<K> ListMultimap<K, ChunkReader> planRead(ListMultimap<K, DiskRange> diskRanges);
3838

3939
@Override
4040
default void close()

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetReaderOptions.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,29 @@ public class ParquetReaderOptions
2828
private final DataSize maxReadBlockSize;
2929
private final DataSize maxMergeDistance;
3030
private final DataSize maxBufferSize;
31+
private final boolean useColumnIndex;
3132

3233
public ParquetReaderOptions()
3334
{
3435
ignoreStatistics = false;
3536
maxReadBlockSize = DEFAULT_MAX_READ_BLOCK_SIZE;
3637
maxMergeDistance = DEFAULT_MAX_MERGE_DISTANCE;
3738
maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
39+
useColumnIndex = true;
3840
}
3941

4042
private ParquetReaderOptions(
4143
boolean ignoreStatistics,
4244
DataSize maxReadBlockSize,
4345
DataSize maxMergeDistance,
44-
DataSize maxBufferSize)
46+
DataSize maxBufferSize,
47+
boolean useColumnIndex)
4548
{
4649
this.ignoreStatistics = ignoreStatistics;
4750
this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
4851
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
4952
this.maxBufferSize = requireNonNull(maxBufferSize, "maxBufferSize is null");
53+
this.useColumnIndex = useColumnIndex;
5054
}
5155

5256
public boolean isIgnoreStatistics()
@@ -64,6 +68,11 @@ public DataSize getMaxMergeDistance()
6468
return maxMergeDistance;
6569
}
6670

71+
public boolean isUseColumnIndex()
72+
{
73+
return useColumnIndex;
74+
}
75+
6776
public DataSize getMaxBufferSize()
6877
{
6978
return maxBufferSize;
@@ -75,7 +84,8 @@ public ParquetReaderOptions withIgnoreStatistics(boolean ignoreStatistics)
7584
ignoreStatistics,
7685
maxReadBlockSize,
7786
maxMergeDistance,
78-
maxBufferSize);
87+
maxBufferSize,
88+
useColumnIndex);
7989
}
8090

8191
public ParquetReaderOptions withMaxReadBlockSize(DataSize maxReadBlockSize)
@@ -84,7 +94,8 @@ public ParquetReaderOptions withMaxReadBlockSize(DataSize maxReadBlockSize)
8494
ignoreStatistics,
8595
maxReadBlockSize,
8696
maxMergeDistance,
87-
maxBufferSize);
97+
maxBufferSize,
98+
useColumnIndex);
8899
}
89100

90101
public ParquetReaderOptions withMaxMergeDistance(DataSize maxMergeDistance)
@@ -93,7 +104,8 @@ public ParquetReaderOptions withMaxMergeDistance(DataSize maxMergeDistance)
93104
ignoreStatistics,
94105
maxReadBlockSize,
95106
maxMergeDistance,
96-
maxBufferSize);
107+
maxBufferSize,
108+
useColumnIndex);
97109
}
98110

99111
public ParquetReaderOptions withMaxBufferSize(DataSize maxBufferSize)
@@ -102,6 +114,17 @@ public ParquetReaderOptions withMaxBufferSize(DataSize maxBufferSize)
102114
ignoreStatistics,
103115
maxReadBlockSize,
104116
maxMergeDistance,
105-
maxBufferSize);
117+
maxBufferSize,
118+
useColumnIndex);
119+
}
120+
121+
public ParquetReaderOptions withUseColumnIndex(boolean useColumnIndex)
122+
{
123+
return new ParquetReaderOptions(
124+
ignoreStatistics,
125+
maxReadBlockSize,
126+
maxMergeDistance,
127+
maxBufferSize,
128+
useColumnIndex);
106129
}
107130
}

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.parquet;
1515

16+
import io.airlift.slice.Slice;
1617
import io.trino.spi.type.DecimalType;
1718
import org.apache.parquet.column.Encoding;
1819
import org.apache.parquet.io.ColumnIO;
@@ -25,13 +26,16 @@
2526
import org.apache.parquet.schema.GroupType;
2627
import org.apache.parquet.schema.MessageType;
2728

29+
import java.math.BigInteger;
30+
import java.nio.ByteBuffer;
2831
import java.util.Arrays;
2932
import java.util.HashMap;
3033
import java.util.List;
3134
import java.util.Map;
3235
import java.util.Optional;
3336

3437
import static com.google.common.base.Preconditions.checkArgument;
38+
import static io.trino.spi.type.UnscaledDecimal128Arithmetic.unscaledDecimal;
3539
import static org.apache.parquet.schema.OriginalType.DECIMAL;
3640
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
3741

@@ -241,4 +245,24 @@ public static long getShortDecimalValue(byte[] bytes)
241245

242246
return value;
243247
}
248+
249+
public static Slice getLongDecimalValue(byte[] bytes)
250+
{
251+
BigInteger value = new BigInteger(bytes);
252+
return unscaledDecimal(value);
253+
}
254+
255+
public static long getShortDecimalValue(ByteBuffer buffer)
256+
{
257+
byte[] array = new byte[buffer.remaining()];
258+
buffer.get(array);
259+
return getShortDecimalValue(array);
260+
}
261+
262+
public static Slice getLongDecimalValue(ByteBuffer buffer)
263+
{
264+
byte[] array = new byte[buffer.remaining()];
265+
buffer.get(array);
266+
return getLongDecimalValue(array);
267+
}
244268
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
*/
1414
package io.trino.parquet.predicate;
1515

16-
public class ParquetIntegerStatistics
16+
public class ParquetLongStatistics
1717
implements ParquetRangeStatistics<Long>
1818
{
1919
private final Long minimum;
2020
private final Long maximum;
2121

22-
public ParquetIntegerStatistics(Long minimum, Long maximum)
22+
public ParquetLongStatistics(Long minimum, Long maximum)
2323
{
2424
this.minimum = minimum;
2525
this.maximum = maximum;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515

1616
import io.airlift.slice.Slice;
1717

18-
public class ParquetStringStatistics
18+
public class ParquetSliceStatistics
1919
implements ParquetRangeStatistics<Slice>
2020
{
2121
private final Slice minimum;
2222
private final Slice maximum;
2323

24-
public ParquetStringStatistics(Slice minimum, Slice maximum)
24+
public ParquetSliceStatistics(Slice minimum, Slice maximum)
2525
{
2626
this.minimum = minimum;
2727
this.maximum = maximum;

0 commit comments

Comments
 (0)