Skip to content

Commit 55d791c

Browse files
authored
PARQUET-1389: Improve value skipping at page synchronization (#514)
1 parent 1f95eca commit 55d791c

File tree

7 files changed

+109
-20
lines changed

7 files changed

+109
-20
lines changed

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ private static abstract class Binding {
7272
*/
7373
abstract void skip();
7474

75+
/**
76+
* Skips n values from the underlying page
77+
*
78+
* @param n
79+
* the number of values to be skipped
80+
*/
81+
abstract void skip(int n);
82+
7583
/**
7684
* write current value to converter
7785
*/
@@ -163,6 +171,10 @@ void read() {
163171
public void skip() {
164172
dataColumn.skip();
165173
}
174+
@Override
175+
void skip(int n) {
176+
dataColumn.skip(n);
177+
}
166178
public int getDictionaryId() {
167179
return dictionaryId;
168180
}
@@ -203,6 +215,11 @@ public void skip() {
203215
current = 0;
204216
dataColumn.skip();
205217
}
218+
@Override
219+
void skip(int n) {
220+
current = 0;
221+
dataColumn.skip(n);
222+
}
206223
public float getFloat() {
207224
return current;
208225
}
@@ -222,6 +239,11 @@ public void skip() {
222239
current = 0;
223240
dataColumn.skip();
224241
}
242+
@Override
243+
void skip(int n) {
244+
current = 0;
245+
dataColumn.skip(n);
246+
}
225247
public double getDouble() {
226248
return current;
227249
}
@@ -242,6 +264,11 @@ public void skip() {
242264
dataColumn.skip();
243265
}
244266
@Override
267+
void skip(int n) {
268+
current = 0;
269+
dataColumn.skip(n);
270+
}
271+
@Override
245272
public int getInteger() {
246273
return current;
247274
}
@@ -262,6 +289,11 @@ public void skip() {
262289
dataColumn.skip();
263290
}
264291
@Override
292+
void skip(int n) {
293+
current = 0;
294+
dataColumn.skip(n);
295+
}
296+
@Override
265297
public long getLong() {
266298
return current;
267299
}
@@ -291,6 +323,11 @@ public void skip() {
291323
dataColumn.skip();
292324
}
293325
@Override
326+
void skip(int n) {
327+
current = false;
328+
dataColumn.skip(n);
329+
}
330+
@Override
294331
public boolean getBoolean() {
295332
return current;
296333
}
@@ -311,6 +348,11 @@ public void skip() {
311348
dataColumn.skip();
312349
}
313350
@Override
351+
void skip(int n) {
352+
current = null;
353+
dataColumn.skip(n);
354+
}
355+
@Override
314356
public Binary getBinary() {
315357
return current;
316358
}
@@ -511,6 +553,7 @@ public int getCurrentDefinitionLevel() {
511553

512554
private void checkRead() {
513555
int rl, dl;
556+
int skipValues = 0;
514557
for (;;) {
515558
if (isPageFullyConsumed()) {
516559
if (isFullyConsumed()) {
@@ -519,6 +562,7 @@ private void checkRead() {
519562
return;
520563
}
521564
readPage();
565+
skipValues = 0;
522566
}
523567
rl = repetitionLevelColumn.nextInt();
524568
dl = definitionLevelColumn.nextInt();
@@ -527,9 +571,10 @@ private void checkRead() {
527571
break;
528572
}
529573
if (dl == maxDefinitionLevel) {
530-
binding.skip();
574+
++skipValues;
531575
}
532576
}
577+
binding.skip(skipValues);
533578
repetitionLevel = rl;
534579
definitionLevel = dl;
535580
}

parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,17 @@ public long readLong() {
109109
* Skips the next value in the page
110110
*/
111111
abstract public void skip();
112+
113+
/**
114+
* Skips the next n value in the page
115+
*
116+
* @param n
117+
* the number of values to be skipped
118+
*/
119+
public void skip(int n) {
120+
for (int i = 0; i < n; ++i) {
121+
skip();
122+
}
123+
}
112124
}
113125

parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ public void skip() {
8888
valuesRead++;
8989
}
9090

91+
@Override
92+
public void skip(int n) {
93+
// checkRead() is invoked before incrementing valuesRead so increase valuesRead size in 2 steps
94+
valuesRead += n - 1;
95+
checkRead();
96+
++valuesRead;
97+
}
98+
9199
@Override
92100
public int readInteger() {
93101
// TODO: probably implement it separately

parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121

2222
import java.io.IOException;
23-
import java.nio.ByteBuffer;
24-
2523
import org.apache.parquet.bytes.ByteBufferInputStream;
2624
import org.apache.parquet.column.values.ValuesReader;
2725
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -64,7 +62,15 @@ public Binary readBytes() {
6462

6563
@Override
6664
public void skip() {
67-
int length = lengthReader.readInteger();
65+
skip(1);
66+
}
67+
68+
@Override
69+
public void skip(int n) {
70+
int length = 0;
71+
for (int i = 0; i < n; ++i) {
72+
length += lengthReader.readInteger();
73+
}
6874
try {
6975
in.skipFully(length);
7076
} catch (IOException e) {

parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.parquet.column.values.plain;
2020

2121
import java.io.IOException;
22-
import java.nio.ByteBuffer;
2322
import org.apache.parquet.bytes.ByteBufferInputStream;
2423
import org.apache.parquet.column.values.ValuesReader;
2524
import org.apache.parquet.io.ParquetDecodingException;
@@ -51,8 +50,13 @@ public Binary readBytes() {
5150

5251
@Override
5352
public void skip() {
53+
skip(1);
54+
}
55+
56+
@Override
57+
public void skip(int n) {
5458
try {
55-
in.skipFully(length);
59+
in.skipFully(n * length);
5660
} catch (IOException | RuntimeException e) {
5761
throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
5862
}

parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,26 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
4141
this.in = new LittleEndianDataInputStream(stream.remainingStream());
4242
}
4343

44+
@Override
45+
public void skip() {
46+
skip(1);
47+
}
48+
49+
void skipBytesFully(int n) throws IOException {
50+
int skipped = 0;
51+
while (skipped < n) {
52+
skipped += in.skipBytes(n - skipped);
53+
}
54+
}
55+
4456
public static class DoublePlainValuesReader extends PlainValuesReader {
4557

4658
@Override
47-
public void skip() {
59+
public void skip(int n) {
4860
try {
49-
in.skipBytes(8);
61+
skipBytesFully(n * 8);
5062
} catch (IOException e) {
51-
throw new ParquetDecodingException("could not skip double", e);
63+
throw new ParquetDecodingException("could not skip " + n + " double values", e);
5264
}
5365
}
5466

@@ -65,11 +77,11 @@ public double readDouble() {
6577
public static class FloatPlainValuesReader extends PlainValuesReader {
6678

6779
@Override
68-
public void skip() {
80+
public void skip(int n) {
6981
try {
70-
in.skipBytes(4);
82+
skipBytesFully(n * 4);
7183
} catch (IOException e) {
72-
throw new ParquetDecodingException("could not skip float", e);
84+
throw new ParquetDecodingException("could not skip " + n + " floats", e);
7385
}
7486
}
7587

@@ -86,11 +98,11 @@ public float readFloat() {
8698
public static class IntegerPlainValuesReader extends PlainValuesReader {
8799

88100
@Override
89-
public void skip() {
101+
public void skip(int n) {
90102
try {
91-
in.skipBytes(4);
103+
in.skipBytes(n * 4);
92104
} catch (IOException e) {
93-
throw new ParquetDecodingException("could not skip int", e);
105+
throw new ParquetDecodingException("could not skip " + n + " ints", e);
94106
}
95107
}
96108

@@ -107,11 +119,11 @@ public int readInteger() {
107119
public static class LongPlainValuesReader extends PlainValuesReader {
108120

109121
@Override
110-
public void skip() {
122+
public void skip(int n) {
111123
try {
112-
in.skipBytes(8);
124+
in.skipBytes(n * 8);
113125
} catch (IOException e) {
114-
throw new ParquetDecodingException("could not skip long", e);
126+
throw new ParquetDecodingException("could not skip " + n + " longs", e);
115127
}
116128
}
117129

parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.parquet.column.values.rle;
2020

2121
import java.io.IOException;
22-
import java.nio.ByteBuffer;
23-
2422
import org.apache.parquet.bytes.ByteBufferInputStream;
2523
import org.apache.parquet.column.values.ValuesReader;
2624

@@ -43,4 +41,8 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
4341
public void skip() {
4442
}
4543

44+
@Override
45+
public void skip(int n) {
46+
}
47+
4648
}

0 commit comments

Comments
 (0)