Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.8.1</parquet.version>
<parquet.version>1.8.1-palantir7</parquet.version>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.2.16.v20160414</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
Expand Down Expand Up @@ -223,6 +223,18 @@
<CodeCacheSize>512m</CodeCacheSize>
</properties>
<repositories>
<repository>
  <id>palantir</id>
  <name>Palantir Maven Repository</name>
  <!-- Must be HTTPS: plain-HTTP repositories allow man-in-the-middle artifact
       substitution, and Maven 3.8.1+ blocks http:// mirrors outright. -->
  <url>https://dl.bintray.com/palantir/releases</url>
  <releases>
    <enabled>true</enabled>
  </releases>
  <snapshots>
    <!-- Only release artifacts are published here; skip snapshot lookups. -->
    <enabled>false</enabled>
  </snapshots>
</repository>
<repository>
<id>central</id>
<!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import scala.Option;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
Expand Down Expand Up @@ -105,9 +107,13 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
if (rowGroupOffsets == null) {
// then we need to apply the predicate push down filter
footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
MessageType fileSchema = footer.getFileMetaData().getSchema();
FilterCompat.Filter filter = getFilter(configuration);
blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
ParquetFileReader reader = ParquetFileReader.open(taskAttemptContext.getConfiguration(), file, footer);
blocks = filterRowGroups(
ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
filter,
footer.getBlocks(),
reader);
} else {
// otherwise we find the row groups that were selected on the client
footer = readFooter(configuration, file, NO_FILTER);
Expand Down Expand Up @@ -146,8 +152,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
String sparkRequestedSchemaString =
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
this.reader = new ParquetFileReader(
configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
this.reader = new ParquetFileReader(configuration, file, footer);
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ private void readPageV2(DataPageV2 page) throws IOException {
this.defColumn = new VectorizedRleValuesReader(bitWidth);
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
this.defColumn.initFromBuffer(
this.pageValueCount, page.getDefinitionLevels().toByteArray());
this.pageValueCount, page.getDefinitionLevels().toByteBuffer());
try {
initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
// Default constructor; reader state is populated later via initFromPage(...).
public VectorizedPlainValuesReader() {
}

@Override
public void initFromPage(int valueCount, ByteBuffer byteBuffer, int offset) throws IOException {
  // ByteBuffer.array() throws UnsupportedOperationException for direct or
  // read-only buffers, and silently ignores arrayOffset() for sliced heap
  // buffers. Only take the zero-copy path when the backing array is directly
  // addressable; otherwise copy the buffer contents into a fresh heap array.
  if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0) {
    initFromPage(valueCount, byteBuffer.array(), offset);
  } else {
    // Duplicate so we never disturb the caller's position/limit.
    ByteBuffer dup = byteBuffer.duplicate();
    dup.position(0);
    byte[] bytes = new byte[dup.remaining()];
    dup.get(bytes);
    initFromPage(valueCount, bytes, offset);
  }
}

@Override
public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
this.buffer = bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.apache.spark.sql.execution.vectorized.ColumnVector;

import java.nio.ByteBuffer;

/**
* A values reader for Parquet's run-length encoded data. This is based off of the version in
* parquet-mr with these changes:
Expand All @@ -49,7 +51,7 @@ private enum MODE {
}

// Encoded data.
private byte[] in;
private ByteBuffer in;
private int end;
private int offset;

Expand Down Expand Up @@ -81,7 +83,7 @@ public VectorizedRleValuesReader(int bitWidth) {
}

@Override
public void initFromPage(int valueCount, byte[] page, int start) {
public void initFromPage(int valueCount, ByteBuffer page, int start) {
this.offset = start;
this.in = page;
if (fixedWidth) {
Expand All @@ -90,8 +92,8 @@ public void initFromPage(int valueCount, byte[] page, int start) {
this.end = this.offset + length;
}
} else {
this.end = page.length;
if (this.end != this.offset) init(page[this.offset++] & 255);
this.end = page.limit();
if (this.end != this.offset) init(page.get(this.offset++) & 255);
}
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
Expand All @@ -105,10 +107,10 @@ public void initFromPage(int valueCount, byte[] page, int start) {

// Initialize the reader from a buffer. This is used for the V2 page encoding where the
// definition are in its own buffer.
public void initFromBuffer(int valueCount, byte[] data) {
public void initFromBuffer(int valueCount, ByteBuffer data) {
this.offset = 0;
this.in = data;
this.end = data.length;
this.end = data.limit();
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
Expand Down Expand Up @@ -527,7 +529,7 @@ private int readUnsignedVarInt() {
int shift = 0;
int b;
do {
b = in[offset++] & 255;
b = in.get(offset++) & 255;
value |= (b & 0x7F) << shift;
shift += 7;
} while ((b & 0x80) != 0);
Expand All @@ -538,10 +540,10 @@ private int readUnsignedVarInt() {
* Reads the next 4 byte little endian int.
*/
private int readIntLittleEndian() {
  // Assemble a 4-byte little-endian int from the ByteBuffer: the byte at the
  // lowest address is the least-significant. Absolute get() is used so only
  // our own 'offset' field tracks the read position, not the buffer's.
  int ch4 = in.get(offset) & 255;
  int ch3 = in.get(offset + 1) & 255;
  int ch2 = in.get(offset + 2) & 255;
  int ch1 = in.get(offset + 3) & 255;
  offset += 4;
  return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
Expand All @@ -554,17 +556,17 @@ private int readIntLittleEndianPaddedOnBitWidth() {
case 0:
return 0;
case 1:
return in[offset++] & 255;
return in.get(offset++) & 255;
case 2: {
int ch2 = in[offset] & 255;
int ch1 = in[offset + 1] & 255;
int ch2 = in.get(offset) & 255;
int ch1 = in.get(offset + 1) & 255;
offset += 2;
return (ch1 << 8) + ch2;
}
case 3: {
int ch3 = in[offset] & 255;
int ch2 = in[offset + 1] & 255;
int ch1 = in[offset + 2] & 255;
int ch3 = in.get(offset) & 255;
int ch2 = in.get(offset + 1) & 255;
int ch1 = in.get(offset + 2) & 255;
offset += 3;
return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
}
Expand Down