From 7caa4dc767a3f8a50b4044a3e7e59eb88f3847b3 Mon Sep 17 00:00:00 2001 From: Yash Datta Date: Mon, 2 Feb 2015 12:23:49 +0530 Subject: [PATCH] PARQUET-116: Add ConfiguredUserDefined that takes a serialiazble udp directly --- .../parquet/filter2/predicate/FilterApi.java | 13 +- .../filter2/predicate/FilterPredicate.java | 9 +- .../predicate/LogicalInverseRewriter.java | 16 ++- .../filter2/predicate/LogicalInverter.java | 18 ++- .../parquet/filter2/predicate/Operators.java | 115 ++++++++++++++++-- .../SchemaCompatibilityValidator.java | 17 ++- .../predicate/UserDefinedPredicate.java | 20 +-- .../parquet/filter2/predicate/DummyUdp.java | 2 +- .../predicate/TestFilterApiMethods.java | 4 +- .../predicate/TestLogicalInverseRewriter.java | 9 +- .../predicate/TestLogicalInverter.java | 13 +- .../TestSchemaCompatibilityValidator.java | 9 +- ...ntallyUpdatedFilterPredicateGenerator.java | 25 +++- .../statisticslevel/StatisticsFilter.java | 32 ++++- .../recordlevel/TestRecordLevelFilters.java | 16 ++- .../statisticslevel/TestStatisticsFilter.java | 6 +- .../main/scala/parquet/filter2/dsl/Dsl.scala | 25 ++-- .../scala/parquet/filter2/dsl/DslTest.scala | 9 +- 18 files changed, 264 insertions(+), 94 deletions(-) diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java index 1ba7f8321c..56249014db 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java @@ -7,6 +7,7 @@ import parquet.filter2.predicate.Operators.BinaryColumn; import parquet.filter2.predicate.Operators.BooleanColumn; import parquet.filter2.predicate.Operators.Column; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.filter2.predicate.Operators.DoubleColumn; import parquet.filter2.predicate.Operators.Eq; import parquet.filter2.predicate.Operators.FloatColumn; @@ -147,11 +148,17 @@ public 
static , C extends Column & SupportsLtGt> GtEq /** * Keeps records that pass the provided {@link UserDefinedPredicate} */ - public static , U extends UserDefinedPredicate, S extends Serializable> - UserDefined userDefined(Column column, Class clazz, S o) { - return new UserDefined(column, clazz, o); + public static , U extends UserDefinedPredicate> + UserDefined userDefined(Column column, Class clazz) { + return new UserDefined(column, clazz); } + public static , U extends UserDefinedPredicate & Serializable> + ConfiguredUserDefined userDefined(Column column, U udp) { + return new ConfiguredUserDefined (column, udp); + } + + /** * Constructs the logical and of two predicates. Records will be kept if both the left and right predicate agree * that the record should be kept. diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java index 57041c167d..2b2a468112 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java @@ -3,17 +3,18 @@ import java.io.Serializable; import parquet.filter2.predicate.Operators.And; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.filter2.predicate.Operators.Eq; import parquet.filter2.predicate.Operators.Gt; import parquet.filter2.predicate.Operators.GtEq; import parquet.filter2.predicate.Operators.LogicalNotUserDefined; +import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined; import parquet.filter2.predicate.Operators.Lt; import parquet.filter2.predicate.Operators.LtEq; import parquet.filter2.predicate.Operators.Not; import parquet.filter2.predicate.Operators.NotEq; import parquet.filter2.predicate.Operators.Or; import parquet.filter2.predicate.Operators.UserDefined; - /** * A FilterPredicate is an expression tree describing the criteria for which records to keep when 
loading data from * a parquet file. These predicates are applied in multiple places. Currently, they are applied to all row groups at @@ -49,8 +50,10 @@ public static interface Visitor { R visit(And and); R visit(Or or); R visit(Not not); - , U extends UserDefinedPredicate, S extends Serializable> R visit(UserDefined udp); - , U extends UserDefinedPredicate, S extends Serializable> R visit(LogicalNotUserDefined udp); + , U extends UserDefinedPredicate > R visit(UserDefined udp); + , U extends UserDefinedPredicate & Serializable > R visit(ConfiguredUserDefined udp); + , U extends UserDefinedPredicate > R visit(LogicalNotUserDefined udp); + , U extends UserDefinedPredicate & Serializable > R visit(LogicalNotConfiguredUserDefined udp); } } diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java index 5c9d67ad4d..aaee078aec 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java @@ -4,10 +4,12 @@ import parquet.filter2.predicate.FilterPredicate.Visitor; import parquet.filter2.predicate.Operators.And; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.filter2.predicate.Operators.Eq; import parquet.filter2.predicate.Operators.Gt; import parquet.filter2.predicate.Operators.GtEq; import parquet.filter2.predicate.Operators.LogicalNotUserDefined; +import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined; import parquet.filter2.predicate.Operators.Lt; import parquet.filter2.predicate.Operators.LtEq; import parquet.filter2.predicate.Operators.Not; @@ -86,12 +88,22 @@ public FilterPredicate visit(Not not) { } @Override - public , U extends UserDefinedPredicate, S extends Serializable> FilterPredicate visit(UserDefined udp) { + public , U extends UserDefinedPredicate > 
FilterPredicate visit(UserDefined udp) { return udp; } @Override - public , U extends UserDefinedPredicate, S extends Serializable> FilterPredicate visit(LogicalNotUserDefined udp) { + public , U extends UserDefinedPredicate & Serializable> FilterPredicate visit(ConfiguredUserDefined udp) { + return udp; + } + + @Override + public , U extends UserDefinedPredicate > FilterPredicate visit(LogicalNotUserDefined udp) { + return udp; + } + + @Override + public , U extends UserDefinedPredicate & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined udp) { return udp; } } diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java index a4016aff44..87e49a4194 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java @@ -4,10 +4,12 @@ import parquet.filter2.predicate.FilterPredicate.Visitor; import parquet.filter2.predicate.Operators.And; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.filter2.predicate.Operators.Eq; import parquet.filter2.predicate.Operators.Gt; import parquet.filter2.predicate.Operators.GtEq; import parquet.filter2.predicate.Operators.LogicalNotUserDefined; +import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined; import parquet.filter2.predicate.Operators.Lt; import parquet.filter2.predicate.Operators.LtEq; import parquet.filter2.predicate.Operators.Not; @@ -81,12 +83,22 @@ public FilterPredicate visit(Not not) { } @Override - public , U extends UserDefinedPredicate, S extends Serializable> FilterPredicate visit(UserDefined udp) { - return new LogicalNotUserDefined(udp); + public , U extends UserDefinedPredicate > FilterPredicate visit(UserDefined udp) { + return new LogicalNotUserDefined(udp); } @Override - public , U extends UserDefinedPredicate, S extends Serializable> 
FilterPredicate visit(LogicalNotUserDefined udp) { + public , U extends UserDefinedPredicate & Serializable> FilterPredicate visit(ConfiguredUserDefined udp) { + return new LogicalNotConfiguredUserDefined(udp); + } + + @Override + public , U extends UserDefinedPredicate > FilterPredicate visit(LogicalNotUserDefined udp) { + return udp.getUserDefined(); + } + + @Override + public , U extends UserDefinedPredicate & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined udp) { return udp.getUserDefined(); } } diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java index 740c6ea48c..00894318c2 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java @@ -340,20 +340,19 @@ public int hashCode() { } } - public static final class UserDefined, U extends UserDefinedPredicate, S extends Serializable> implements FilterPredicate, Serializable { + public static final class UserDefined, U extends UserDefinedPredicate > implements FilterPredicate, Serializable { private final Column column; private final Class udpClass; private final String toString; - private final S udpConfig; + private static final String INSTANTIATION_ERROR_MESSAGE = "Could not instantiate custom filter: %s. 
User defined predicates must be static classes with a default constructor."; - UserDefined(Column column, Class udpClass, S udpConfigParam) { + UserDefined(Column column, Class udpClass) { this.column = checkNotNull(column, "column"); this.udpClass = checkNotNull(udpClass, "udpClass"); String name = getClass().getSimpleName().toLowerCase(); this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")"; - this.udpConfig = udpConfigParam; // defensively try to instantiate the class early to make sure that it's possible getUserDefinedPredicate(); @@ -369,9 +368,7 @@ public Class getUserDefinedPredicateClass() { public U getUserDefinedPredicate() { try { - U udpInstance = udpClass.newInstance(); - udpInstance.configure(udpConfig); - return udpInstance; + return udpClass.newInstance(); } catch (InstantiationException e) { throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e); } catch (IllegalAccessException e) { @@ -411,18 +408,70 @@ public int hashCode() { } } + public static final class ConfiguredUserDefined, U extends UserDefinedPredicate & Serializable > implements FilterPredicate, Serializable { + private final Column column; + private final U udp; + private final String toString; + + ConfiguredUserDefined(Column column, U udp) { + this.column = checkNotNull(column, "column"); + this.udp = checkNotNull(udp, "udp"); + String name = getClass().getSimpleName().toLowerCase(); + this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udp.getClass().getName() + ")"; + } + + public Column getColumn() { + return column; + } + + public U getUserDefinedPredicate() { + return udp; + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + + @Override + public String toString() { + return toString; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConfiguredUserDefined that =
(ConfiguredUserDefined) o; + + if (!column.equals(that.column)) return false; + if (!udp.equals(that.udp)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = column.hashCode(); + result = 31 * result + udp.hashCode(); + result = result * 31 + getClass().hashCode(); + return result; + } + } + // Represents the inverse of a UserDefined. It is equivalent to not(userDefined), without the use // of the not() operator - public static final class LogicalNotUserDefined , U extends UserDefinedPredicate, S extends Serializable> implements FilterPredicate, Serializable { - private final UserDefined udp; + public static final class LogicalNotUserDefined , U extends UserDefinedPredicate > implements FilterPredicate, Serializable { + private final UserDefined udp; private final String toString; - LogicalNotUserDefined(UserDefined userDefined) { + LogicalNotUserDefined(UserDefined userDefined) { this.udp = checkNotNull(userDefined, "userDefined"); this.toString = "inverted(" + udp + ")"; } - public UserDefined getUserDefined() { + public UserDefined getUserDefined() { return udp; } @@ -456,4 +505,48 @@ public int hashCode() { } } + // Represents the inverse of a ConfiguredUserDefined. 
It is equivalent to not(userDefined), without the use + // of the not() operator + public static final class LogicalNotConfiguredUserDefined , U extends UserDefinedPredicate & Serializable > implements FilterPredicate, Serializable { + private final ConfiguredUserDefined udp; + private final String toString; + + LogicalNotConfiguredUserDefined(ConfiguredUserDefined configuredUserDefined) { + this.udp = checkNotNull(configuredUserDefined, "configuredUserDefined"); + this.toString = "inverted(" + udp + ")"; + } + + public ConfiguredUserDefined getUserDefined() { + return udp; + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + + @Override + public String toString() { + return toString; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LogicalNotConfiguredUserDefined that = (LogicalNotConfiguredUserDefined) o; + + if (!udp.equals(that.udp)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = udp.hashCode(); + result = result * 31 + getClass().hashCode(); + return result; + } + } } diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java index d4d04623aa..4f7a95c750 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -13,12 +13,14 @@ import parquet.filter2.predicate.Operators.Gt; import parquet.filter2.predicate.Operators.GtEq; import parquet.filter2.predicate.Operators.LogicalNotUserDefined; +import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined; import parquet.filter2.predicate.Operators.Lt; import parquet.filter2.predicate.Operators.LtEq; import parquet.filter2.predicate.Operators.Not; import
parquet.filter2.predicate.Operators.NotEq; import parquet.filter2.predicate.Operators.Or; import parquet.filter2.predicate.Operators.UserDefined; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.schema.MessageType; import parquet.schema.OriginalType; @@ -129,13 +131,24 @@ public Void visit(Not not) { } @Override - public , U extends UserDefinedPredicate, S extends Serializable> Void visit(UserDefined udp) { + public , U extends UserDefinedPredicate > Void visit(UserDefined udp) { validateColumn(udp.getColumn()); return null; } @Override - public , U extends UserDefinedPredicate, S extends Serializable> Void visit(LogicalNotUserDefined udp) { + public , U extends UserDefinedPredicate & Serializable> Void visit(ConfiguredUserDefined udp) { + validateColumn(udp.getColumn()); + return null; + } + + @Override + public , U extends UserDefinedPredicate > Void visit(LogicalNotUserDefined udp) { + return udp.getUserDefined().accept(this); + } + + @Override + public , U extends UserDefinedPredicate & Serializable> Void visit(LogicalNotConfiguredUserDefined udp) { return udp.getUserDefined().accept(this); } diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java b/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java index 552363a6ff..8be600c18e 100644 --- a/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java +++ b/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java @@ -1,7 +1,5 @@ package parquet.filter2.predicate; -import java.io.Serializable; - /** * A UserDefinedPredicate decides whether a record should be kept or dropped, first by * inspecting meta data about a group of records to see if the entire group can be dropped, @@ -12,12 +10,7 @@ */ // TODO: consider avoiding autoboxing and adding the specialized methods for each type // TODO: downside is that's fairly unwieldy for users -public abstract class 
UserDefinedPredicate, S extends Serializable> { - - /* - * An object that can be used for filtering in the keep method - */ - protected S udpConfig; +public abstract class UserDefinedPredicate > { /** * A udp must have a default constructor. * The udp passed to {@link FilterApi} will not be serialized along with its state. @@ -26,17 +19,8 @@ public abstract class UserDefinedPredicate, S extends Se */ public UserDefinedPredicate() { } - /* - * This method is used to set the object that is used in the keep method for filtering. - * Called before returning the new instance of this class. - */ - public void configure(S udpConfigParam) { - this.udpConfig = udpConfigParam; - } - /** * Return true to keep the record with this value, false to drop it. - * o is a filter object that can be used for filtering the value. */ public abstract boolean keep(T value); @@ -102,4 +86,4 @@ public void configure(S udpConfigParam) { * } */ public abstract boolean inverseCanDrop(Statistics statistics); -} +} \ No newline at end of file diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java index f75e315f12..b05cb98116 100644 --- a/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java +++ b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java @@ -2,7 +2,7 @@ import java.io.Serializable; -public class DummyUdp extends UserDefinedPredicate { +public class DummyUdp extends UserDefinedPredicate { @Override public boolean keep(Integer value) { diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java index 7c2a5451d7..dafd7fd1f1 100644 --- a/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java +++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java @@ -79,7 +79,7 @@ public void testToString() 
{ @Test public void testUdp() { - FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class, null)); + FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class)); assertTrue(predicate instanceof Or); FilterPredicate ud = ((Or) predicate).getRight(); assertTrue(ud instanceof UserDefined); @@ -90,7 +90,7 @@ public void testUdp() { @Test public void testSerializable() throws Exception { BinaryColumn binary = binaryColumn("foo"); - FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class, null), predicate), eq(binary, Binary.fromString("hi"))); + FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class), predicate), eq(binary, Binary.fromString("hi"))); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(p); diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java index f23d420d69..0aa360b5ef 100644 --- a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java +++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java @@ -1,6 +1,5 @@ package parquet.filter2.predicate; -import java.io.Serializable; import org.junit.Test; import parquet.filter2.predicate.Operators.DoubleColumn; @@ -33,7 +32,7 @@ public class TestLogicalInverseRewriter { or(ltEq(doubleColumn, 12.0), and( not(or(eq(intColumn, 7), notEq(intColumn, 17))), - userDefined(intColumn, DummyUdp.class, null)))), + userDefined(intColumn, DummyUdp.class)))), or(gt(doubleColumn, 100.0), not(gtEq(intColumn, 77)))); private static final FilterPredicate complexCollapsed = @@ -41,7 +40,7 @@ public class TestLogicalInverseRewriter { and(gt(doubleColumn, 12.0), or( or(eq(intColumn, 7), notEq(intColumn, 17)), - new LogicalNotUserDefined(userDefined(intColumn, 
DummyUdp.class, null)))), + new LogicalNotUserDefined(userDefined(intColumn, DummyUdp.class)))), or(gt(doubleColumn, 100.0), lt(intColumn, 77))); private static void assertNoOp(FilterPredicate p) { @@ -50,7 +49,7 @@ private static void assertNoOp(FilterPredicate p) { @Test public void testBaseCases() { - UserDefined ud = userDefined(intColumn, DummyUdp.class, null); + UserDefined ud = userDefined(intColumn, DummyUdp.class); assertNoOp(eq(intColumn, 17)); assertNoOp(notEq(intColumn, 17)); @@ -68,7 +67,7 @@ public void testBaseCases() { assertEquals(gt(intColumn, 17), rewrite(not(ltEq(intColumn, 17)))); assertEquals(ltEq(intColumn, 17), rewrite(not(gt(intColumn, 17)))); assertEquals(lt(intColumn, 17), rewrite(not(gtEq(intColumn, 17)))); - assertEquals(new LogicalNotUserDefined(ud), rewrite(not(ud))); + assertEquals(new LogicalNotUserDefined(ud), rewrite(not(ud))); FilterPredicate notedAnd = not(and(eq(intColumn, 17), eq(doubleColumn, 12.0))); FilterPredicate distributedAnd = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0)); diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java index 19796effb5..19e6b68190 100644 --- a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java +++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java @@ -1,6 +1,5 @@ package parquet.filter2.predicate; -import java.io.Serializable; import org.junit.Test; import parquet.filter2.predicate.Operators.DoubleColumn; @@ -27,14 +26,14 @@ public class TestLogicalInverter { private static final IntColumn intColumn = intColumn("a.b.c"); private static final DoubleColumn doubleColumn = doubleColumn("a.b.c"); - private static final UserDefined ud = userDefined(intColumn, DummyUdp.class, null); + private static final UserDefined ud = userDefined(intColumn, DummyUdp.class); private static final FilterPredicate complex = and( 
or(ltEq(doubleColumn, 12.0), and( not(or(eq(intColumn, 7), notEq(intColumn, 17))), - userDefined(intColumn, DummyUdp.class, null))), + userDefined(intColumn, DummyUdp.class))), or(gt(doubleColumn, 100.0), notEq(intColumn, 77))); private static final FilterPredicate complexInverse = @@ -42,7 +41,7 @@ public class TestLogicalInverter { and(gt(doubleColumn, 12.0), or( or(eq(intColumn, 7), notEq(intColumn, 17)), - new LogicalNotUserDefined(userDefined(intColumn, DummyUdp.class, null)))), + new LogicalNotUserDefined(userDefined(intColumn, DummyUdp.class)))), and(ltEq(doubleColumn, 100.0), eq(intColumn, 77))); @Test @@ -64,10 +63,10 @@ public void testBaseCases() { assertEquals(eq(intColumn, 17), invert(not(eq(intColumn, 17)))); - UserDefined ud = userDefined(intColumn, DummyUdp.class, null); - assertEquals(new LogicalNotUserDefined(ud), invert(ud)); + UserDefined ud = userDefined(intColumn, DummyUdp.class); + assertEquals(new LogicalNotUserDefined(ud), invert(ud)); assertEquals(ud, invert(not(ud))); - assertEquals(ud, invert(new LogicalNotUserDefined(ud))); + assertEquals(ud, invert(new LogicalNotUserDefined(ud))); } @Test diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java index b77bea45fe..d01bef2850 100644 --- a/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java +++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java @@ -3,7 +3,6 @@ import java.io.Serializable; import org.junit.Test; -import java.io.Serializable; import parquet.filter2.predicate.Operators.BinaryColumn; import parquet.filter2.predicate.Operators.IntColumn; import parquet.filter2.predicate.Operators.LongColumn; @@ -48,10 +47,10 @@ public class TestSchemaCompatibilityValidator { or(ltEq(stringC, Binary.fromString("foo")), and( not(or(eq(intBar, 17), notEq(intBar, 17))), - 
userDefined(intBar, DummyUdp.class, null))), + userDefined(intBar, DummyUdp.class))), or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz")))); - static class LongDummyUdp extends UserDefinedPredicate { + static class LongDummyUdp extends UserDefinedPredicate { @Override public boolean keep(Long value) { return false; @@ -73,7 +72,7 @@ public boolean inverseCanDrop(Statistics statistics) { or(ltEq(stringC, Binary.fromString("foo")), and( not(or(eq(longBar, 17L), notEq(longBar, 17L))), - userDefined(longBar, LongDummyUdp.class, null))), + userDefined(longBar, LongDummyUdp.class))), or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz")))); private static final FilterPredicate complexMixedType = @@ -81,7 +80,7 @@ public boolean inverseCanDrop(Statistics statistics) { or(ltEq(stringC, Binary.fromString("foo")), and( not(or(eq(intBar, 17), notEq(longBar, 17L))), - userDefined(longBar, LongDummyUdp.class, null))), + userDefined(longBar, LongDummyUdp.class))), or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz")))); @Test diff --git a/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index 1d96139b2b..84fa5f38ee 100644 --- a/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -52,10 +52,12 @@ public void run() throws IOException { "\n" + "import java.io.Serializable;\n" + "import parquet.common.schema.ColumnPath;\n" + + "import parquet.filter2.predicate.Operators.ConfiguredUserDefined;\n" + "import parquet.filter2.predicate.Operators.Eq;\n" + "import parquet.filter2.predicate.Operators.Gt;\n" + "import parquet.filter2.predicate.Operators.GtEq;\n" + "import 
parquet.filter2.predicate.Operators.LogicalNotUserDefined;\n" + + "import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;\n" + "import parquet.filter2.predicate.Operators.Lt;\n" + "import parquet.filter2.predicate.Operators.LtEq;\n" + "import parquet.filter2.predicate.Operators.NotEq;\n" + @@ -108,7 +110,7 @@ public void run() throws IOException { addVisitEnd(); add(" @Override\n" + - " public , U extends UserDefinedPredicate, S extends Serializable> IncrementallyUpdatedFilterPredicate visit(UserDefined pred) {\n"); + " public , U extends UserDefinedPredicate > IncrementallyUpdatedFilterPredicate visit(UserDefined pred) {\n"); addUdpBegin(); for (TypeInfo info : TYPES) { addUdpCase(info, false); @@ -116,14 +118,31 @@ public void run() throws IOException { addVisitEnd(); add(" @Override\n" + - " public , U extends UserDefinedPredicate, S extends Serializable> IncrementallyUpdatedFilterPredicate visit(LogicalNotUserDefined notPred) {\n" + - " UserDefined pred = notPred.getUserDefined();\n"); + " public , U extends UserDefinedPredicate & Serializable> IncrementallyUpdatedFilterPredicate visit(ConfiguredUserDefined pred) {\n"); + addUdpBegin(); + for (TypeInfo info : TYPES) { + addUdpCase(info, false); + } + addVisitEnd(); + + add(" @Override\n" + + " public , U extends UserDefinedPredicate > IncrementallyUpdatedFilterPredicate visit(LogicalNotUserDefined notPred) {\n" + + " UserDefined pred = notPred.getUserDefined();\n"); addUdpBegin(); for (TypeInfo info : TYPES) { addUdpCase(info, true); } addVisitEnd(); + add(" @Override\n" + + " public , U extends UserDefinedPredicate & Serializable> IncrementallyUpdatedFilterPredicate visit(LogicalNotConfiguredUserDefined notPred) {\n" + + " ConfiguredUserDefined pred = notPred.getUserDefined();\n"); + addUdpBegin(); + for (TypeInfo info : TYPES) { + addUdpCase(info, true); + } + addVisitEnd(); + add("}\n"); writer.close(); } diff --git 
a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java index 13be123d49..bb778a0e4e 100644 --- a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -14,12 +14,14 @@ import parquet.filter2.predicate.Operators.Gt; import parquet.filter2.predicate.Operators.GtEq; import parquet.filter2.predicate.Operators.LogicalNotUserDefined; +import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined; import parquet.filter2.predicate.Operators.Lt; import parquet.filter2.predicate.Operators.LtEq; import parquet.filter2.predicate.Operators.Not; import parquet.filter2.predicate.Operators.NotEq; import parquet.filter2.predicate.Operators.Or; import parquet.filter2.predicate.Operators.UserDefined; +import parquet.filter2.predicate.Operators.ConfiguredUserDefined; import parquet.filter2.predicate.UserDefinedPredicate; import parquet.hadoop.metadata.ColumnChunkMetaData; @@ -217,7 +219,7 @@ public Boolean visit(Not not) { "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? 
" + not); } - private , U extends UserDefinedPredicate, S extends Serializable> Boolean visit(UserDefined ud, boolean inverted) { + private , U extends UserDefinedPredicate > Boolean visit(UserDefined ud, boolean inverted) { Column filterColumn = ud.getColumn(); ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath()); U udp = ud.getUserDefinedPredicate(); @@ -232,14 +234,38 @@ private , U extends UserDefinedPredicate, S extend } } + private , U extends UserDefinedPredicate & Serializable> Boolean visit(ConfiguredUserDefined ud, boolean inverted) { + Column filterColumn = ud.getColumn(); + ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath()); + U udp = ud.getUserDefinedPredicate(); + Statistics stats = columnChunk.getStatistics(); + parquet.filter2.predicate.Statistics udpStats = + new parquet.filter2.predicate.Statistics(stats.genericGetMin(), stats.genericGetMax()); + + if (inverted) { + return udp.inverseCanDrop(udpStats); + } else { + return udp.canDrop(udpStats); + } + } + + @Override + public , U extends UserDefinedPredicate > Boolean visit(UserDefined ud) { + return visit(ud, false); + } + @Override - public , U extends UserDefinedPredicate, S extends Serializable> Boolean visit(UserDefined ud) { + public , U extends UserDefinedPredicate & Serializable> Boolean visit(ConfiguredUserDefined ud) { return visit(ud, false); } @Override - public , U extends UserDefinedPredicate, S extends Serializable> Boolean visit(LogicalNotUserDefined lnud) { + public , U extends UserDefinedPredicate > Boolean visit(LogicalNotUserDefined lnud) { return visit(lnud.getUserDefined(), true); } + @Override + public , U extends UserDefinedPredicate & Serializable > Boolean visit(LogicalNotConfiguredUserDefined lnud) { + return visit(lnud.getUserDefined(), true); + } } diff --git a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java 
b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 64bd79a4fd..9e969f3028 100644 --- a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -145,7 +145,7 @@ public boolean keep(User u) { }); } - public static class StartWithP extends UserDefinedPredicate { + public static class StartWithP extends UserDefinedPredicate { @Override public boolean keep(Binary value) { @@ -166,7 +166,13 @@ public boolean inverseCanDrop(Statistics statistics) { } } - public static class SetInFilter extends UserDefinedPredicate> { + public static class SetInFilter extends UserDefinedPredicate implements Serializable { + + HashSet hSet; + + public SetInFilter(HashSet phSet) { + hSet = phSet; + } @Override public boolean keep(Long value) { @@ -174,7 +180,7 @@ public boolean keep(Long value) { return false; } - return udpConfig.contains(value); + return hSet.contains(value); } @Override @@ -192,7 +198,7 @@ public boolean inverseCanDrop(Statistics statistics) { public void testNameNotStartWithP() throws Exception { BinaryColumn name = binaryColumn("name"); - FilterPredicate pred = not(userDefined(name, StartWithP.class, null)); + FilterPredicate pred = not(userDefined(name, StartWithP.class)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); @@ -211,7 +217,7 @@ public void testIdIn() throws Exception { HashSet h = new HashSet() {{ add(20L); add(27L); add(28L); }}; - FilterPredicate pred = userDefined(name, SetInFilter.class, h); + FilterPredicate pred = userDefined(name, new SetInFilter(h)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); diff --git a/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java index b427c631e3..04d2049220 100644 --- 
a/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -220,7 +220,7 @@ public void testOr() { assertFalse(canDrop(or(no, no), columnMetas)); } - public static class SevensAndEightsUdp extends UserDefinedPredicate { + public static class SevensAndEightsUdp extends UserDefinedPredicate { @Override public boolean keep(Integer value) { @@ -240,8 +240,8 @@ public boolean inverseCanDrop(Statistics statistics) { @Test public void testUdp() { - FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class, null); - FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class, null))); + FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class); + FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class))); IntStatistics seven = new IntStatistics(); seven.setMinMax(7, 7); diff --git a/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala b/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala index f906582fce..7e3997758a 100644 --- a/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala +++ b/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala @@ -1,6 +1,5 @@ package parquet.filter2.dsl -import java.io.Serializable import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong} import parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators, UserDefinedPredicate} @@ -27,10 +26,10 @@ import parquet.io.api.Binary */ object Dsl { - private[Dsl] trait Column[T <: Comparable[T], C <: Operators.Column[T], S <: java.io.Serializable] { + private[Dsl] trait Column[T <: Comparable[T], C <: Operators.Column[T]] { val javaColumn: C - def filterBy[U <: UserDefinedPredicate[T, S]](clazz: Class[U], o: S) = FilterApi.userDefined(javaColumn, clazz, o) + def filterBy[U 
<: UserDefinedPredicate[T]](clazz: Class[U]) = FilterApi.userDefined(javaColumn, clazz) // this is not supported because it allows for easy mistakes. For example: // val pred = IntColumn("foo") == "hello" @@ -39,40 +38,40 @@ object Dsl { throw new UnsupportedOperationException("You probably meant to use === or !==") } - case class IntColumn(columnPath: String) extends Column[JInt, Operators.IntColumn, Serializable] { + case class IntColumn(columnPath: String) extends Column[JInt, Operators.IntColumn] { override val javaColumn = FilterApi.intColumn(columnPath) } - case class LongColumn(columnPath: String) extends Column[JLong, Operators.LongColumn, Serializable] { + case class LongColumn(columnPath: String) extends Column[JLong, Operators.LongColumn] { override val javaColumn = FilterApi.longColumn(columnPath) } - case class FloatColumn(columnPath: String) extends Column[JFloat, Operators.FloatColumn, Serializable] { + case class FloatColumn(columnPath: String) extends Column[JFloat, Operators.FloatColumn] { override val javaColumn = FilterApi.floatColumn(columnPath) } - case class DoubleColumn(columnPath: String) extends Column[JDouble, Operators.DoubleColumn, Serializable] { + case class DoubleColumn(columnPath: String) extends Column[JDouble, Operators.DoubleColumn] { override val javaColumn = FilterApi.doubleColumn(columnPath) } - case class BooleanColumn(columnPath: String) extends Column[JBoolean, Operators.BooleanColumn, Serializable] { + case class BooleanColumn(columnPath: String) extends Column[JBoolean, Operators.BooleanColumn] { override val javaColumn = FilterApi.booleanColumn(columnPath) } - case class BinaryColumn(columnPath: String) extends Column[Binary, Operators.BinaryColumn, Serializable] { + case class BinaryColumn(columnPath: String) extends Column[Binary, Operators.BinaryColumn] { override val javaColumn = FilterApi.binaryColumn(columnPath) } - implicit def enrichEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with 
Operators.SupportsEqNotEq, S <: Serializable](column: Column[T, C, S]): SupportsEqNotEq[T,C, S] = new SupportsEqNotEq(column) + implicit def enrichEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq](column: Column[T, C]): SupportsEqNotEq[T,C] = new SupportsEqNotEq(column) - class SupportsEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq, S <: Serializable](val column: Column[T, C, S]) { + class SupportsEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq](val column: Column[T, C]) { def ===(v: T) = FilterApi.eq(column.javaColumn, v) def !== (v: T) = FilterApi.notEq(column.javaColumn, v) } - implicit def enrichLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt, S <: Serializable](column: Column[T, C, S]): SupportsLtGt[T,C, S] = new SupportsLtGt(column) + implicit def enrichLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt](column: Column[T, C]): SupportsLtGt[T,C] = new SupportsLtGt(column) - class SupportsLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt, S <: Serializable](val column: Column[T, C, S]) { + class SupportsLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt](val column: Column[T, C]) { def >(v: T) = FilterApi.gt(column.javaColumn, v) def >=(v: T) = FilterApi.gtEq(column.javaColumn, v) def <(v: T) = FilterApi.lt(column.javaColumn, v) diff --git a/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala b/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala index 5fe2010a67..23aa5377f2 100644 --- a/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala +++ b/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala @@ -1,7 +1,6 @@ package parquet.filter2.dsl import java.lang.{Double => JDouble, Integer => JInt} -import java.io.Serializable; import org.junit.runner.RunWith import org.scalatest.FlatSpec @@ -9,7 +8,7 @@ 
import org.scalatest.junit.JUnitRunner import parquet.filter2.predicate.Operators.{Or, UserDefined, DoubleColumn => JDoubleColumn, IntColumn => JIntColumn} import parquet.filter2.predicate.{FilterApi, Statistics, UserDefinedPredicate} -class DummyFilter extends UserDefinedPredicate[JInt, java.io.Serializable] { +class DummyFilter extends UserDefinedPredicate[JInt] { override def keep(value: JInt): Boolean = false override def canDrop(statistics: Statistics[JInt]): Boolean = false @@ -38,11 +37,11 @@ class DslTest extends FlatSpec{ "user defined predicates" should "be correctly constructed" in { val abc = IntColumn("a.b.c") - val pred = (abc > 10) || abc.filterBy(classOf[DummyFilter], null) + val pred = (abc > 10) || abc.filterBy(classOf[DummyFilter]) - val expected = FilterApi.or(FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10), FilterApi.userDefined[JInt, DummyFilter, java.io.Serializable](abc.javaColumn, classOf[DummyFilter], null)) + val expected = FilterApi.or(FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10), FilterApi.userDefined(abc.javaColumn, classOf[DummyFilter])) assert(pred === expected) - val intUserDefined = pred.asInstanceOf[Or].getRight.asInstanceOf[UserDefined[JInt, DummyFilter, java.io.Serializable]] + val intUserDefined = pred.asInstanceOf[Or].getRight.asInstanceOf[UserDefined[JInt, DummyFilter]] assert(intUserDefined.getUserDefinedPredicateClass === classOf[DummyFilter]) assert(intUserDefined.getUserDefinedPredicate.isInstanceOf[DummyFilter])