-
Notifications
You must be signed in to change notification settings - Fork 1.5k
PARQUET-2006: Column resolution by ID #950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ public class ParquetProperties { | |
| public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000; | ||
| public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024; | ||
| public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false; | ||
| public static final boolean DEFAULT_COLUMN_ID_RESOLUTION = false; | ||
|
|
||
| public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; | ||
|
|
||
|
|
@@ -105,6 +106,7 @@ public static WriterVersion fromString(String name) { | |
| private final int pageRowCountLimit; | ||
| private final boolean pageWriteChecksumEnabled; | ||
| private final boolean enableByteStreamSplit; | ||
| private final boolean columnIdResolution; | ||
|
|
||
| private ParquetProperties(Builder builder) { | ||
| this.pageSizeThreshold = builder.pageSize; | ||
|
|
@@ -127,6 +129,7 @@ private ParquetProperties(Builder builder) { | |
| this.pageRowCountLimit = builder.pageRowCountLimit; | ||
| this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; | ||
| this.enableByteStreamSplit = builder.enableByteStreamSplit; | ||
| this.columnIdResolution = builder.columnIdResolution; | ||
| } | ||
|
|
||
| public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { | ||
|
|
@@ -266,6 +269,10 @@ public int getMaxBloomFilterBytes() { | |
| return maxBloomFilterBytes; | ||
| } | ||
|
|
||
| public boolean getColumnIdResolution() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to use the verb from the setting, rather than adding |
||
| return columnIdResolution; | ||
| } | ||
|
|
||
| public static Builder builder() { | ||
| return new Builder(); | ||
| } | ||
|
|
@@ -310,6 +317,7 @@ public static class Builder { | |
| private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; | ||
| private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; | ||
| private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED; | ||
| private boolean columnIdResolution = DEFAULT_COLUMN_ID_RESOLUTION; | ||
|
|
||
| private Builder() { | ||
| enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); | ||
|
|
@@ -511,6 +519,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) { | |
| return this; | ||
| } | ||
|
|
||
| public Builder withColumnIdResolution(boolean columnIdResolution) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| this.columnIdResolution = columnIdResolution; | ||
| return this; | ||
| } | ||
|
|
||
| public ParquetProperties build() { | ||
| ParquetProperties properties = new ParquetProperties(this); | ||
| // we pass a constructed but uninitialized factory to ParquetProperties above as currently | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,7 @@ | |
| import org.apache.parquet.filter2.predicate.Operators.UserDefined; | ||
| import org.apache.parquet.filter2.predicate.Operators.UserDefinedByClass; | ||
| import org.apache.parquet.filter2.predicate.Operators.UserDefinedByInstance; | ||
| import org.apache.parquet.schema.Type; | ||
|
|
||
| /** | ||
| * The Filter API is expressed through these static methods. | ||
|
|
@@ -72,26 +73,50 @@ public static IntColumn intColumn(String columnPath) { | |
| return new IntColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static IntColumn intColumn(Type.ID id) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @huaxingao and @shangxinli, has the Parquet community considered using Iceberg expressions and filters? I know that's a separate change, but it would be a great way to pick up a cleaner filter API that handles expression binding. |
||
| return new IntColumn(id); | ||
| } | ||
|
|
||
| public static LongColumn longColumn(String columnPath) { | ||
| return new LongColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static LongColumn longColumn(Type.ID id) { | ||
| return new LongColumn(id); | ||
| } | ||
|
|
||
| public static FloatColumn floatColumn(String columnPath) { | ||
| return new FloatColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static FloatColumn floatColumn(Type.ID id) { | ||
| return new FloatColumn(id); | ||
| } | ||
|
|
||
| public static DoubleColumn doubleColumn(String columnPath) { | ||
| return new DoubleColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static DoubleColumn doubleColumn(Type.ID id) { | ||
| return new DoubleColumn(id); | ||
| } | ||
|
|
||
| public static BooleanColumn booleanColumn(String columnPath) { | ||
| return new BooleanColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static BooleanColumn booleanColumn(Type.ID id) { | ||
| return new BooleanColumn(id); | ||
| } | ||
|
|
||
| public static BinaryColumn binaryColumn(String columnPath) { | ||
| return new BinaryColumn(ColumnPath.fromDotString(columnPath)); | ||
| } | ||
|
|
||
| public static BinaryColumn binaryColumn(Type.ID id) { | ||
| return new BinaryColumn(id); | ||
| } | ||
|
|
||
| /** | ||
| * Keeps records if their value is equal to the provided value. | ||
| * Nulls are treated the same way the java programming language does. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
|
|
||
| import org.apache.parquet.hadoop.metadata.ColumnPath; | ||
| import org.apache.parquet.io.api.Binary; | ||
| import org.apache.parquet.schema.Type; | ||
|
|
||
| import static org.apache.parquet.Preconditions.checkArgument; | ||
|
|
||
|
|
@@ -36,9 +37,15 @@ public final class Operators { | |
| private Operators() { } | ||
|
|
||
| public static abstract class Column<T extends Comparable<T>> implements Serializable { | ||
| private final ColumnPath columnPath; | ||
| private Type.ID columnId = null; | ||
| private ColumnPath columnPath = null; | ||
| private final Class<T> columnType; | ||
|
|
||
| protected Column(Type.ID columnId, Class<T> columnType) { | ||
| this.columnId = columnId; | ||
| this.columnType = Objects.requireNonNull(columnType, "columnType cannot be null"); | ||
| } | ||
|
|
||
| protected Column(ColumnPath columnPath, Class<T> columnType) { | ||
| this.columnPath = Objects.requireNonNull(columnPath, "columnPath cannot be null");; | ||
| this.columnType = Objects.requireNonNull(columnType, "columnType cannot be null");; | ||
|
|
@@ -48,10 +55,18 @@ public Class<T> getColumnType() { | |
| return columnType; | ||
| } | ||
|
|
||
| public Type.ID getColumnId() { | ||
| return columnId; | ||
| } | ||
|
|
||
| public ColumnPath getColumnPath() { | ||
| return columnPath; | ||
| } | ||
|
|
||
| public void setColumnPath(ColumnPath columnPath) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this add a setter for column path? |
||
| this.columnPath = columnPath; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "column(" + columnPath.toDotString() + ")"; | ||
|
|
@@ -82,36 +97,60 @@ public static interface SupportsEqNotEq { } // marker for columns that can be us | |
| public static interface SupportsLtGt extends SupportsEqNotEq { } // marker for columns that can be used with lt(), ltEq(), gt(), gtEq() | ||
|
|
||
| public static final class IntColumn extends Column<Integer> implements SupportsLtGt { | ||
| IntColumn(Type.ID columnId) { | ||
| super(columnId, Integer.class); | ||
| } | ||
|
|
||
| IntColumn(ColumnPath columnPath) { | ||
| super(columnPath, Integer.class); | ||
| } | ||
| } | ||
|
|
||
| public static final class LongColumn extends Column<Long> implements SupportsLtGt { | ||
| LongColumn(Type.ID columnId) { | ||
| super(columnId, Long.class); | ||
| } | ||
|
|
||
| LongColumn(ColumnPath columnPath) { | ||
| super(columnPath, Long.class); | ||
| } | ||
| } | ||
|
|
||
| public static final class DoubleColumn extends Column<Double> implements SupportsLtGt { | ||
| DoubleColumn(Type.ID columnId) { | ||
| super(columnId, Double.class); | ||
| } | ||
|
|
||
| DoubleColumn(ColumnPath columnPath) { | ||
| super(columnPath, Double.class); | ||
| } | ||
| } | ||
|
|
||
| public static final class FloatColumn extends Column<Float> implements SupportsLtGt { | ||
| FloatColumn(Type.ID columnId) { | ||
| super(columnId, Float.class); | ||
| } | ||
|
|
||
| FloatColumn(ColumnPath columnPath) { | ||
| super(columnPath, Float.class); | ||
| } | ||
| } | ||
|
|
||
| public static final class BooleanColumn extends Column<Boolean> implements SupportsEqNotEq { | ||
| BooleanColumn(Type.ID columnId) { | ||
| super(columnId, Boolean.class); | ||
| } | ||
|
|
||
| BooleanColumn(ColumnPath columnPath) { | ||
| super(columnPath, Boolean.class); | ||
| } | ||
| } | ||
|
|
||
| public static final class BinaryColumn extends Column<Binary> implements SupportsLtGt { | ||
| BinaryColumn(Type.ID columnId) { | ||
| super(columnId, Binary.class); | ||
| } | ||
|
|
||
| BinaryColumn(ColumnPath columnPath) { | ||
| super(columnPath, Binary.class); | ||
| } | ||
|
|
@@ -131,7 +170,12 @@ protected ColumnFilterPredicate(Column<T> column, T value) { | |
| this.value = value; | ||
|
|
||
| String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")"; | ||
|
|
||
| if (column.getColumnPath() != null) { | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")"; | ||
| } else { | ||
| this.toString = name + "(" + column.getColumnId() + ", " + value + ")"; | ||
| } | ||
| } | ||
|
|
||
| public Column<T> getColumn() { | ||
|
|
@@ -276,7 +320,11 @@ public Set<T> getValues() { | |
| public String toString() { | ||
| String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); | ||
| StringBuilder str = new StringBuilder(); | ||
| str.append(name).append("(").append(column.getColumnPath().toDotString()).append(", "); | ||
| if (column.getColumnPath() != null) { | ||
| str.append(name).append("(").append(column.getColumnPath().toDotString()).append(", "); | ||
| } else { | ||
| str.append(name).append("(").append(column.getColumnId()).append(", "); | ||
| } | ||
| int iter = 0; | ||
| for (T value : values) { | ||
| if (iter >= 100) break; | ||
|
|
@@ -464,7 +512,11 @@ public static final class UserDefinedByClass<T extends Comparable<T>, U extends | |
| super(column); | ||
| this.udpClass = Objects.requireNonNull(udpClass, "udpClass cannot be null"); | ||
| String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")"; | ||
| if (column.getColumnPath() != null) { | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")"; | ||
| } else { | ||
| this.toString = name + "(" + column.getColumnId() + ", " + udpClass.getName() + ")"; | ||
| } | ||
|
|
||
| // defensively try to instantiate the class early to make sure that it's possible | ||
| getUserDefinedPredicate(); | ||
|
|
@@ -518,7 +570,11 @@ public static final class UserDefinedByInstance<T extends Comparable<T>, U exten | |
| super(column); | ||
| this.udpInstance = Objects.requireNonNull(udpInstance, "udpInstance cannot be null"); | ||
| String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")"; | ||
| if (column.getColumnPath() != null) { | ||
| this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")"; | ||
| } else { | ||
| this.toString = name + "(" + column.getColumnId() + ", " + udpInstance + ")"; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,10 @@ | |
| */ | ||
| package org.apache.parquet.filter2.predicate; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
|
|
@@ -41,6 +44,7 @@ | |
| import org.apache.parquet.filter2.predicate.Operators.UserDefined; | ||
| import org.apache.parquet.hadoop.metadata.ColumnPath; | ||
| import org.apache.parquet.schema.MessageType; | ||
| import org.apache.parquet.schema.Type; | ||
|
|
||
| /** | ||
| * Inspects the column types found in the provided {@link FilterPredicate} and compares them | ||
|
|
@@ -170,6 +174,24 @@ private <T extends Comparable<T>> void validateColumnFilterPredicate(SetColumnFi | |
|
|
||
| private <T extends Comparable<T>> void validateColumn(Column<T> column) { | ||
| ColumnPath path = column.getColumnPath(); | ||
| HashSet<Type.ID> ids = new HashSet<>(); | ||
|
||
| if (path == null) { | ||
| Type.ID id = column.getColumnId(); | ||
| List<ColumnDescriptor> columnDescriptors = new ArrayList<>(columnsAccordingToSchema.values()); | ||
| for (ColumnDescriptor columnDescriptor : columnDescriptors) { | ||
| Type.ID columnId = columnDescriptor.getId(); | ||
| if (columnId != null) { | ||
| if (ids.contains(columnId)) { | ||
| throw new RuntimeException("duplicate id"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I doubt that this is the right place to catch duplicate column IDs. Also, I think it should probably throw an exception more specific than |
||
| } | ||
| ids.add(columnId); | ||
| if (columnId.intValue() == id.intValue()) { | ||
| column.setColumnPath(ColumnPath.get(columnDescriptor.getPath())); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I appreciate not wanting to change the validation logic, but making the filter I think this should validate directly using IDs instead. |
||
| path = column.getColumnPath(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Class<?> alreadySeen = columnTypesEncountered.get(path); | ||
| if (alreadySeen != null && !alreadySeen.equals(column.getColumnType())) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this correct? I would expect to deprecate the old one.