Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions contrib/storage-hive/hive-exec-shade/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@
<packaging>jar</packaging>
<name>contrib/hive-storage-plugin/hive-exec-shaded</name>

<properties>
<hive.parquet.version>1.8.3</hive.parquet.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>${hive.parquet.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
Expand Down Expand Up @@ -68,11 +82,6 @@
</exclusion>
</exclusions>
</dependency>
<!--Once newer hive-exec version leverages parquet-column 1.9.0, this dependency can be deleted -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</dependency>
</dependencies>

<build>
Expand All @@ -83,7 +92,7 @@
<artifactSet>
<includes>
<include>org.apache.hive:hive-exec</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop-bundle</include>
<include>commons-codec:commons-codec</include>
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
Expand Down Expand Up @@ -117,6 +126,10 @@
<pattern>org.apache.parquet.</pattern>
<shadedPattern>hive.org.apache.parquet.</shadedPattern>
</relocation>
<relocation>
<pattern>shaded.parquet.</pattern>
<shadedPattern>hive.shaded.parquet.</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro.</pattern>
<shadedPattern>hive.org.apache.avro.</shadedPattern>
Expand Down
75 changes: 0 additions & 75 deletions exec/java-exec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,92 +249,17 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format</artifactId>
<version>2.3.0-incubating</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-jackson</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-generator</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static <C extends Comparable<C>> LogicalExpression createEqualPredicate(
// can drop when left's max < right's min, or right's max < left's min
final C leftMin = leftStat.genericGetMin();
final C rightMin = rightStat.genericGetMin();
return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
}) {
@Override
public String toString() {
Expand All @@ -132,7 +132,7 @@ private static <C extends Comparable<C>> LogicalExpression createGTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max <= right's min.
final C rightMin = rightStat.genericGetMin();
return leftStat.genericGetMax().compareTo(rightMin) <= 0;
return leftStat.compareMaxToValue(rightMin) <= 0;
});
}

Expand All @@ -146,7 +146,7 @@ private static <C extends Comparable<C>> LogicalExpression createGEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max < right's min.
final C rightMin = rightStat.genericGetMin();
return leftStat.genericGetMax().compareTo(rightMin) < 0;
return leftStat.compareMaxToValue(rightMin) < 0;
});
}

Expand All @@ -160,7 +160,7 @@ private static <C extends Comparable<C>> LogicalExpression createLTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max <= left's min.
final C leftMin = leftStat.genericGetMin();
return rightStat.genericGetMax().compareTo(leftMin) <= 0;
return rightStat.compareMaxToValue(leftMin) <= 0;
});
}

Expand All @@ -173,7 +173,7 @@ private static <C extends Comparable<C>> LogicalExpression createLEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max < left's min.
final C leftMin = leftStat.genericGetMin();
return rightStat.genericGetMax().compareTo(leftMin) < 0;
return rightStat.compareMaxToValue(leftMin) < 0;
});
}

Expand All @@ -188,8 +188,8 @@ private static <C extends Comparable<C>> LogicalExpression createNEPredicate(
// can drop when there is only one unique value.
final C leftMax = leftStat.genericGetMax();
final C rightMax = rightStat.genericGetMax();
return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
leftStat.genericGetMax().compareTo(rightMax) == 0;
return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
leftStat.compareMaxToValue(rightMax) == 0;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;

import java.util.ArrayList;
Expand Down Expand Up @@ -114,7 +115,7 @@ private static <C extends Comparable<C>> LogicalExpression createIsNotNullPredic
private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is not true or if there are all nulls -> canDrop
(exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
);
}

Expand All @@ -124,7 +125,7 @@ private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is not false or if there are all nulls -> canDrop
(exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
);
}

Expand All @@ -134,7 +135,7 @@ private static LogicalExpression createIsFalsePredicate(LogicalExpression expr)
private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is true (all values are true) and there are no nulls -> canDrop
(exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
);
}

Expand All @@ -144,7 +145,7 @@ private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr
private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is false (all values are false) and there are no nulls -> canDrop
(exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static boolean isNullOrEmpty(Statistics stat) {
* False if at least one row is not null.
*/
static boolean isAllNulls(Statistics stat, long rowCount) {
return stat.getNumNulls() == rowCount;
return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
}

/**
Expand All @@ -54,7 +54,7 @@ static boolean isAllNulls(Statistics stat, long rowCount) {
* False if the parquet file has nulls.
*/
static boolean hasNoNulls(Statistics stat) {
return stat.getNumNulls() == 0;
return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

Expand All @@ -50,6 +51,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public abstract class AbstractParquetScanBatchCreator {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
Expand Down Expand Up @@ -146,11 +150,15 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);

private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
Configuration newConf = new Configuration(conf);
conf = new Configuration(conf);
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
return reader.getFooter();
}
}

private boolean isComplex(ParquetMetadata footer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.util.CompatibilityUtil;
import org.apache.parquet.hadoop.util.HadoopStreams;

public class ColumnDataReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
Expand Down Expand Up @@ -58,11 +57,7 @@ public BytesInput getPageAsBytesInput(int pageLength) throws IOException{

public void loadPage(DrillBuf target, int pageLength) throws IOException {
target.clear();
ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
int lengthLeftToRead = pageLength;
while (lengthLeftToRead > 0) {
lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
}
HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
target.writerIndex(pageLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.base.Preconditions;

import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class FooterGatherer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
Expand Down Expand Up @@ -160,7 +161,8 @@ public static Footer readFooter(final Configuration config, final FileStatus sta
footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
}

ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
Footer footer = new Footer(status.getPath(), metadata);
return footer;
}
Expand Down
Loading