diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java index ff44b4fa5e..23d859b262 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java @@ -30,6 +30,7 @@ public class ColumnDescriptor implements Comparable { private final String[] path; + private final Type.ID id; private final PrimitiveType type; private final int maxRep; private final int maxDef; @@ -70,7 +71,20 @@ public ColumnDescriptor(String[] path, PrimitiveTypeName type, * @param maxDef the maximum definition level for that path */ public ColumnDescriptor(String[] path, PrimitiveType type, int maxRep, int maxDef) { + this(path, null, type, maxRep, maxDef); + } + + /** + * @param path the path to the leaf field in the schema + * @param id the id to the leaf field in the schema + * @param type the type of the field + * @param maxRep the maximum repetition level for that path + * @param maxDef the maximum definition level for that path + * @deprecated will be removed in 2.0.0; Use {@link #ColumnDescriptor(String[], PrimitiveType, int, int)} + */ + public ColumnDescriptor(String[] path, Type.ID id, PrimitiveType type, int maxRep, int maxDef) { this.path = path; + this.id = id; this.type = type; this.maxRep = maxRep; this.maxDef = maxDef; @@ -83,6 +97,10 @@ public String[] getPath() { return path; } + public Type.ID getId() { + return id; + } + /** * @return the maximum repetition level for that path */ diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 7e55ebcdb9..193b144473 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -57,6 +57,7 @@ public class ParquetProperties { public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024; public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false; + public static final boolean DEFAULT_COLUMN_ID_RESOLUTION = false; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; @@ -105,6 +106,7 @@ public static WriterVersion fromString(String name) { private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final boolean enableByteStreamSplit; + private final boolean columnIdResolution; private ParquetProperties(Builder builder) { this.pageSizeThreshold = builder.pageSize; @@ -127,6 +129,7 @@ private ParquetProperties(Builder builder) { this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.enableByteStreamSplit = builder.enableByteStreamSplit; + this.columnIdResolution = builder.columnIdResolution; } public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { @@ -266,6 +269,10 @@ public int getMaxBloomFilterBytes() { return maxBloomFilterBytes; } + public boolean getColumnIdResolution() { + return columnIdResolution; + } + public static Builder builder() { return new Builder(); } @@ -310,6 +317,7 @@ public static class Builder { private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private boolean enableByteStreamSplit = 
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED; + private boolean columnIdResolution = DEFAULT_COLUMN_ID_RESOLUTION; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -511,6 +519,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) { return this; } + public Builder withColumnIdResolution(boolean columnIdResolution) { + this.columnIdResolution = columnIdResolution; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index a0490d9ac9..1ac5426c86 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -45,6 +45,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.Operators.UserDefinedByClass; import org.apache.parquet.filter2.predicate.Operators.UserDefinedByInstance; +import org.apache.parquet.schema.Type; /** * The Filter API is expressed through these static methods. @@ -72,26 +73,50 @@ public static IntColumn intColumn(String columnPath) { return new IntColumn(ColumnPath.fromDotString(columnPath)); } + public static IntColumn intColumn(Type.ID id) { + return new IntColumn(id); + } + public static LongColumn longColumn(String columnPath) { return new LongColumn(ColumnPath.fromDotString(columnPath)); } + public static LongColumn longColumn(Type.ID id) { + return new LongColumn(id); + } + public static FloatColumn floatColumn(String columnPath) { return new FloatColumn(ColumnPath.fromDotString(columnPath)); } + public static FloatColumn floatColumn(Type.ID id) { + return new FloatColumn(id); + } + public static DoubleColumn doubleColumn(String columnPath) { return new DoubleColumn(ColumnPath.fromDotString(columnPath)); } + public static DoubleColumn doubleColumn(Type.ID id) { + return new DoubleColumn(id); + } + public static BooleanColumn booleanColumn(String columnPath) { return new BooleanColumn(ColumnPath.fromDotString(columnPath)); } + public static BooleanColumn booleanColumn(Type.ID id) { + return new BooleanColumn(id); + } + public static BinaryColumn binaryColumn(String columnPath) { return new BinaryColumn(ColumnPath.fromDotString(columnPath)); } + public static BinaryColumn binaryColumn(Type.ID id) { + return new BinaryColumn(id); + } + /** * Keeps records if their value is equal to the provided value. * Nulls are treated the same way the java programming language does. 
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index d52aa92495..6bce51d3c2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.Type; import static org.apache.parquet.Preconditions.checkArgument; @@ -36,9 +37,15 @@ public final class Operators { private Operators() { } public static abstract class Column> implements Serializable { - private final ColumnPath columnPath; + private Type.ID columnId = null; + private ColumnPath columnPath = null; private final Class columnType; + protected Column(Type.ID columnId, Class columnType) { + this.columnId = columnId; + this.columnType = Objects.requireNonNull(columnType, "columnType cannot be null"); + } + protected Column(ColumnPath columnPath, Class columnType) { this.columnPath = Objects.requireNonNull(columnPath, "columnPath cannot be null");; this.columnType = Objects.requireNonNull(columnType, "columnType cannot be null");; @@ -48,10 +55,18 @@ public Class getColumnType() { return columnType; } + public Type.ID getColumnId() { + return columnId; + } + public ColumnPath getColumnPath() { return columnPath; } + public void setColumnPath(ColumnPath columnPath) { + this.columnPath = columnPath; + } + @Override public String toString() { return "column(" + columnPath.toDotString() + ")"; @@ -82,36 +97,60 @@ public static interface SupportsEqNotEq { } // marker for columns that can be us public static interface SupportsLtGt extends SupportsEqNotEq { } // marker for columns that can be used with lt(), ltEq(), gt(), gtEq() public static final class IntColumn extends Column implements SupportsLtGt { + IntColumn(Type.ID columnId) { + super(columnId, Integer.class); + } + IntColumn(ColumnPath columnPath) { super(columnPath, Integer.class); } } public static final class LongColumn extends Column implements SupportsLtGt { + LongColumn(Type.ID columnId) { + super(columnId, Long.class); + } + LongColumn(ColumnPath columnPath) { super(columnPath, Long.class); } } public static final class DoubleColumn extends Column implements SupportsLtGt { + DoubleColumn(Type.ID columnId) { + super(columnId, Double.class); + } + DoubleColumn(ColumnPath columnPath) { super(columnPath, Double.class); } } public static final class FloatColumn extends Column implements SupportsLtGt { + FloatColumn(Type.ID columnId) { + super(columnId, Float.class); + } + FloatColumn(ColumnPath columnPath) { super(columnPath, Float.class); } } public static final class BooleanColumn extends Column implements SupportsEqNotEq { + BooleanColumn(Type.ID columnId) { + super(columnId, Boolean.class); + } + BooleanColumn(ColumnPath columnPath) { super(columnPath, Boolean.class); } } public static final class BinaryColumn extends Column implements SupportsLtGt { + BinaryColumn(Type.ID columnId) { + super(columnId, Binary.class); + } + BinaryColumn(ColumnPath columnPath) { super(columnPath, Binary.class); } @@ -131,7 +170,12 @@ protected ColumnFilterPredicate(Column column, T value) { this.value = value; String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); - this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")"; + + if (column.getColumnPath() != null) { + 
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")"; + } else { + this.toString = name + "(" + column.getColumnId() + ", " + value + ")"; + } } public Column getColumn() { @@ -276,7 +320,11 @@ public Set getValues() { public String toString() { String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); StringBuilder str = new StringBuilder(); - str.append(name).append("(").append(column.getColumnPath().toDotString()).append(", "); + if (column.getColumnPath() != null) { + str.append(name).append("(").append(column.getColumnPath().toDotString()).append(", "); + } else { + str.append(name).append("(").append(column.getColumnId()).append(", "); + } int iter = 0; for (T value : values) { if (iter >= 100) break; @@ -464,7 +512,11 @@ public static final class UserDefinedByClass, U extends super(column); this.udpClass = Objects.requireNonNull(udpClass, "udpClass cannot be null"); String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); - this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")"; + if (column.getColumnPath() != null) { + this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")"; + } else { + this.toString = name + "(" + column.getColumnId() + ", " + udpClass.getName() + ")"; + } // defensively try to instantiate the class early to make sure that it's possible getUserDefinedPredicate(); @@ -518,7 +570,11 @@ public static final class UserDefinedByInstance, U exten super(column); this.udpInstance = Objects.requireNonNull(udpInstance, "udpInstance cannot be null"); String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); - this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")"; + if (column.getColumnPath() != null) { + this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")"; + } else { + this.toString = name + "(" + column.getColumnId() + ", " + udpInstance + ")"; + } } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java index 49fd10cc81..a4cd7bb685 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -18,7 +18,10 @@ */ package org.apache.parquet.filter2.predicate; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +44,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; /** * Inspects the column types found in the provided {@link FilterPredicate} and compares them @@ -170,6 +174,24 @@ private > void validateColumnFilterPredicate(SetColumnFi private > void validateColumn(Column column) { ColumnPath path = column.getColumnPath(); + if (path == null) { + HashSet ids = new HashSet<>(); + Type.ID id = column.getColumnId(); + List columnDescriptors = new ArrayList<>(columnsAccordingToSchema.values()); + for (ColumnDescriptor columnDescriptor : columnDescriptors) { + Type.ID columnId = columnDescriptor.getId(); + if (columnId != null) { + if 
(ids.contains(columnId)) { + throw new RuntimeException("duplicate id"); + } + ids.add(columnId); + if (columnId.intValue() == id.intValue()) { + column.setColumnPath(ColumnPath.get(columnDescriptor.getPath())); + path = column.getColumnPath(); + } + } + } + } Class alreadySeen = columnTypesEncountered.get(path); if (alreadySeen != null && !alreadySeen.equals(column.getColumnType())) { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java index af5650e6cc..e775b54b11 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java @@ -19,9 +19,12 @@ package org.apache.parquet.schema; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.InvalidRecordException; /** @@ -94,7 +97,7 @@ public ColumnDescriptor getColumnDescription(String[] path) { int maxRep = getMaxRepetitionLevel(path); int maxDef = getMaxDefinitionLevel(path); PrimitiveType type = getType(path).asPrimitiveType(); - return new ColumnDescriptor(path, type, maxRep, maxDef); + return new ColumnDescriptor(path, getType(path).asPrimitiveType().getId(), type, maxRep, maxDef); } public List getPaths() { @@ -109,6 +112,7 @@ public List getColumns() { PrimitiveType primitiveType = getType(path).asPrimitiveType(); columns.add(new ColumnDescriptor( path, + primitiveType.getId(), primitiveType, getMaxRepetitionLevel(path), getMaxDefinitionLevel(path))); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 8ffe19f2b8..de7aa46e48 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -171,17 +171,18 @@ public void initialize(ParquetFileReader reader, ParquetReadOptions options) { } // initialize a ReadContext for this file + boolean useColumnId = conf.getBoolean(ParquetInputFormat.COLUMN_ID_RESOLUTION, false); this.reader = reader; FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); this.fileSchema = parquetFileMetadata.getSchema(); Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); ReadSupport.ReadContext readContext = readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema)); this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); - this.requestedSchema = readContext.getRequestedSchema(); - this.columnCount = requestedSchema.getPaths().size(); + MessageType requestedSchemaByColName = readContext.getRequestedSchema(); + this.columnCount = requestedSchemaByColName.getPaths().size(); // Setting the projection schema before running any filtering (e.g. 
getting filtered record count) // because projection impacts filtering - reader.setRequestedSchema(requestedSchema); + this.requestedSchema = reader.setRequestedSchema(requestedSchemaByColName, useColumnId); this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext); this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true); this.total = reader.getFilteredRecordCount(); @@ -193,6 +194,7 @@ public void initialize(ParquetFileReader reader, ParquetReadOptions options) { public void initialize(ParquetFileReader reader, Configuration configuration) throws IOException { // initialize a ReadContext for this file + boolean useColumnId = configuration.getBoolean(ParquetInputFormat.COLUMN_ID_RESOLUTION, false); this.reader = reader; FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); this.fileSchema = parquetFileMetadata.getSchema(); @@ -200,11 +202,11 @@ public void initialize(ParquetFileReader reader, Configuration configuration) ReadSupport.ReadContext readContext = readSupport.init(new InitContext( configuration, toSetMultiMap(fileMetadata), fileSchema)); this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); - this.requestedSchema = readContext.getRequestedSchema(); - this.columnCount = requestedSchema.getPaths().size(); + MessageType requestedSchemaByColName = readContext.getRequestedSchema(); + this.columnCount = requestedSchemaByColName.getPaths().size(); // Setting the projection schema before running any filtering (e.g. getting filtered record count) // because projection impacts filtering - reader.setRequestedSchema(requestedSchema); + this.requestedSchema = reader.setRequestedSchema(requestedSchemaByColName, useColumnId); this.recordConverter = readSupport.prepareForRead( configuration, fileMetadata, fileSchema, readContext); this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 63a22d1321..c3e4ed215d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -111,8 +111,10 @@ import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import org.apache.yetus.audience.InterfaceAudience.Private; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -878,11 +880,97 @@ public List getRowGroups() { return blocks; } - public void setRequestedSchema(MessageType projection) { + private boolean uniqueId(GroupType schema, HashSet ids) { + boolean unique = true; + List fields = schema.getFields(); + for (Type field : fields) { + if (field instanceof PrimitiveType) { + Type.ID id = field.getId(); + if (id != null) { + if (ids.contains(id)) { + return false; + } + ids.add(id); + } + } + + if (field instanceof GroupType) { + Type.ID id = field.getId(); + if (id != null) { + if (ids.contains(id)) { + return false; + } + ids.add(id); + } + if (unique) unique = uniqueId(field.asGroupType(), ids); + } + } + return unique; + } + + public MessageType setRequestedSchema(MessageType projection, boolean useColumnId) { paths.clear(); - 
for (ColumnDescriptor col : projection.getColumns()) { + MessageType schema = null; + if (useColumnId) { + HashSet ids = new HashSet<>(); + boolean fileSchemaIdUnique = uniqueId(fileMetaData.getSchema(), ids); + if (!fileSchemaIdUnique) { + throw new RuntimeException("can't use column id resolution because there are duplicate column ids."); + } + ids = new HashSet<>(); + boolean projectionSchemaIdUnique = uniqueId(projection, ids); + if (!projectionSchemaIdUnique) { + throw new RuntimeException("can't use column id resolution because there are duplicate column ids."); + } + schema = resetColumnNameBasedOnId(projection); + } else { + schema = projection; + } + for (ColumnDescriptor col : schema.getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } + return schema; + } + + private MessageType resetColumnNameBasedOnId(MessageType schema) { + List fields = schema.getFields(); + List resetFields = resetColumnNameInFields(fields); + return new MessageType(schema.getName(), resetFields); + } + + private List resetColumnNameInFields(List fields) { + List resetFields = new ArrayList<>(); + for (Type childField : fields) { + Type resetChildField = resetColumnNameInField(childField); + if (resetChildField != null) { + resetFields.add(resetChildField); + } + } + return resetFields; + } + + private Type resetColumnNameInField(Type field) { + String fieldName = field.getName(); + Type resetField = null; + if (field.isPrimitive()) { + Type.ID id = field.getId(); + List descriptors = fileMetaData.getSchema().getColumns(); + for (ColumnDescriptor c : descriptors) { + Type.ID idInFileMetaData = c.getPrimitiveType().getId(); + if (idInFileMetaData != null && id != null && idInFileMetaData.intValue() == id.intValue()) { + fieldName = c.getPrimitiveType().getName(); + } + } + resetField = new PrimitiveType(field.getRepetition(), field.asPrimitiveType().getPrimitiveTypeName(), fieldName); + } else { + List childFields = ((GroupType) field).getFields(); + List resetFields = resetColumnNameInFields(childFields); + if (resetFields.size() > 0) { + resetField = ((GroupType) field).withNewFields(resetFields); + } + } + + return resetField; } public void appendTo(ParquetFileWriter writer) throws IOException { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index f46f18211a..e07b562b4e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -151,6 +151,8 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata"; + public static final String COLUMN_ID_RESOLUTION = "parquet.column.id.resolution"; + /** * key to turn off file splitting. See PARQUET-246. 
*/ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index eb910074c3..230745e78d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -23,6 +23,8 @@ import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration; import java.io.IOException; +import java.util.HashSet; +import java.util.List; import java.util.Objects; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,10 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +159,7 @@ public static enum JobSummaryLevel { public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + public static final String COLUMN_ID_RESOLUTION = "parquet.column.id.resolution"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -287,6 +294,14 @@ public static int getPageSize(Configuration configuration) { return configuration.getInt(PAGE_SIZE, ParquetProperties.DEFAULT_PAGE_SIZE); } + public static boolean getColumnIdResolution(Configuration configuration) { + return configuration.getBoolean(COLUMN_ID_RESOLUTION, ParquetProperties.DEFAULT_COLUMN_ID_RESOLUTION); + } + + public static void setColumnIdResolution(Configuration conf, boolean columnIdResolution) { + conf.setBoolean(COLUMN_ID_RESOLUTION, columnIdResolution); + } + public static int getDictionaryPageSize(Configuration configuration) { return configuration.getInt( DICTIONARY_PAGE_SIZE, ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE); @@ -452,7 +467,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)) .withBloomFilterEnabled(getBloomFilterEnabled(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) - .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)); + .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) + .withColumnIdResolution(getColumnIdResolution(conf)); new ColumnConfigParser() .withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding) .withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false), @@ -472,7 +488,13 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.debug("Parquet properties are:\n{}", props); WriteContext fileWriteContext = writeSupport.init(conf); - + + MessageType schema = fileWriteContext.getSchema(); + if (props.getColumnIdResolution()) { + HashSet ids = new HashSet<>(); + checkDuplicateId(schema, ids); + } + FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext); ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf), @@ -507,6 +529,32 @@ public RecordWriter getRecordWriter(Configuration conf, Path 
file, Comp conf); } + private void checkDuplicateId(GroupType schema, HashSet ids) { + List fields = schema.getFields(); + for (Type field : fields) { + if (field instanceof PrimitiveType) { + Type.ID id = field.getId(); + if (id != null) { + if (ids.contains(id)) { + throw new RuntimeException("can't use column id resolution because there are duplicate column ids."); + } + ids.add(id); + } + } + + if (field instanceof GroupType) { + Type.ID id = field.getId(); + if (id != null) { + if (ids.contains(id)) { + throw new RuntimeException("can't use column id resolution because there are duplicate column ids."); + } + ids.add(id); + } + checkDuplicateId(field.asGroupType(), ids); + } + } + } + /** * @param configuration to find the configuration for the write support class * @return the configured write support diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java index 3d18e1c3ad..c41695391f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java @@ -30,6 +30,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.junit.Before; import org.junit.Rule; @@ -71,8 +72,8 @@ public void createDataFile() throws Exception { this.path = new Path(file.toString()); MessageType type = Types.buildMessage() - .required(INT64).named("id") - .required(BINARY).as(UTF8).named("data") + .addField(Types.required(INT64).id(1).named("id")) + .addField(Types.required(BINARY).as(UTF8).named("data")) .named("test"); SimpleGroupFactory factory = new SimpleGroupFactory(type); @@ -97,6 +98,7 @@ public void createDataFile() throws Exception { @Test public void testNormalFilter() throws Exception { assertEquals(500, countFilteredRecords(path, lt(longColumn("id"), 500L))); + assertEquals(500, countFilteredRecords(path, lt(longColumn(new Type.ID(1)), 500L))); } @Test @@ -108,6 +110,8 @@ public void testSimpleMissingColumnFilter() throws Exception { values.add(5L); assertEquals(0, countFilteredRecords(path, in(longColumn("missing"), values))); assertEquals(1000, countFilteredRecords(path, notIn(longColumn("missing"), values))); + assertEquals(0, countFilteredRecords(path, in(longColumn(new Type.ID(11)), values))); + assertEquals(1000, countFilteredRecords(path, notIn(longColumn(new Type.ID(11)), values))); } @Test @@ -183,6 +187,79 @@ public void testAndMissingColumnFilter() throws Exception { ))); } + @Test + public void testAndMissingColumnFilterForID() throws Exception { + // missing column filter is true + assertEquals(500, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + eq(binaryColumn(new Type.ID(11)), null) + ))); + assertEquals(500, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + notEq(binaryColumn(new Type.ID(11)), fromString("any")) + ))); + + assertEquals(500, countFilteredRecords(path, and( + eq(binaryColumn(new Type.ID(11)), null), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, and( + notEq(binaryColumn(new Type.ID(11)), fromString("any")), + lt(longColumn(new Type.ID(1)), 500L) + ))); + + // missing column filter is false + 
assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + eq(binaryColumn(new Type.ID(11)), fromString("any")) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + notEq(binaryColumn(new Type.ID(11)), null) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + lt(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + ltEq(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + gt(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(longColumn(new Type.ID(1)), 500L), + gtEq(doubleColumn(new Type.ID(11)), 33.33) + ))); + + assertEquals(0, countFilteredRecords(path, and( + eq(binaryColumn(new Type.ID(11)), fromString("any")), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(0, countFilteredRecords(path, and( + notEq(binaryColumn(new Type.ID(11)), null), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(0, countFilteredRecords(path, and( + lt(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(0, countFilteredRecords(path, and( + ltEq(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(0, countFilteredRecords(path, and( + gt(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(0, countFilteredRecords(path, and( + gtEq(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + } + @Test public void testOrMissingColumnFilter() throws Exception { // missing column filter is false @@ -256,6 +333,79 @@ public void testOrMissingColumnFilter() throws Exception { ))); } + @Test + public void testOrMissingColumnFilterForID() throws Exception { + // missing column filter is false + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + eq(binaryColumn(new Type.ID(11)), fromString("any")) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + notEq(binaryColumn(new Type.ID(11)), null) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + lt(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + ltEq(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + gt(doubleColumn(new Type.ID(11)), 33.33) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + gtEq(doubleColumn(new Type.ID(11)), 33.33) + ))); + + assertEquals(500, countFilteredRecords(path, or( + eq(binaryColumn(new Type.ID(11)), fromString("any")), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, or( + notEq(binaryColumn(new Type.ID(11)), null), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, or( + lt(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, or( + ltEq(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, or( + gt(doubleColumn(new Type.ID(11)), 
 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(500, countFilteredRecords(path, or( + gtEq(doubleColumn(new Type.ID(11)), 33.33), + lt(longColumn(new Type.ID(1)), 500L) + ))); + + // missing column filter is true + assertEquals(1000, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + eq(binaryColumn(new Type.ID(11)), null) + ))); + assertEquals(1000, countFilteredRecords(path, or( + lt(longColumn(new Type.ID(1)), 500L), + notEq(binaryColumn(new Type.ID(11)), fromString("any")) + ))); + + assertEquals(1000, countFilteredRecords(path, or( + eq(binaryColumn(new Type.ID(11)), null), + lt(longColumn(new Type.ID(1)), 500L) + ))); + assertEquals(1000, countFilteredRecords(path, or( + notEq(binaryColumn(new Type.ID(11)), fromString("any")), + lt(longColumn(new Type.ID(1)), 500L) + ))); + } + public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException{ ParquetReader reader = ParquetReader .builder(new GroupReadSupport(), path) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java index 40527775bf..e241f22ae9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.HashSet; +import org.apache.parquet.schema.Type; import org.junit.Test; import org.apache.parquet.column.statistics.IntStatistics; @@ -64,7 +65,6 @@ public void testApplyRowGroupFilters() { BlockMetaData b3 = makeBlockFromStats(stats3, 303); blocks.add(b3); - IntStatistics stats4 = new IntStatistics(); stats4.setMinMax(0, 0); stats4.setNumNulls(304); @@ -84,9 +84,16 @@ public void testApplyRowGroupFilters() { BlockMetaData b6 = makeBlockFromStats(stats6, 306); blocks.add(b6); - MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }"); + MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo = 15; }"); IntColumn foo = intColumn("foo"); + testApplyRowGroupFiltersHelper(foo, blocks, schema, b1, b2, b3, b4, b5, b6); + IntColumn fooUsingID = intColumn(new Type.ID(15)); + testApplyRowGroupFiltersHelper(fooUsingID, blocks, schema, b1, b2, b3, b4, b5, b6); + } + private void testApplyRowGroupFiltersHelper(IntColumn foo, List blocks, MessageType schema, + BlockMetaData b1, BlockMetaData b2, BlockMetaData b3, + BlockMetaData b4, BlockMetaData b5, BlockMetaData b6) { Set set1 = new HashSet<>(); set1.add(9); set1.add(10); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java index 6355f35c3c..0f86332a74 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java @@ -40,16 +40,16 @@ public class PhoneBookWriter { private static final String schemaString = "message user {\n" - + " required int64 id;\n" - + " optional binary name (UTF8);\n" - + " optional group location {\n" - + " optional double lon;\n" - + " optional double lat;\n" + + " required int64 id = 1;\n" + + " optional binary name (UTF8) = 2;\n" + + " optional group location = 7 {\n" + + " optional double lon = 3;\n" + + " optional double lat =
4;\n" + " }\n" - + " optional group phoneNumbers {\n" - + " repeated group phone {\n" - + " required int64 number;\n" - + " optional binary kind (UTF8);\n" + + " optional group phoneNumbers = 9 {\n" + + " repeated group phone = 8 {\n" + + " required int64 number = 5;\n" + + " optional binary kind (UTF8) = 6;\n" + " }\n" + " }\n" + "}\n"; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 4c3538c3d5..931754a4b0 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.HashSet; +import org.apache.parquet.schema.Type; import org.junit.BeforeClass; import org.junit.Test; @@ -140,7 +141,12 @@ public boolean keep(User u) { @Test public void testAllFilter() throws Exception { BinaryColumn name = binaryColumn("name"); + BinaryColumn nameAsID = binaryColumn(new Type.ID(2)); + testAllFilterHelper(name); + testAllFilterHelper(nameAsID); + } + public void testAllFilterHelper(BinaryColumn name) throws Exception { FilterPredicate pred = eq(name, Binary.fromString("no matches")); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); @@ -150,7 +156,12 @@ public void testAllFilter() throws Exception { @Test public void testInFilter() throws Exception { BinaryColumn name = binaryColumn("name"); + BinaryColumn nameAsID = binaryColumn(new Type.ID(2)); + testInFilterHelper(name); + testInFilterHelper(nameAsID); + } + public void testInFilterHelper(BinaryColumn name) throws Exception { HashSet nameSet = new HashSet<>(); nameSet.add(Binary.fromString("thing2")); nameSet.add(Binary.fromString("thing1")); @@ -183,7 +194,12 @@ public void testInFilter() throws Exception { @Test public void testNameNotNull() throws Exception { BinaryColumn name = binaryColumn("name"); + BinaryColumn nameAsID = binaryColumn(new Type.ID(2)); + testNameNotNullHelper(name); + testNameNotNullHelper(nameAsID); + } + public void testNameNotNullHelper(BinaryColumn name) throws Exception { FilterPredicate pred = notEq(name, null); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); @@ -248,7 +264,12 @@ public boolean inverseCanDrop(Statistics statistics) { @Test public void testNameNotStartWithP() throws Exception { BinaryColumn name = binaryColumn("name"); + BinaryColumn nameAsID = binaryColumn(new Type.ID(2)); + testNameNotStartWithPHelper(name); + testNameNotStartWithPHelper(nameAsID); + } + public void testNameNotStartWithPHelper(BinaryColumn name) throws Exception { FilterPredicate pred = not(userDefined(name, StartWithP.class)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); @@ -287,7 +308,14 @@ public void testComplex() throws Exception { BinaryColumn name = binaryColumn("name"); DoubleColumn lon = doubleColumn("location.lon"); DoubleColumn lat = doubleColumn("location.lat"); + testComplexHelper(name, lon, lat); + BinaryColumn nameAsID = binaryColumn(new Type.ID(2)); + DoubleColumn lonAsID = doubleColumn(new Type.ID(3)); + DoubleColumn latAsID = doubleColumn(new Type.ID(4)); + testComplexHelper(nameAsID, lonAsID, latAsID); + } + public void testComplexHelper(BinaryColumn name, DoubleColumn lon, DoubleColumn lat) throws Exception { FilterPredicate pred = or(and(gt(lon, 150.0), notEq(lat, 
null)), eq(name, Binary.fromString("alice"))); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java index b07fccddde..df41325359 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java @@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.Type; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -339,6 +340,41 @@ record -> "dummy".equals(record.getName()), in(binaryColumn("name"), values3)); } + @Test + public void testSimpleFilteringForID() throws IOException { + assertCorrectFiltering( + record -> record.getId() == 1234L, + eq(longColumn(new Type.ID(1)), 1234L)); + + assertCorrectFiltering( + record -> "miller".equals(record.getName()), + eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller"))); + + Set values1 = new HashSet<>(); + values1.add(Binary.fromString("miller")); + values1.add(Binary.fromString("anderson")); + + assertCorrectFiltering( + record -> "miller".equals(record.getName()) || "anderson".equals(record.getName()), + in(binaryColumn(new Type.ID(2)), values1)); + + Set values2 = new HashSet<>(); + values2.add(Binary.fromString("miller")); + values2.add(Binary.fromString("alien")); + + assertCorrectFiltering( + record -> "miller".equals(record.getName()), + in(binaryColumn(new Type.ID(2)), values2)); + + Set values3 = new HashSet<>(); + values3.add(Binary.fromString("alien")); + values3.add(Binary.fromString("predator")); + + assertCorrectFiltering( + record -> "dummy".equals(record.getName()), + in(binaryColumn(new Type.ID(2)), values3)); + } + @Test public void testNestedFiltering() throws IOException { assertCorrectFiltering( @@ -348,4 +384,14 @@ record -> { }, eq(doubleColumn("location.lat"), 99.9)); } + + @Test + public void testNestedFilteringForID() throws IOException { + assertCorrectFiltering( + record -> { + PhoneBookWriter.Location location = record.getLocation(); + return location != null && location.getLat() != null && location.getLat() == 99.9; + }, + eq(doubleColumn(new Type.ID(4)), 99.9)); + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java index 5e181059f0..41b199c751 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java @@ -87,6 +87,7 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -377,9 +378,13 @@ public static void deleteFiles() throws IOException { @Test public void testSimpleFiltering() throws IOException { + assertCorrectFiltering( + record -> record.getId() == 1234, + eq(longColumn("id"), 1234l)); + assertCorrectFiltering( record -> record.getId() == 1234, - eq(longColumn("id"), 1234l)); + eq(longColumn(new Type.ID(1)), 1234l)); Set idSet = new 
HashSet<>(); idSet.add(1234l); @@ -395,10 +400,18 @@ record -> (record.getId() == 1234 || record.getId() == 5678 || record.getId() == record.getId() == 111 || record.getId() == 6666 || record.getId() == 2 || record.getId() == 2468), in(longColumn("id"), idSet) ); + assertCorrectFiltering( + record -> (record.getId() == 1234 || record.getId() == 5678 || record.getId() == 1357 || + record.getId() == 111 || record.getId() == 6666 || record.getId() == 2 || record.getId() == 2468), + in(longColumn(new Type.ID(1)), idSet) + ); + assertCorrectFiltering( + record -> "miller".equals(record.getName()), + eq(binaryColumn("name"), Binary.fromString("miller"))); assertCorrectFiltering( record -> "miller".equals(record.getName()), - eq(binaryColumn("name"), Binary.fromString("miller"))); + eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller"))); Set nameSet = new HashSet<>(); nameSet.add(Binary.fromString("anderson")); @@ -411,10 +424,18 @@ record -> ("anderson".equals(record.getName()) || "miller".equals(record.getName "thomas".equals(record.getName()) || "williams".equals(record.getName())), in(binaryColumn("name"), nameSet) ); + assertCorrectFiltering( + record -> ("anderson".equals(record.getName()) || "miller".equals(record.getName()) || + "thomas".equals(record.getName()) || "williams".equals(record.getName())), + in(binaryColumn(new Type.ID(2)), nameSet) + ); + assertCorrectFiltering( + record -> record.getName() == null, + eq(binaryColumn("name"), null)); assertCorrectFiltering( record -> record.getName() == null, - eq(binaryColumn("name"), null)); + eq(binaryColumn(new Type.ID(2)), null)); Set nullSet = new HashSet<>(); nullSet.add(null); @@ -422,6 +443,9 @@ record -> record.getName() == null, assertCorrectFiltering( record -> record.getName() == null, in(binaryColumn("name"), nullSet)); + assertCorrectFiltering( + record -> record.getName() == null, + in(binaryColumn(new Type.ID(2)), nullSet)); } @Test @@ -442,10 +466,18 @@ public void testNoFiltering() throws IOException { assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()), readUsers(eq(binaryColumn("name"), null), true, false)); + // Column index filtering turned off + assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()), + readUsers(eq(longColumn(new Type.ID(1)), 1234l), true, false)); + assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()), + readUsers(eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller")), true, false)); + assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()), + readUsers(eq(binaryColumn(new Type.ID(2)), null), true, false)); + // Every filtering mechanism turned off - assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false)); - assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false)); - assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false)); + assertEquals(DATA, readUsers(eq(longColumn(new Type.ID(1)), 1234l), false, false)); + assertEquals(DATA, readUsers(eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller")), false, false)); + assertEquals(DATA, readUsers(eq(binaryColumn(new Type.ID(2)), null), false, false)); } @Test @@ -459,6 +491,16 @@ record -> { }, and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)), and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0)))); + 
assertCorrectFiltering( + record -> { + Location loc = record.getLocation(); + Double lat = loc == null ? null : loc.getLat(); + Double lon = loc == null ? null : loc.getLon(); + return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35; + }, + and(and(gtEq(doubleColumn(new Type.ID(4)), 37.0), ltEq(doubleColumn(new Type.ID(4)), 70.0)), + and(gtEq(doubleColumn(new Type.ID(3)), -21.0), ltEq(doubleColumn(new Type.ID(3)), 35.0)))); + assertCorrectFiltering( record -> { Location loc = record.getLocation(); @@ -466,11 +508,24 @@ record -> { }, and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null))); assertCorrectFiltering( - record -> { - String name = record.getName(); - return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4; - }, - and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4))); + record -> { + Location loc = record.getLocation(); + return loc == null || (loc.getLat() == null && loc.getLon() == null); + }, + and(eq(doubleColumn(new Type.ID(4)), null), eq(doubleColumn(new Type.ID(3)), null))); + + assertCorrectFiltering( + record -> { + String name = record.getName(); + return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4; + }, + and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4))); + assertCorrectFiltering( + record -> { + String name = record.getName(); + return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4; + }, + and(lt(binaryColumn(new Type.ID(2)), Binary.fromString("thomas")), ltEq(longColumn(new Type.ID(1)), 3l * DATA.size() / 4))); } public static class NameStartsWithVowel extends UserDefinedPredicate { @@ -571,6 +626,15 @@ record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.ge record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0), not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class), userDefined(longColumn("id"), new IsDivisibleBy(234))))); + + assertCorrectFiltering( + record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0, + or(userDefined(binaryColumn(new Type.ID(2)), NameStartsWithVowel.class), + userDefined(longColumn(new Type.ID(1)), new IsDivisibleBy(234)))); + assertCorrectFiltering( + record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0), + not(or(userDefined(binaryColumn(new Type.ID(2)), NameStartsWithVowel.class), + userDefined(longColumn(new Type.ID(1)), new IsDivisibleBy(234))))); } @Test @@ -596,6 +660,28 @@ record -> "miller".equals(record.getName()), record -> record.getId() == 1234, or(eq(longColumn("id"), 1234l), userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1)))); + + // Missing column filter is always true + assertEquals(DATA, readUsers(notEq(binaryColumn(new Type.ID(10)), Binary.EMPTY), true)); + assertCorrectFiltering( + record -> record.getId() == 1234, + and(eq(longColumn(new Type.ID(1)), 1234l), + eq(longColumn(new Type.ID(10)), null))); + assertCorrectFiltering( + record -> "miller".equals(record.getName()), + and(eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller")), + invert(userDefined(binaryColumn(new Type.ID(10)), NameStartsWithVowel.class)))); + + // Missing column filter is always false + assertEquals(emptyList(), readUsers(lt(longColumn(new Type.ID(10)), 0l), true)); + 
assertCorrectFiltering( + record -> "miller".equals(record.getName()), + or(eq(binaryColumn(new Type.ID(2)), Binary.fromString("miller")), + gtEq(binaryColumn(new Type.ID(10)), Binary.EMPTY))); + assertCorrectFiltering( + record -> record.getId() == 1234, + or(eq(longColumn(new Type.ID(1)), 1234l), + userDefined(longColumn(new Type.ID(10)), new IsDivisibleBy(1)))); } @Test @@ -613,5 +699,19 @@ public void testFilteringWithProjection() throws IOException { emptyList(), readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)), SCHEMA_WITHOUT_NAME, false, true)); + + // All rows shall be retrieved because all values in column 'name' shall be handled as null values + assertEquals( + DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()), + readUsersWithProjection(FilterCompat.get(eq(binaryColumn(new Type.ID(2)), null)), SCHEMA_WITHOUT_NAME, true, true)); + + // Column index filter shall drop all pages because all values in column 'name' shall be handled as null values + assertEquals( + emptyList(), + readUsersWithProjection(FilterCompat.get(notEq(binaryColumn(new Type.ID(2)), null)), SCHEMA_WITHOUT_NAME, false, true)); + assertEquals( + emptyList(), + readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn(new Type.ID(2)), NameStartsWithVowel.class)), + SCHEMA_WITHOUT_NAME, false, true)); } } diff --git a/pom.xml b/pom.xml index ad36b71d54..717a20ef6a 100644 --- a/pom.xml +++ b/pom.xml @@ -519,6 +519,7 @@ change to fix a integer overflow issue. TODO: remove this after Parquet 1.13 release --> org.apache.parquet.column.values.dictionary.DictionaryValuesWriter#dictionaryByteSize + org.apache.parquet.hadoop.ParquetFileReader
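
For reference, a minimal write-side sketch (not part of the patch; the class name ColumnIdWriteExample, the output path and the concrete field ids are illustrative) showing the kind of schema this change relies on: every field carries a unique id assigned via the Types builder's id(int). The ParquetOutputFormat.setColumnIdResolution() call added above only takes effect in the MapReduce getRecordWriter() path, where it makes the writer reject schemas with duplicate ids; it is shown here just to illustrate the new knob.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class ColumnIdWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // New property from this patch (parquet.column.id.resolution); in the
    // ParquetOutputFormat path it enables the duplicate-id check on the write schema.
    ParquetOutputFormat.setColumnIdResolution(conf, true);

    // Give every field a unique id so readers can later resolve columns by id.
    MessageType schema = Types.buildMessage()
        .addField(Types.required(INT64).id(1).named("id"))
        .addField(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).id(2).named("data"))
        .named("test");

    Path file = new Path("/tmp/ids.parquet"); // illustrative location
    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withConf(conf)
        .withType(schema)
        .build()) {
      for (long i = 0; i < 1000; i++) {
        writer.write(factory.newGroup().append("id", i).append("data", "row" + i));
      }
    }
  }
}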
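
And a matching read-side sketch (again illustrative, same caveats) that filters by field id rather than by column name. The id-based columns built with FilterApi.longColumn(Type.ID) and binaryColumn(Type.ID) are resolved against the file schema's ids by SchemaCompatibilityValidator; setting parquet.column.id.resolution additionally makes ParquetFileReader.setRequestedSchema() match the requested (projection) schema to the file schema by id, so the reader-side column names do not have to match the names in the file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.LongColumn;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Type;

import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;

public class ColumnIdReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Resolve the requested/projection schema against the file schema by field id.
    conf.setBoolean(ParquetInputFormat.COLUMN_ID_RESOLUTION, true);

    // Reference columns by the ids assigned at write time instead of by name.
    LongColumn id = longColumn(new Type.ID(1));
    BinaryColumn data = binaryColumn(new Type.ID(2));
    FilterPredicate pred = and(lt(id, 500L), notEq(data, null));

    try (ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path("/tmp/ids.parquet")) // illustrative location
        .withConf(conf)
        .withFilter(FilterCompat.get(pred))
        .build()) {
      for (Group g = reader.read(); g != null; g = reader.read()) {
        System.out.println(g.getLong("id", 0) + " " + g.getString("data", 0));
      }
    }
  }
}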