Skip to content

Commit

Permalink
PARQUET-116: Pass a filter object to user defined predicate in filter2 api
Browse files Browse the repository at this point in the history
  • Loading branch information
Yash Datta authored and Yash Datta committed Oct 18, 2014
1 parent be1222e commit 4ab46ec
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq
* Keeps records that pass the provided {@link UserDefinedPredicate}
*/
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
return new UserDefined<T, U>(column, clazz);
UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz, Object o) {
return new UserDefined<T, U>(column, clazz, o);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,16 @@ public static final class UserDefined<T extends Comparable<T>, U extends UserDef
private final Column<T> column;
private final Class<U> udpClass;
private final String toString;
private final Object o;
private static final String INSTANTIATION_ERROR_MESSAGE =
"Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";

UserDefined(Column<T> column, Class<U> udpClass) {
UserDefined(Column<T> column, Class<U> udpClass, Object o) {
this.column = checkNotNull(column, "column");
this.udpClass = checkNotNull(udpClass, "udpClass");
String name = getClass().getSimpleName().toLowerCase();
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
this.o = o;

// defensively try to instantiate the class early to make sure that it's possible
getUserDefinedPredicate();
Expand All @@ -365,6 +367,10 @@ public Class<U> getUserDefinedPredicateClass() {
return udpClass;
}

/**
 * Returns the filter object that was passed to the constructor of this
 * predicate. The constructor stores it without a null check, so the
 * returned value may be null.
 */
public Object getFilterObject() {
return o;
}

public U getUserDefinedPredicate() {
try {
return udpClass.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public UserDefinedPredicate() { }

/**
* Return true to keep the record with this value, false to drop it.
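* {@code o} is the filter object that was supplied when the predicate was created (it may be null); implementations may use it when deciding whether to keep the value.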
*/
public abstract boolean keep(T value);
public abstract boolean keep(T value, Object o);

/**
* Given information about a group of records (eg, the min and max value)
Expand Down Expand Up @@ -87,4 +88,4 @@ public UserDefinedPredicate() { }
* }
*/
public abstract boolean inverseCanDrop(Statistics<T> statistics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public class DummyUdp extends UserDefinedPredicate<Integer> {

@Override
public boolean keep(Integer value) {
public boolean keep(Integer value, Object o) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testToString() {

@Test
public void testUdp() {
FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class));
FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class, null));
assertTrue(predicate instanceof Or);
FilterPredicate ud = ((Or) predicate).getRight();
assertTrue(ud instanceof UserDefined);
Expand All @@ -90,7 +90,7 @@ public void testUdp() {
@Test
public void testSerializable() throws Exception {
BinaryColumn binary = binaryColumn("foo");
FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class), predicate), eq(binary, Binary.fromString("hi")));
FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class, null), predicate), eq(binary, Binary.fromString("hi")));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public class TestLogicalInverseRewriter {
or(ltEq(doubleColumn, 12.0),
and(
not(or(eq(intColumn, 7), notEq(intColumn, 17))),
userDefined(intColumn, DummyUdp.class)))),
userDefined(intColumn, DummyUdp.class, null)))),
or(gt(doubleColumn, 100.0), not(gtEq(intColumn, 77))));

private static final FilterPredicate complexCollapsed =
and(
and(gt(doubleColumn, 12.0),
or(
or(eq(intColumn, 7), notEq(intColumn, 17)),
new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class, null)))),
or(gt(doubleColumn, 100.0), lt(intColumn, 77)));

private static void assertNoOp(FilterPredicate p) {
Expand All @@ -49,7 +49,7 @@ private static void assertNoOp(FilterPredicate p) {

@Test
public void testBaseCases() {
UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class, null);

assertNoOp(eq(intColumn, 17));
assertNoOp(notEq(intColumn, 17));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ public class TestLogicalInverter {
private static final IntColumn intColumn = intColumn("a.b.c");
private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");

private static final UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
private static final UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class, null);

private static final FilterPredicate complex =
and(
or(ltEq(doubleColumn, 12.0),
and(
not(or(eq(intColumn, 7), notEq(intColumn, 17))),
userDefined(intColumn, DummyUdp.class))),
userDefined(intColumn, DummyUdp.class, null))),
or(gt(doubleColumn, 100.0), notEq(intColumn, 77)));

private static final FilterPredicate complexInverse =
or(
and(gt(doubleColumn, 12.0),
or(
or(eq(intColumn, 7), notEq(intColumn, 17)),
new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class, null)))),
and(ltEq(doubleColumn, 100.0), eq(intColumn, 77)));

@Test
Expand All @@ -63,7 +63,7 @@ public void testBaseCases() {

assertEquals(eq(intColumn, 17), invert(not(eq(intColumn, 17))));

UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class, null);
assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), invert(ud));
assertEquals(ud, invert(not(ud)));
assertEquals(ud, invert(new LogicalNotUserDefined<Integer, DummyUdp>(ud)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public class TestSchemaCompatibilityValidator {
or(ltEq(stringC, Binary.fromString("foo")),
and(
not(or(eq(intBar, 17), notEq(intBar, 17))),
userDefined(intBar, DummyUdp.class))),
userDefined(intBar, DummyUdp.class, null))),
or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));

static class LongDummyUdp extends UserDefinedPredicate<Long> {
@Override
public boolean keep(Long value) {
public boolean keep(Long value, Object o) {
return false;
}

Expand All @@ -71,15 +71,15 @@ public boolean inverseCanDrop(Statistics<Long> statistics) {
or(ltEq(stringC, Binary.fromString("foo")),
and(
not(or(eq(longBar, 17L), notEq(longBar, 17L))),
userDefined(longBar, LongDummyUdp.class))),
userDefined(longBar, LongDummyUdp.class, null))),
or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));

private static final FilterPredicate complexMixedType =
and(
or(ltEq(stringC, Binary.fromString("foo")),
and(
not(or(eq(intBar, 17), notEq(longBar, 17L))),
userDefined(longBar, LongDummyUdp.class))),
userDefined(longBar, LongDummyUdp.class, null))),
or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ private void addUdpBegin() throws IOException {
" ValueInspector valueInspector = null;\n" +
"\n" +
" final U udp = pred.getUserDefinedPredicate();\n" +
"\n" +
" final Object o = pred.getFilterObject();\n" +
"\n");
}

Expand All @@ -229,13 +231,13 @@ private void addUdpCase(TypeInfo info, boolean invert)throws IOException {
" valueInspector = new ValueInspector() {\n" +
" @Override\n" +
" public void updateNull() {\n" +
" setResult(" + (invert ? "!" : "") + "udp.keep(null));\n" +
" setResult(" + (invert ? "!" : "") + "udp.keep(null, o));\n" +
" }\n" +
"\n" +
" @SuppressWarnings(\"unchecked\")\n" +
" @Override\n" +
" public void update(" + info.primitiveName + " value) {\n" +
" setResult(" + (invert ? "!" : "") + "udp.keep((T) (Object) value));\n" +
" setResult(" + (invert ? "!" : "") + "udp.keep((T) (Object) value, o));\n" +
" }\n" +
" };\n" +
" }\n\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public boolean keep(User u) {
public static class StartWithP extends UserDefinedPredicate<Binary> {

@Override
public boolean keep(Binary value) {
public boolean keep(Binary value, Object o) {
if (value == null) {
return false;
}
Expand All @@ -165,7 +165,7 @@ public boolean inverseCanDrop(Statistics<Binary> statistics) {
public void testNameNotStartWithP() throws Exception {
BinaryColumn name = binaryColumn("name");

FilterPredicate pred = not(userDefined(name, StartWithP.class));
FilterPredicate pred = not(userDefined(name, StartWithP.class, null));

List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testOr() {
public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {

@Override
public boolean keep(Integer value) {
public boolean keep(Integer value, Object o) {
throw new RuntimeException("this method should not be called");
}

Expand All @@ -239,8 +239,8 @@ public boolean inverseCanDrop(Statistics<Integer> statistics) {

@Test
public void testUdp() {
FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class);
FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class)));
FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class, null);
FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class, null)));

IntStatistics seven = new IntStatistics();
seven.setMinMax(7, 7);
Expand Down
2 changes: 1 addition & 1 deletion parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dsl {
private[Dsl] trait Column[T <: Comparable[T], C <: Operators.Column[T]] {
val javaColumn: C

def filterBy[U <: UserDefinedPredicate[T]](clazz: Class[U]) = FilterApi.userDefined(javaColumn, clazz)
def filterBy[U <: UserDefinedPredicate[T]](clazz: Class[U]) = FilterApi.userDefined(javaColumn, clazz, null)

// this is not supported because it allows for easy mistakes. For example:
// val pred = IntColumn("foo") == "hello"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import parquet.filter2.predicate.Operators.{Or, UserDefined, DoubleColumn => JDo
import parquet.filter2.predicate.{FilterApi, Statistics, UserDefinedPredicate}

class DummyFilter extends UserDefinedPredicate[JInt] {
override def keep(value: JInt): Boolean = false
override def keep(value: JInt, o: Object): Boolean = false

override def canDrop(statistics: Statistics[JInt]): Boolean = false

Expand Down Expand Up @@ -39,7 +39,7 @@ class DslTest extends FlatSpec{
val abc = IntColumn("a.b.c")
val pred = (abc > 10) || abc.filterBy(classOf[DummyFilter])

val expected = FilterApi.or(FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10), FilterApi.userDefined(abc.javaColumn, classOf[DummyFilter]))
val expected = FilterApi.or(FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10), FilterApi.userDefined(abc.javaColumn, classOf[DummyFilter], null))
assert(pred === expected)
val intUserDefined = pred.asInstanceOf[Or].getRight.asInstanceOf[UserDefined[JInt, DummyFilter]]

Expand Down

0 comments on commit 4ab46ec

Please sign in to comment.