Skip to content

Commit

Permalink
Simplify user defined predicates with state
Browse files Browse the repository at this point in the history
  • Loading branch information
isnotinvain committed Feb 4, 2015
1 parent 40d394a commit 25aa716
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import parquet.filter2.predicate.Operators.BinaryColumn;
import parquet.filter2.predicate.Operators.BooleanColumn;
import parquet.filter2.predicate.Operators.Column;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.DoubleColumn;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.FloatColumn;
Expand All @@ -23,6 +22,8 @@
import parquet.filter2.predicate.Operators.SupportsEqNotEq;
import parquet.filter2.predicate.Operators.SupportsLtGt;
import parquet.filter2.predicate.Operators.UserDefined;
import parquet.filter2.predicate.Operators.UserDefinedByClass;
import parquet.filter2.predicate.Operators.UserDefinedByInstance;

/**
* The Filter API is expressed through these static methods.
Expand Down Expand Up @@ -147,18 +148,23 @@ public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq

/**
* Keeps records that pass the provided {@link UserDefinedPredicate}
*
* The provided class must have a default constructor. To use an instance
* of a UserDefinedPredicate instead, see {@link #userDefined(column, udp)} below.
*/
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
return new UserDefined<T, U>(column, clazz);
return new UserDefinedByClass<T, U>(column, clazz);
}

/**
* Similar to above but allows to pass Serializable {@link UserDefinedPredicate}
* Keeps records that pass the provided {@link UserDefinedPredicate}
*
* The provided instance of UserDefinedPredicate must be serializable.
*/
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
ConfiguredUserDefined<T, U> userDefined(Column<T> column, U udp) {
return new ConfiguredUserDefined<T, U> (column, udp);
UserDefined<T, U> userDefined(Column<T> column, U udp) {
return new UserDefinedByInstance<T, U>(column, udp);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package parquet.filter2.predicate;

import java.io.Serializable;

import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
Expand Down Expand Up @@ -52,9 +48,7 @@ public static interface Visitor<R> {
R visit(Or or);
R visit(Not not);
<T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(UserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> R visit(ConfiguredUserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(LogicalNotUserDefined<T, U> udp);
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> R visit(LogicalNotConfiguredUserDefined<T, U> udp);
}

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package parquet.filter2.predicate;

import java.io.Serializable;

import parquet.filter2.predicate.FilterPredicate.Visitor;
import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
Expand Down Expand Up @@ -92,18 +88,8 @@ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredic
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
return udp;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp;
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package parquet.filter2.predicate;

import java.io.Serializable;

import parquet.filter2.predicate.FilterPredicate.Visitor;
import parquet.filter2.predicate.Operators.And;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Eq;
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
Expand Down Expand Up @@ -87,18 +83,8 @@ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredi
return new LogicalNotUserDefined<T, U>(udp);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
return new LogicalNotConfiguredUserDefined<T, U>(udp);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
return udp.getUserDefined();
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp.getUserDefined();
}
}
115 changes: 36 additions & 79 deletions parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,33 @@ public int hashCode() {
}
}

public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
private final Column<T> column;
public static abstract class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
protected final Column<T> column;

UserDefined(Column<T> column) {
this.column = checkNotNull(column, "column");
}

public Column<T> getColumn() {
return column;
}

public abstract U getUserDefinedPredicate();

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}
}

public static final class UserDefinedByClass<T extends Comparable<T>, U extends UserDefinedPredicate<T>> extends UserDefined<T, U> {
private final Class<U> udpClass;
private final String toString;
private static final String INSTANTIATION_ERROR_MESSAGE =
"Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";

UserDefined(Column<T> column, Class<U> udpClass) {
this.column = checkNotNull(column, "column");
UserDefinedByClass(Column<T> column, Class<U> udpClass) {
super(column);
this.udpClass = checkNotNull(udpClass, "udpClass");
String name = getClass().getSimpleName().toLowerCase();
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
Expand All @@ -357,14 +375,11 @@ public static final class UserDefined<T extends Comparable<T>, U extends UserDef
getUserDefinedPredicate();
}

public Column<T> getColumn() {
return column;
}

public Class<U> getUserDefinedPredicateClass() {
return udpClass;
}

@Override
public U getUserDefinedPredicate() {
try {
return udpClass.newInstance();
Expand All @@ -375,11 +390,6 @@ public U getUserDefinedPredicate() {
}
}

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
return toString;
Expand All @@ -390,7 +400,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

UserDefined that = (UserDefined) o;
UserDefinedByClass that = (UserDefinedByClass) o;

if (!column.equals(that.column)) return false;
if (!udpClass.equals(that.udpClass)) return false;
Expand All @@ -406,30 +416,21 @@ public int hashCode() {
return result;
}
}

public static final class ConfiguredUserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate {
private final Column<T> column;
private final U udp;

public static final class UserDefinedByInstance<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> extends UserDefined<T, U> {
private final String toString;
private final U udpInstance;

ConfiguredUserDefined(Column<T> column, U udp) {
this.column = checkNotNull(column, "column");
this.udp = checkNotNull(udp, "udp");
UserDefinedByInstance(Column<T> column, U udpInstance) {
super(column);
this.udpInstance = checkNotNull(udpInstance, "udpInstance");
String name = getClass().getSimpleName().toLowerCase();
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udp.getClass().getName() + ")";
}

public Column<T> getColumn() {
return column;
}

public U getUserDefinedPredicate() {
return udp;
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")";
}

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
public U getUserDefinedPredicate() {
return udpInstance;
}

@Override
Expand All @@ -442,18 +443,18 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ConfiguredUserDefined that = (ConfiguredUserDefined) o;
UserDefinedByInstance that = (UserDefinedByInstance) o;

if (!column.equals(that.column)) return false;
if (!udp.equals(that.udp)) return false;
if (!udpInstance.equals(that.udpInstance)) return false;

return true;
}

@Override
public int hashCode() {
int result = column.hashCode();
result = 31 * result + udp.hashCode();
result = 31 * result + udpInstance.hashCode();
result = result * 31 + getClass().hashCode();
return result;
}
Expand Down Expand Up @@ -504,48 +505,4 @@ public int hashCode() {
}
}

// Represents the inverse of a ConfiguredUserDefined. It is equivalent to not(userDefined), without the use
// of the not() operator
public static final class LogicalNotConfiguredUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate, Serializable {
private final ConfiguredUserDefined<T, U> udp;
private final String toString;

LogicalNotConfiguredUserDefined(ConfiguredUserDefined<T, U> configuredUserDefined) {
this.udp = checkNotNull(configuredUserDefined, "configuredUserDefined");
this.toString = "inverted(" + udp + ")";
}

public ConfiguredUserDefined<T, U> getUserDefined() {
return udp;
}

@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
return toString;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

LogicalNotUserDefined that = (LogicalNotUserDefined) o;

if (!udp.equals(that.udp)) return false;

return true;
}

@Override
public int hashCode() {
int result = udp.hashCode();
result = result * 31 + getClass().hashCode();
return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package parquet.filter2.predicate;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -13,14 +12,12 @@
import parquet.filter2.predicate.Operators.Gt;
import parquet.filter2.predicate.Operators.GtEq;
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
import parquet.filter2.predicate.Operators.Lt;
import parquet.filter2.predicate.Operators.LtEq;
import parquet.filter2.predicate.Operators.Not;
import parquet.filter2.predicate.Operators.NotEq;
import parquet.filter2.predicate.Operators.Or;
import parquet.filter2.predicate.Operators.UserDefined;
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;

Expand Down Expand Up @@ -136,22 +133,11 @@ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(U
return null;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(ConfiguredUserDefined<T, U> udp) {
validateColumn(udp.getColumn());
return null;
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(LogicalNotUserDefined<T, U> udp) {
return udp.getUserDefined().accept(this);
}

@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(LogicalNotConfiguredUserDefined<T, U> udp) {
return udp.getUserDefined().accept(this);
}

private <T extends Comparable<T>> void validateColumnFilterPredicate(ColumnFilterPredicate<T> pred) {
validateColumn(pred.getColumn());
}
Expand Down
Loading

0 comments on commit 25aa716

Please sign in to comment.