Skip to content

Commit

Permalink
PARQUET-116: Add ConfiguredUserDefined that takes a serialiazble udp …
Browse files Browse the repository at this point in the history
…directly
  • Loading branch information
Yash Datta authored and Yash Datta committed Feb 2, 2015
1 parent 0eaabf4 commit 7caa4dc
Show file tree
Hide file tree
Showing 18 changed files with 264 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import parquet.filter2.predicate.Operators.BinaryColumn;
import parquet.filter2.predicate.Operators.BooleanColumn;
import parquet.filter2.predicate.Operators.Column;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.DoubleColumn;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.FloatColumn;
Expand Down Expand Up @@ -147,11 +148,17 @@ public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq
/**
* Keeps records that pass the provided {@link UserDefinedPredicate}
*/
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable>
UserDefined<T, U, S> userDefined(Column<T> column, Class<U> clazz, S o) {
return new UserDefined<T, U, S>(column, clazz, o);
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
return new UserDefined<T, U>(column, clazz);
}

public static <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
ConfiguredUserDefined<T, U> userDefined(Column<T> column, U udp) {
return new ConfiguredUserDefined<T, U> (column, udp);
}


/**
* Constructs the logical and of two predicates. Records will be kept if both the left and right predicate agree
* that the record should be kept.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
import java.io.Serializable;

import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
import parquet.filter2.predicate.Operators.NotEq;
import parquet.filter2.predicate.Operators.Or;
import parquet.filter2.predicate.Operators.UserDefined;

/**
* A FilterPredicate is an expression tree describing the criteria for which records to keep when loading data from
* a parquet file. These predicates are applied in multiple places. Currently, they are applied to all row groups at
Expand Down Expand Up @@ -49,8 +50,10 @@ public static interface Visitor<R> {
R visit(And and);
R visit(Or or);
R visit(Not not);
<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> R visit(UserDefined<T, U, S> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> R visit(LogicalNotUserDefined<T, U, S> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> > R visit(UserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > R visit(ConfiguredUserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> > R visit(LogicalNotUserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > R visit(LogicalNotConfiguredUserDefined<T, U> udp);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import parquet.filter2.predicate.FilterPredicate.Visitor;
import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
Expand Down Expand Up @@ -86,12 +88,22 @@ public FilterPredicate visit(Not not) {
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(UserDefined<T, U, S> udp) {
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(UserDefined<T, U> udp) {
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(LogicalNotUserDefined<T, U, S> udp) {
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import parquet.filter2.predicate.FilterPredicate.Visitor;
import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
Expand Down Expand Up @@ -81,12 +83,22 @@ public FilterPredicate visit(Not not) {
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(UserDefined<T, U, S> udp) {
return new LogicalNotUserDefined<T, U, S>(udp);
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(UserDefined<T, U> udp) {
return new LogicalNotUserDefined<T, U>(udp);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(LogicalNotUserDefined<T, U, S> udp) {
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
return new LogicalNotConfiguredUserDefined<T, U>(udp);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
return udp.getUserDefined();
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp.getUserDefined();
}
}
115 changes: 104 additions & 11 deletions parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,20 +340,19 @@ public int hashCode() {
}
}

public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> implements FilterPredicate, Serializable {
public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> > implements FilterPredicate, Serializable {
private final Column<T> column;
private final Class<U> udpClass;
private final String toString;
private final S udpConfig;

private static final String INSTANTIATION_ERROR_MESSAGE =
"Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";

UserDefined(Column<T> column, Class<U> udpClass, S udpConfigParam) {
UserDefined(Column<T> column, Class<U> udpClass) {
this.column = checkNotNull(column, "column");
this.udpClass = checkNotNull(udpClass, "udpClass");
String name = getClass().getSimpleName().toLowerCase();
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
this.udpConfig = udpConfigParam;

// defensively try to instantiate the class early to make sure that it's possible
getUserDefinedPredicate();
Expand All @@ -369,9 +368,7 @@ public Class<U> getUserDefinedPredicateClass() {

public U getUserDefinedPredicate() {
try {
U udpInstance = udpClass.newInstance();
udpInstance.configure(udpConfig);
return udpInstance;
return udpClass.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
} catch (IllegalAccessException e) {
Expand Down Expand Up @@ -411,18 +408,70 @@ public int hashCode() {
}
}

public static final class ConfiguredUserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate {
private final Column<T> column;
private final U udp;
private final String toString;

ConfiguredUserDefined(Column<T> column, U udp) {
this.column = checkNotNull(column, "column");
this.udp = checkNotNull(udp, "udp");
String name = getClass().getSimpleName().toLowerCase();
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udp.getClass().getName() + ")";
}

public Column<T> getColumn() {
return column;
}

public U getUserDefinedPredicate() {
return udp;
}

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
return toString;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ConfiguredUserDefined that = (ConfiguredUserDefined) o;

if (!column.equals(that.column)) return false;
if (!udp.equals(that.udp)) return false;

return true;
}

@Override
public int hashCode() {
int result = column.hashCode();
result = 31 * result + udp.hashCode();
result = result * 31 + getClass().hashCode();
return result;
}
}

// Represents the inverse of a UserDefined. It is equivalent to not(userDefined), without the use
// of the not() operator
public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> implements FilterPredicate, Serializable {
private final UserDefined<T, U, S> udp;
public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T> > implements FilterPredicate, Serializable {
private final UserDefined<T, U> udp;
private final String toString;

LogicalNotUserDefined(UserDefined<T, U, S> userDefined) {
LogicalNotUserDefined(UserDefined<T, U> userDefined) {
this.udp = checkNotNull(userDefined, "userDefined");
this.toString = "inverted(" + udp + ")";
}

public UserDefined<T, U, S> getUserDefined() {
public UserDefined<T, U> getUserDefined() {
return udp;
}

Expand Down Expand Up @@ -456,4 +505,48 @@ public int hashCode() {
}
}

// Represents the inverse of a ConfiguredUserDefined. It is equivalent to not(userDefined), without the use
// of the not() operator
public static final class LogicalNotConfiguredUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate, Serializable {
private final ConfiguredUserDefined<T, U> udp;
private final String toString;

LogicalNotConfiguredUserDefined(ConfiguredUserDefined<T, U> configuredUserDefined) {
this.udp = checkNotNull(configuredUserDefined, "configuredUserDefined");
this.toString = "inverted(" + udp + ")";
}

public ConfiguredUserDefined<T, U> getUserDefined() {
return udp;
}

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
return toString;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

LogicalNotUserDefined that = (LogicalNotUserDefined) o;

if (!udp.equals(that.udp)) return false;

return true;
}

@Override
public int hashCode() {
int result = udp.hashCode();
result = result * 31 + getClass().hashCode();
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
import parquet.filter2.predicate.Operators.NotEq;
import parquet.filter2.predicate.Operators.Or;
import parquet.filter2.predicate.Operators.UserDefined;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;

Expand Down Expand Up @@ -129,13 +131,24 @@ public Void visit(Not not) {
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> Void visit(UserDefined<T, U, S> udp) {
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > Void visit(UserDefined<T, U> udp) {
validateColumn(udp.getColumn());
return null;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> Void visit(LogicalNotUserDefined<T, U, S> udp) {
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(ConfiguredUserDefined<T, U> udp) {
validateColumn(udp.getColumn());
return null;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > Void visit(LogicalNotUserDefined<T, U> udp) {
return udp.getUserDefined().accept(this);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp.getUserDefined().accept(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package parquet.filter2.predicate;

import java.io.Serializable;

/**
* A UserDefinedPredicate decides whether a record should be kept or dropped, first by
* inspecting meta data about a group of records to see if the entire group can be dropped,
Expand All @@ -12,12 +10,7 @@
*/
// TODO: consider avoiding autoboxing and adding the specialized methods for each type
// TODO: downside is that's fairly unwieldy for users
public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Serializable> {

/*
* An object that can be used for filtering in the keep method
*/
protected S udpConfig;
public abstract class UserDefinedPredicate<T extends Comparable<T> > {
/**
* A udp must have a default constructor.
* The udp passed to {@link FilterApi} will not be serialized along with its state.
Expand All @@ -26,17 +19,8 @@ public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Se
*/
public UserDefinedPredicate() { }

/*
* This method is used to set the object that is used in the keep method for filtering.
* Called before returning the new instance of this class.
*/
public void configure(S udpConfigParam) {
this.udpConfig = udpConfigParam;
}

/**
* Return true to keep the record with this value, false to drop it.
* o is a filter object that can be used for filtering the value.
*/
public abstract boolean keep(T value);

Expand Down Expand Up @@ -102,4 +86,4 @@ public void configure(S udpConfigParam) {
* }
*/
public abstract boolean inverseCanDrop(Statistics<T> statistics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.io.Serializable;

public class DummyUdp extends UserDefinedPredicate<Integer, Serializable> {
public class DummyUdp extends UserDefinedPredicate<Integer> {

@Override
public boolean keep(Integer value) {
Expand Down
Loading

0 comments on commit 7caa4dc

Please sign in to comment.