diff --git a/pom.xml b/pom.xml index 8408f4b1fa5ed..fef36a41cab4e 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ 1.2.1 10.12.1.1 - 1.8.1 + 1.8.1-palantir7 1.6.0 9.2.16.v20160414 3.1.0 @@ -223,6 +223,18 @@ 512m + + palantir + + Palantir Maven Repository + http://dl.bintray.com/palantir/releases + + true + + + false + + central diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 06cd9ea2d242c..5738131718a83 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableList; +import org.apache.parquet.filter2.compat.RowGroupFilter; import scala.Option; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; @@ -105,9 +107,13 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont if (rowGroupOffsets == null) { // then we need to apply the predicate push down filter footer = readFooter(configuration, file, range(split.getStart(), split.getEnd())); - MessageType fileSchema = footer.getFileMetaData().getSchema(); FilterCompat.Filter filter = getFilter(configuration); - blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); + ParquetFileReader reader = ParquetFileReader.open(taskAttemptContext.getConfiguration(), file, footer); + blocks = filterRowGroups( + ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY), + filter, + footer.getBlocks(), + reader); } else { // otherwise we find the row groups that were selected on the client footer = readFooter(configuration, file, NO_FILTER); @@ -146,8 +152,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont String sparkRequestedSchemaString = configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA()); this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString); - this.reader = new ParquetFileReader( - configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); + this.reader = new ParquetFileReader(configuration, file, footer); for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index cb51cb499eede..7dea4e44d39e5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -539,7 +539,7 @@ private void readPageV2(DataPageV2 page) throws IOException { this.defColumn = new VectorizedRleValuesReader(bitWidth); this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn); this.defColumn.initFromBuffer( - this.pageValueCount, page.getDefinitionLevels().toByteArray()); + this.pageValueCount, page.getDefinitionLevels().toByteBuffer()); try { initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0); } catch (IOException e) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 98018b7f48bd8..949d6533c64b7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -41,6 +41,11 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori public VectorizedPlainValuesReader() { } + @Override + public void initFromPage(int valueCount, ByteBuffer byteBuffer, int offset) throws IOException { + initFromPage(valueCount, byteBuffer.array(), offset); + } + @Override public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException { this.buffer = bytes; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 62157389013bb..31511f218edcd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -27,6 +27,8 @@ import org.apache.spark.sql.execution.vectorized.ColumnVector; +import java.nio.ByteBuffer; + /** * A values reader for Parquet's run-length encoded data. This is based off of the version in * parquet-mr with these changes: @@ -49,7 +51,7 @@ private enum MODE { } // Encoded data. - private byte[] in; + private ByteBuffer in; private int end; private int offset; @@ -81,7 +83,7 @@ public VectorizedRleValuesReader(int bitWidth) { } @Override - public void initFromPage(int valueCount, byte[] page, int start) { + public void initFromPage(int valueCount, ByteBuffer page, int start) { this.offset = start; this.in = page; if (fixedWidth) { @@ -90,8 +92,8 @@ public void initFromPage(int valueCount, byte[] page, int start) { this.end = this.offset + length; } } else { - this.end = page.length; - if (this.end != this.offset) init(page[this.offset++] & 255); + this.end = page.limit(); + if (this.end != this.offset) init(page.get(this.offset++) & 255); } if (bitWidth == 0) { // 0 bit width, treat this as an RLE run of valueCount number of 0's. @@ -105,10 +107,10 @@ public void initFromPage(int valueCount, byte[] page, int start) { // Initialize the reader from a buffer. This is used for the V2 page encoding where the // definition are in its own buffer. - public void initFromBuffer(int valueCount, byte[] data) { + public void initFromBuffer(int valueCount, ByteBuffer data) { this.offset = 0; this.in = data; - this.end = data.length; + this.end = data.limit(); if (bitWidth == 0) { // 0 bit width, treat this as an RLE run of valueCount number of 0's. this.mode = MODE.RLE; @@ -527,7 +529,7 @@ private int readUnsignedVarInt() { int shift = 0; int b; do { - b = in[offset++] & 255; + b = in.get(offset++) & 255; value |= (b & 0x7F) << shift; shift += 7; } while ((b & 0x80) != 0); @@ -538,10 +540,10 @@ private int readUnsignedVarInt() { * Reads the next 4 byte little endian int. */ private int readIntLittleEndian() { - int ch4 = in[offset] & 255; - int ch3 = in[offset + 1] & 255; - int ch2 = in[offset + 2] & 255; - int ch1 = in[offset + 3] & 255; + int ch4 = in.get(offset) & 255; + int ch3 = in.get(offset + 1) & 255; + int ch2 = in.get(offset + 2) & 255; + int ch1 = in.get(offset + 3) & 255; offset += 4; return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); } @@ -554,17 +556,17 @@ private int readIntLittleEndianPaddedOnBitWidth() { case 0: return 0; case 1: - return in[offset++] & 255; + return in.get(offset++) & 255; case 2: { - int ch2 = in[offset] & 255; - int ch1 = in[offset + 1] & 255; + int ch2 = in.get(offset) & 255; + int ch1 = in.get(offset + 1) & 255; offset += 2; return (ch1 << 8) + ch2; } case 3: { - int ch3 = in[offset] & 255; - int ch2 = in[offset + 1] & 255; - int ch1 = in[offset + 2] & 255; + int ch3 = in.get(offset) & 255; + int ch2 = in.get(offset + 1) & 255; + int ch1 = in.get(offset + 2) & 255; offset += 3; return (ch1 << 16) + (ch2 << 8) + (ch3 << 0); }