diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml
index 6f511adf717..98fd4b81507 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -31,6 +31,20 @@
  <packaging>jar</packaging>
  <name>contrib/hive-storage-plugin/hive-exec-shaded</name>

+  <properties>
+    <hive.parquet.version>1.8.3</hive.parquet.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop-bundle</artifactId>
+        <version>${hive.parquet.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
@@ -68,11 +82,6 @@
-
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-    </dependency>
@@ -83,7 +92,7 @@
                <include>org.apache.hive:hive-exec</include>
-               <include>org.apache.parquet:parquet-column</include>
+               <include>org.apache.parquet:parquet-hadoop-bundle</include>
                <include>commons-codec:commons-codec</include>
                <include>com.fasterxml.jackson.core:jackson-databind</include>
                <include>com.fasterxml.jackson.core:jackson-annotations</include>
@@ -117,6 +126,10 @@
              <pattern>org.apache.parquet.</pattern>
              <shadedPattern>hive.org.apache.parquet.</shadedPattern>
            </relocation>
+           <relocation>
+             <pattern>shaded.parquet.</pattern>
+             <shadedPattern>hive.shaded.parquet.</shadedPattern>
+           </relocation>
            <relocation>
              <pattern>org.apache.avro.</pattern>
              <shadedPattern>hive.org.apache.avro.</shadedPattern>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2205c2f4cd4..7701e76165c 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -249,92 +249,17 @@
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-format</artifactId>
-      <version>2.3.0-incubating</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-common</artifactId>
      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-encoding</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-generator</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
    <dependency>
      <groupId>javax.inject</groupId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 9e561ad364a..ebceefb4355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -113,7 +113,7 @@ private static <C extends Comparable<C>> LogicalExpression createEqualPredicate(
// can drop when left's max < right's min, or right's max < left's min
final C leftMin = leftStat.genericGetMin();
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
+ return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
}) {
@Override
public String toString() {
@@ -132,7 +132,7 @@ private static <C extends Comparable<C>> LogicalExpression createGTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max <= right's min.
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+ return leftStat.compareMaxToValue(rightMin) <= 0;
});
}
@@ -146,7 +146,7 @@ private static <C extends Comparable<C>> LogicalExpression createGEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max < right's min.
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) < 0;
+ return leftStat.compareMaxToValue(rightMin) < 0;
});
}
@@ -160,7 +160,7 @@ private static <C extends Comparable<C>> LogicalExpression createLTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max <= left's min.
final C leftMin = leftStat.genericGetMin();
- return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+ return rightStat.compareMaxToValue(leftMin) <= 0;
});
}
@@ -173,7 +173,7 @@ private static <C extends Comparable<C>> LogicalExpression createLEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max < left's min.
final C leftMin = leftStat.genericGetMin();
- return rightStat.genericGetMax().compareTo(leftMin) < 0;
+ return rightStat.compareMaxToValue(leftMin) < 0;
});
}
@@ -188,8 +188,8 @@ private static <C extends Comparable<C>> LogicalExpression createNEPredicate(
// can drop when there is only one unique value.
final C leftMax = leftStat.genericGetMax();
final C rightMax = rightStat.genericGetMax();
- return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
- leftStat.genericGetMax().compareTo(rightMax) == 0;
+ return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
+ leftStat.compareMaxToValue(rightMax) == 0;
});
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 9b041020f65..547dc067045 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -23,6 +23,7 @@
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;
import java.util.ArrayList;
@@ -114,7 +115,7 @@ private static <C extends Comparable<C>> LogicalExpression createIsNotNullPredic
private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate(expr,
//if max value is not true or if there are all nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
+ (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
);
}
@@ -124,7 +125,7 @@ private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate(expr,
//if min value is not false or if there are all nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
+ (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
);
}
@@ -134,7 +135,7 @@ private static LogicalExpression createIsFalsePredicate(LogicalExpression expr)
private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate(expr,
//if min value is not false or if there are no nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+ (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
);
}
@@ -144,7 +145,7 @@ private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr
private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate(expr,
//if max value is not true or if there are no nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+ (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index 7ff1036443f..f804a7b06f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -43,7 +43,7 @@ static boolean isNullOrEmpty(Statistics stat) {
* False if at least one row is not null.
*/
static boolean isAllNulls(Statistics stat, long rowCount) {
- return stat.getNumNulls() == rowCount;
+ return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
}
/**
@@ -54,7 +54,7 @@ static boolean isAllNulls(Statistics stat, long rowCount) {
* False if the parquet file hasn't nulls.
*/
static boolean hasNoNulls(Statistics stat) {
- return stat.getNumNulls() == 0;
+ return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 6a320b85b38..dc09ce1b695 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -33,11 +33,12 @@
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -50,6 +51,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
public abstract class AbstractParquetScanBatchCreator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
- Configuration newConf = new Configuration(conf);
+ conf = new Configuration(conf);
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
- return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+ conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+ ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+ return reader.getFooter();
+ }
}
private boolean isComplex(ParquetMetadata footer) {
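
Editor's note: parquet-mr 1.10 favours opening a reader from an InputFile plus ParquetReadOptions and asking it for the footer, which is the idiom the hunk above adopts. A minimal, self-contained sketch of that pattern follows; the class and helper names are illustrative, not part of the Drill patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class FooterDemo {
  // Hypothetical helper mirroring the readFooter() change above.
  static ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetadataFilter(NO_FILTER)   // read the whole footer, no row-group filtering
        .build();
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
      return reader.getFooter();
    }
  }
}
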
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index dcd40cf9107..79294daea78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -21,14 +21,13 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
public class ColumnDataReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public BytesInput getPageAsBytesInput(int pageLength) throws IOException{
public void loadPage(DrillBuf target, int pageLength) throws IOException {
target.clear();
- ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
- int lengthLeftToRead = pageLength;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
target.writerIndex(pageLength);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index ea34c7d8b85..d1562c48c0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -42,6 +42,7 @@
import com.google.common.base.Preconditions;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
public class FooterGatherer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public static Footer readFooter(final Configuration config, final FileStatus sta
footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
}
- ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
+ final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+ ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
Footer footer = new Footer(status.getPath(), metadata);
return footer;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 09f1b26e24a..ba6aac9431f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -17,10 +17,11 @@
*/
package org.apache.drill.exec.store.parquet;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
@@ -30,44 +31,42 @@
/**
* {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
* {@link ByteBuffer} objects.
- * To properly release an allocated {@link ByteBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
+ * To properly release an allocated {@link DrillBuf}, this class keeps track of its corresponding {@link ByteBuffer}
* that was passed to the Parquet library.
*/
public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
private final BufferAllocator allocator;
- private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
+ private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>();
- public ParquetDirectByteBufferAllocator(OperatorContext o){
- allocator = o.getAllocator();
+ public ParquetDirectByteBufferAllocator(OperatorContext o) {
+ this(o.getAllocator());
}
public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
this.allocator = allocator;
}
-
@Override
public ByteBuffer allocate(int sz) {
- ByteBuf bb = allocator.buffer(sz);
- ByteBuffer b = bb.nioBuffer(0, sz);
- final Key key = new Key(b);
- allocatedBuffers.put(key, bb);
- logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
- return b;
+ DrillBuf drillBuf = allocator.buffer(sz);
+ ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz);
+ allocatedBuffers.put(byteBuffer, drillBuf);
+ logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and ByteBuffer {}", this, sz, drillBuf.getId(), System.identityHashCode(byteBuffer));
+ return byteBuffer;
}
@Override
- public void release(ByteBuffer b) {
- final Key key = new Key(b);
- final ByteBuf bb = allocatedBuffers.get(key);
+ public void release(ByteBuffer byteBuffer) {
+ final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
// The ByteBuffer passed in may already have been freed or not allocated by this allocator.
// If it is not found in the allocated buffers, do nothing
- if(bb != null) {
- logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
- bb.release();
- allocatedBuffers.remove(key);
+ if (drillBuf != null) {
+ logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, drillBuf.getId(), System.identityHashCode(byteBuffer));
+ drillBuf.release();
+ } else {
+ logger.warn("{}: ByteBuffer {} is not present", this, System.identityHashCode(byteBuffer));
}
}
@@ -75,41 +74,4 @@ public void release(ByteBuffer b) {
public boolean isDirect() {
return true;
}
-
- /**
- * ByteBuffer wrapper that computes a fixed hashcode.
- *
- * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
- * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.
- * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case
- * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
- * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}.
- */
- private class Key {
- final int hash;
- final ByteBuffer buffer;
-
- Key(final ByteBuffer buffer) {
- this.buffer = buffer;
- // remember, we can't use buffer.hashCode()
- this.hash = System.identityHashCode(buffer);
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof Key)) {
- return false;
- }
- final Key key = (Key) obj;
- return hash == key.hash && buffer == key.buffer;
- }
- }
}
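
Editor's note: the removed Key wrapper existed only to give ByteBuffer identity semantics, because ByteBuffer.equals() and hashCode() are content-based and change as the buffer's position moves. An IdentityHashMap keyed directly on the ByteBuffer gives the same behaviour with less code. A small stand-alone illustration (names are hypothetical):

import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityKeyDemo {
  public static void main(String[] args) {
    // Two buffers with identical content would collide as one key in a HashMap,
    // and a buffer's hashCode changes once data is written, so content-based keys are unreliable.
    ByteBuffer a = ByteBuffer.allocate(8);
    ByteBuffer b = ByteBuffer.allocate(8);

    Map<ByteBuffer, String> byIdentity = new IdentityHashMap<>();
    byIdentity.put(a, "buffer A");
    byIdentity.put(b, "buffer B");

    a.putInt(42); // mutating the buffer does not disturb an identity-based lookup
    System.out.println(byIdentity.get(a)); // buffer A
    System.out.println(byIdentity.size()); // 2
  }
}
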
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 0e40c9e360b..09179266081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -54,8 +54,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -241,8 +243,15 @@ private void newSchema() throws IOException {
// once PARQUET-1006 will be resolved
pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
pageSize, new ParquetDirectByteBufferAllocator(oContext));
- store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
- writerVersion, new ParquetDirectByteBufferAllocator(oContext));
+ ParquetProperties parquetProperties = ParquetProperties.builder()
+ .withPageSize(pageSize)
+ .withDictionaryEncoding(enableDictionary)
+ .withDictionaryPageSize(initialPageBufferSize)
+ .withWriterVersion(writerVersion)
+ .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
+ .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+ .build();
+ store = new ColumnWriteStoreV1(pageStore, parquetProperties);
MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
setUp(schema, consumer);
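
Editor's note: in parquet-mr 1.10 the ColumnWriteStoreV1 constructor that took individual page and dictionary settings is gone; the settings travel in a ParquetProperties object built through its builder (the Drill change above additionally supplies a DefaultV1ValuesWriterFactory and Drill's direct allocator). A minimal sketch of the builder with illustrative values, using the library's heap allocator in place of Drill's:

import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

public class WriterPropsDemo {
  public static void main(String[] args) {
    // Illustrative values only; Drill wires in its session options and its own allocator.
    ParquetProperties props = ParquetProperties.builder()
        .withPageSize(1024 * 1024)
        .withDictionaryPageSize(512 * 1024)
        .withDictionaryEncoding(true)
        .withWriterVersion(WriterVersion.PARQUET_1_0)
        .withAllocator(new HeapByteBufferAllocator())
        .build();
    System.out.println(props.getPageSizeThreshold()); // 1048576
  }
}
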
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index bf75695b6be..01d06444b42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBufUtil;
import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -30,6 +31,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
@@ -250,7 +252,7 @@ private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompr
}
public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
- return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
+ return BytesInput.from(buf.nioBuffer(offset, length));
}
@@ -319,41 +321,44 @@ public boolean next() throws IOException {
byteLength = pageHeader.uncompressed_page_size;
- final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
+ final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
readPosInBytes = 0;
if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
- repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ repetitionLevels.initFromPage(currentPageCount, in);
// we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
// a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
// read the first zero here to simplify the reading processes, and start reading the first value the same as all
// of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to
// the first list of repetition levels
- readPosInBytes = repetitionLevels.getNextOffset();
+ readPosInBytes = in.position();
repetitionLevels.readInteger();
}
- if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+ if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
parentColumnReader.currDefLevel = -1;
definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
- readPosInBytes = definitionLevels.getNextOffset();
+ definitionLevels.initFromPage(currentPageCount, in);
+ readPosInBytes = in.position();
if (!valueEncoding.usesDictionary()) {
valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ valueReader.initFromPage(currentPageCount, in);
}
}
- if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+ if (valueReader == null && parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ valueReader.initFromPage(currentPageCount, in);
}
if (valueEncoding.usesDictionary()) {
// initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
// actually copying the values out into the vectors
+ Preconditions.checkState(readPosInBytes < pageData.capacity());
+ int index = (int)readPosInBytes;
+ ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - index);
dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
- dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ dictionaryLengthDeterminingReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
dictionaryValueReader = new DictionaryValuesReader(dictionary);
- dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ dictionaryValueReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
parentColumnReader.usingDictionary = true;
} else {
parentColumnReader.usingDictionary = false;
@@ -445,25 +450,29 @@ public void clear(){
* @throws IOException An IO related condition
*/
void resetDefinitionLevelReader(int skipCount) throws IOException {
- if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
- throw new UnsupportedOperationException("Unsupoorted Operation");
- }
+ Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
+ Preconditions.checkState(currentPageCount > 0);
+ final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
- final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
- final int defStartPos = repetitionLevels != null ? repetitionLevels.getNextOffset() : 0;
+
+ final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
+
+ if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+ repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+ repetitionLevels.initFromPage(currentPageCount, in);
+ repetitionLevels.readInteger();
+ }
+
definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
parentColumnReader.currDefLevel = -1;
// Now reinitialize the underlying decoder
- assert currentPageCount > 0 : "Page count should be strictly upper than zero";
- definitionLevels.initFromPage(currentPageCount, pageDataBuffer, defStartPos);
+ definitionLevels.initFromPage(currentPageCount, in);
// Skip values if requested by caller
for (int idx = 0; idx < skipCount; ++idx) {
definitionLevels.skip();
}
}
-
-
}
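
Editor's note: the 1.10 ValuesReader.initFromPage() variants used above consume a ByteBufferInputStream instead of a (ByteBuffer, offset) pair, and the old getNextOffset() bookkeeping is replaced by asking the stream where the previous decoder stopped. A tiny sketch of that position tracking, independent of any particular decoder (byte values are arbitrary):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;

public class PositionDemo {
  public static void main(String[] args) throws IOException {
    ByteBuffer page = ByteBuffer.wrap(new byte[] {10, 20, 30, 40, 50});
    ByteBufferInputStream in = ByteBufferInputStream.wrap(page);

    in.read();                   // a level decoder would consume its bytes here
    in.read();
    long offset = in.position(); // 2, the point where the next decoder starts reading

    System.out.println(offset);
  }
}
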
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index b6205c1ef3c..385cb8369f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -66,12 +66,7 @@ final class VarLenBulkPageReader {
this.buffer.order(ByteOrder.nativeOrder());
if (pageInfoInput != null) {
- this.pageInfo.pageData = pageInfoInput.pageData;
- this.pageInfo.pageDataOff = pageInfoInput.pageDataOff;
- this.pageInfo.pageDataLen = pageInfoInput.pageDataLen;
- this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
- this.pageInfo.definitionLevels = pageInfoInput.definitionLevels;
- this.pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
+ set(pageInfoInput, false);
}
this.columnPrecInfo = columnPrecInfoInput;
@@ -87,15 +82,17 @@ final class VarLenBulkPageReader {
nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
}
- final void set(PageDataInfo pageInfoInput) {
+ final void set(PageDataInfo pageInfoInput, boolean clear) {
pageInfo.pageData = pageInfoInput.pageData;
pageInfo.pageDataOff = pageInfoInput.pageDataOff;
pageInfo.pageDataLen = pageInfoInput.pageDataLen;
pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
pageInfo.definitionLevels = pageInfoInput.definitionLevels;
pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
-
- buffer.clear();
+ pageInfo.numPageValues = pageInfoInput.numPageValues;
+ if (clear) {
+ buffer.clear();
+ }
}
final VarLenColumnBulkEntry getEntry(int valuesToRead) {
@@ -160,4 +157,4 @@ private final VarLenColumnBulkEntry getVLEntry(int valuesToRead) {
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 8daf2cc1a52..1b307375974 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -204,7 +204,7 @@ private final void setBufferedPagePayload() {
buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
} else {
- buffPagePayload.set(pageInfo);
+ buffPagePayload.set(pageInfo, true);
}
} else {
if (buffPagePayload == null) {
@@ -567,4 +567,4 @@ private Binary getNextEntry() {
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index ab655e92178..a61cc183286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -39,17 +39,19 @@
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
@@ -87,6 +89,7 @@ public class Metadata {
public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
public static final String METADATA_FILENAME = ".drill.parquet_metadata";
public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
+ public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
private final ParquetFormatConfig formatConfig;
@@ -409,9 +412,16 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3
final FileStatus file, final FileSystem fs) throws IOException, InterruptedException {
final ParquetMetadata metadata;
final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+ final Configuration conf = new Configuration(fs.getConf());
+ final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
+ .useSignedStringMinMax(true)
+ .build();
try {
- metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
- () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER));
+ metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) () -> {
+ try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+ return parquetFileReader.getFooter();
+ }
+ });
} catch(Exception e) {
logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e8238..1d764b1b6d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@
import com.google.common.base.Stopwatch;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
import java.io.Closeable;
import java.io.IOException;
@@ -179,7 +179,7 @@ private int getNextBlock() throws IOException {
int nBytes = 0;
if (bytesToRead > 0) {
try {
- nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+ nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
} catch (Exception e) {
logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
throw new IOException((e));
@@ -193,8 +193,8 @@ private int getNextBlock() throws IOException {
logger.trace(
"PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+ "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
- this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
- / 1000);
+ this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer,
+ ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a3708c6..ea2542eb804 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -86,12 +87,16 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException
buf.clear();
ByteBuffer directBuffer = buf.nioBuffer(0, len);
int lengthLeftToRead = len;
+ SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
while (lengthLeftToRead > 0) {
if(logger.isTraceEnabled()) {
logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
}
Stopwatch timer = Stopwatch.createStarted();
- int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+ int bytesRead = seekableInputStream.read(directBuffer);
+ if (bytesRead < 0) {
+ return bytesRead;
+ }
lengthLeftToRead -= bytesRead;
if(logger.isTraceEnabled()) {
logger.trace(
@@ -113,7 +118,7 @@ public synchronized DrillBuf getNext(int bytes) throws IOException {
b.release();
throw e;
}
- if (bytesRead <= -1) {
+ if (bytesRead < 0) {
b.release();
return null;
}
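
Editor's note: CompatibilityUtil.getBuf() no longer exists in parquet-mr 1.10; the hunks above wrap the Hadoop stream with HadoopStreams.wrap() and use SeekableInputStream's ByteBuffer reads instead. read(ByteBuffer) may return a short count or -1 at end of stream (hence the loop and the < 0 check), while readFully(ByteBuffer) loops internally and fails on a short read. A hedged sketch against a local file; the helper name is hypothetical:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

public class ReadFullyDemo {
  // Hypothetical helper: fill `buffer` completely from `file`, failing on a short read.
  static void readBlock(Path file, ByteBuffer buffer) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    try (FSDataInputStream raw = fs.open(file)) {
      SeekableInputStream in = HadoopStreams.wrap(raw);
      in.readFully(buffer); // loops internally; plain read(buffer) may return fewer bytes
    }
  }
}
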
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 6e9db7e947e..89731ff2a4e 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -46,7 +46,7 @@
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
import io.netty.buffer.ByteBuf;
@@ -163,12 +163,10 @@ public DataPage readPage() {
ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
lastPage = buf;
ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
- int lengthLeftToRead = pageHeader.compressed_page_size;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(in).readFully(buffer);
+ buffer.flip();
return new DataPageV1(
- decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
@@ -182,28 +180,33 @@ public DataPage readPage() {
buf = allocator.buffer(pageHeader.compressed_page_size);
lastPage = buf;
buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
- lengthLeftToRead = pageHeader.compressed_page_size;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(in).readFully(buffer);
+ buffer.flip();
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
BytesInput decompressedPageData =
decompressor.decompress(
- BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+ BytesInput.from(buffer),
pageHeader.uncompressed_page_size);
+ ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
+ int limit = byteBuffer.limit();
+ byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
+ BytesInput repetitionLevels = BytesInput.from(byteBuffer.slice());
+ byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length());
+ byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+ BytesInput definitionLevels = BytesInput.from(byteBuffer.slice());
+ byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+ byteBuffer.limit(limit);
+ BytesInput data = BytesInput.from(byteBuffer.slice());
+
return new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
- BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
- BytesInput.from(decompressedPageData.toByteBuffer(),
- dataHeaderV2.getRepetition_levels_byte_length(),
- dataHeaderV2.getDefinition_levels_byte_length()),
+ repetitionLevels,
+ definitionLevels,
parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
- BytesInput.from(decompressedPageData.toByteBuffer(),
- dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
- dataSize),
+ data,
uncompressedPageSize,
fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
dataHeaderV2.isIs_compressed()
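
Editor's note: with the old BytesInput.from(buffer, offset, length) overload no longer used, the decompressed V2 page is split into repetition levels, definition levels, and data by moving the buffer's position and limit and calling slice(), which shares the underlying bytes without copying. The same sequence on a plain byte array, with made-up section lengths:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PageSliceDemo {
  public static void main(String[] args) {
    // A page laid out as [repetition levels | definition levels | data].
    byte[] page = "RRDDDpayload".getBytes(StandardCharsets.US_ASCII);
    int repLen = 2;
    int defLen = 3;

    ByteBuffer buf = ByteBuffer.wrap(page);
    int limit = buf.limit();

    buf.limit(repLen);
    ByteBuffer rep = buf.slice();              // bytes [0, repLen)

    buf.position(repLen);
    buf.limit(repLen + defLen);
    ByteBuffer def = buf.slice();              // bytes [repLen, repLen + defLen)

    buf.position(repLen + defLen);
    buf.limit(limit);
    ByteBuffer data = buf.slice();             // the remaining payload

    System.out.println(rep.remaining() + " " + def.remaining() + " " + data.remaining()); // 2 3 7
  }
}
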
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 93f9920bd90..0ed22452501 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,8 +17,6 @@
*/
package org.apache.parquet.hadoop;
-import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -119,7 +117,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path,
this.path = path;
this.compressor = compressor;
this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
- this.totalStatistics = getStatsBasedOnType(this.path.getType());
+ this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
}
@Override
@@ -226,11 +224,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
writer.writeDictionaryPage(dictionaryPage);
// tracking the dictionary encoding is handled in writeDictionaryPage
}
- List<Encoding> encodings = Lists.newArrayList();
- encodings.addAll(rlEncodings);
- encodings.addAll(dlEncodings);
- encodings.addAll(dataEncodings);
- writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+ writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
writer.endColumn();
logger.debug(
String.format(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 50e679ad64f..1da25301311 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -30,6 +30,7 @@
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -737,6 +738,7 @@ public void testBooleanPartitionPruning() throws Exception {
}
}
+ @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
@Test // DRILL-4139
public void testIntervalDayPartitionPruning() throws Exception {
final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`";
@@ -762,6 +764,7 @@ public void testIntervalDayPartitionPruning() throws Exception {
}
}
+ @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
@Test // DRILL-4139
public void testIntervalYearPartitionPruning() throws Exception {
final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`";
@@ -812,6 +815,7 @@ public void testVarCharWithNullsPartitionPruning() throws Exception {
}
}
+ @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
@Test // DRILL-4139
public void testDecimalPartitionPruning() throws Exception {
List<String> ctasQueries = Lists.newArrayList();
diff --git a/pom.xml b/pom.xml
index 6078dc7c10e..242b134ddb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
    <dep.slf4j.version>1.7.6</dep.slf4j.version>
    <dep.guava.version>18.0</dep.guava.version>
    <forkCount>2</forkCount>
-    <parquet.version>1.8.1-drill-r0</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
    <calcite.version>1.16.0-drill-r3</calcite.version>
    <avatica.version>1.11.0</avatica.version>
    <janino.version>2.7.6</janino.version>
@@ -1522,6 +1522,36 @@
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-format</artifactId>
+        <version>2.5.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-common</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
@@ -2040,6 +2070,14 @@
        <artifactId>parquet-hadoop</artifactId>
        <version>${parquet.version}</version>
        <exclusions>
+         <exclusion>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-client</artifactId>
+         </exclusion>
+         <exclusion>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-common</artifactId>
+         </exclusion>
          <exclusion>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>