Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions api/src/main/java/com/netflix/iceberg/expressions/Binder.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,63 @@ private Binder() {
*
* @param struct The {@link StructType struct type} to resolve references by name.
* @param expr An {@link Expression expression} to rewrite with bound references.
* @param caseSensitive A boolean flag to control whether the bind should enforce case sensitivity.
* @return the expression rewritten with bound references
* @throws ValidationException if literals do not match bound references
* @throws IllegalStateException if any references are already bound
*/
public static Expression bind(StructType struct,
Expression expr) {
return ExpressionVisitors.visit(expr, new BindVisitor(struct));
Expression expr,
boolean caseSensitive) {
return ExpressionVisitors.visit(expr, new BindVisitor(struct, caseSensitive));
}

public static Set<Integer> boundReferences(StructType struct, List<Expression> exprs) {
/**
 * Replaces all unbound/named references with bound references to fields in the given struct,
 * resolving field names in case sensitive mode.
 * <p>
 * This overload is package-private so that only existing tests can use it; other callers pass
 * the case sensitivity flag explicitly.
 * <p>
 * When a reference is resolved, any literal used in a predicate for that field is converted to
 * the field's type using {@link Literal#to(Type)}. If automatic conversion to that type isn't
 * allowed, a {@link ValidationException validation exception} is thrown.
 * <p>
 * The result expression may be simplified when constructed. For example, {@code isNull("a")} is
 * replaced with {@code alwaysFalse()} when {@code "a"} is resolved to a required field.
 * <p>
 * The expression cannot contain references that are already bound, or an
 * {@link IllegalStateException} will be thrown.
 *
 * @param struct The {@link StructType struct type} to resolve references by name.
 * @param expr An {@link Expression expression} to rewrite with bound references.
 * @return the expression rewritten with bound references
 * @throws ValidationException if literals do not match bound references
 * @throws IllegalStateException if any references are already bound
 */
static Expression bind(StructType struct, Expression expr) {
  // Delegate to the three-argument overload with case sensitivity switched on.
  final boolean caseSensitive = true;
  return Binder.bind(struct, expr, caseSensitive);
}

public static Set<Integer> boundReferences(StructType struct, List<Expression> exprs, boolean caseSensitive) {
if (exprs == null) {
return ImmutableSet.of();
}
ReferenceVisitor visitor = new ReferenceVisitor();
for (Expression expr : exprs) {
ExpressionVisitors.visit(bind(struct, expr), visitor);
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
}
return visitor.references;
}

private static class BindVisitor extends ExpressionVisitor<Expression> {
private final StructType struct;
private final boolean caseSensitive;

private BindVisitor(StructType struct) {
private BindVisitor(StructType struct, boolean caseSensitive) {
this.struct = struct;
this.caseSensitive = caseSensitive;
}

@Override
Expand Down Expand Up @@ -110,7 +142,7 @@ public <T> Expression predicate(BoundPredicate<T> pred) {

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
return pred.bind(struct);
return pred.bind(struct, caseSensitive);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private EvalVisitor visitor() {
}

public Evaluator(Types.StructType struct, Expression unbound) {
this.expr = Binder.bind(struct, unbound);
this.expr = Binder.bind(struct, unbound, true);
}

public boolean eval(StructLike data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private ManifestEvalVisitor visitor() {

public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
this.struct = spec.partitionType();
this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)));
this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)), true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private MetricsEvalVisitor visitor() {
public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound));
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Expression or(Expression leftResult, Expression rightResult) {

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
Expression bound = pred.bind(spec.schema().asStruct());
Expression bound = pred.bind(spec.schema().asStruct(), true);

if (bound instanceof BoundPredicate) {
return predicate((BoundPredicate<?>) bound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
.projectStrict(part.name(), pred);

if (strictProjection != null) {
Expression bound = strictProjection.bind(spec.partitionType());
Expression bound = strictProjection.bind(spec.partitionType(), true);
if (bound instanceof BoundPredicate) {
// the predicate methods will evaluate and return alwaysTrue or alwaysFalse
return super.predicate((BoundPredicate<?>) bound);
Expand All @@ -184,7 +184,7 @@ public <T> Expression predicate(BoundPredicate<T> pred) {

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
Expression bound = pred.bind(spec.schema().asStruct());
Expression bound = pred.bind(spec.schema().asStruct(), true);

if (bound instanceof BoundPredicate) {
Expression boundResidual = predicate((BoundPredicate<?>) bound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private MetricsEvalVisitor visitor() {
public StrictMetricsEvaluator(Schema schema, Expression unbound) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound));
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,35 @@ public Expression negate() {
return new UnboundPredicate<>(op().negate(), ref(), literal());
}

public Expression bind(Types.StructType struct) {
Types.NestedField field = struct.field(ref().name());
/**
 * Binds this UnboundPredicate to the given struct, resolving the referenced field name in
 * case sensitive mode.
 * <p>
 * This overload is package-private so that only existing tests can use it.
 *
 * @param struct The {@link Types.StructType struct type} to resolve references by name.
 * @return an {@link Expression}
 * @throws ValidationException if literals do not match bound references, or if comparison on expression is invalid
 */
Expression bind(Types.StructType struct) {
  // Delegate to the two-argument overload with case sensitivity switched on.
  final boolean caseSensitive = true;
  return bind(struct, caseSensitive);
}

/**
* Bind this UnboundPredicate.
*
* @param struct The {@link Types.StructType struct type} to resolve references by name.
* @param caseSensitive A boolean flag to control whether the bind should enforce case sensitivity.
* @return an {@link Expression}
* @throws ValidationException if literals do not match bound references, or if comparison on expression is invalid
*/
public Expression bind(Types.StructType struct, boolean caseSensitive) {
Types.NestedField field;
if (caseSensitive) {
field = struct.field(ref().name());
} else {
field = struct.caseInsensitiveField(ref().name());
}

ValidationException.check(field != null,
"Cannot find field '%s' in struct: %s", ref().name(), struct);

Expand Down
17 changes: 16 additions & 1 deletion api/src/main/java/com/netflix/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class Types {
private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");

public static PrimitiveType fromPrimitiveString(String typeString) {
String lowerTypeString = typeString.toLowerCase(Locale.ENGLISH);
String lowerTypeString = typeString.toLowerCase(Locale.ROOT);
if (TYPES.containsKey(lowerTypeString)) {
return TYPES.get(lowerTypeString);
}
Expand Down Expand Up @@ -516,6 +516,7 @@ public static StructType of(List<NestedField> fields) {
// lazy values
private transient List<NestedField> fieldList = null;
private transient Map<String, NestedField> fieldsByName = null;
private transient Map<String, NestedField> fieldsByLowerCaseName = null;
private transient Map<Integer, NestedField> fieldsById = null;

private StructType(List<NestedField> fields) {
Expand All @@ -535,6 +536,10 @@ public NestedField field(String name) {
return lazyFieldsByName().get(name);
}

/**
 * Looks up a field of this struct by name, ignoring case.
 * <p>
 * The requested name is lower-cased with {@link Locale#ROOT} and looked up in the index of
 * lower-cased field names.
 *
 * @param name the field name to look up, in any case
 * @return the field stored under the lower-cased name, or {@code null} if the index has no entry for it
 */
public NestedField caseInsensitiveField(String name) {
  String lowerCaseName = name.toLowerCase(Locale.ROOT);
  Map<String, NestedField> byLowerCaseName = lazyFieldsByLowerCaseName();
  return byLowerCaseName.get(lowerCaseName);
}

@Override
public Type fieldType(String name) {
NestedField field = field(name);
Expand Down Expand Up @@ -600,6 +605,13 @@ private Map<String, NestedField> lazyFieldsByName() {
return fieldsByName;
}

/**
 * Returns the index of fields keyed by lower-cased name, calling {@code indexFields()} first
 * if the index has not been populated yet.
 */
private Map<String, NestedField> lazyFieldsByLowerCaseName() {
  if (fieldsByLowerCaseName != null) {
    // already indexed
    return fieldsByLowerCaseName;
  }
  indexFields();
  return fieldsByLowerCaseName;
}

private Map<Integer, NestedField> lazyFieldsById() {
if (fieldsById == null) {
indexFields();
Expand All @@ -609,12 +621,15 @@ private Map<Integer, NestedField> lazyFieldsById() {

/**
 * Builds the by-name, by-lower-case-name and by-id lookup maps in one pass over
 * {@code fields} and stores them in the corresponding members.
 * <p>
 * Field names that differ only by case (for example {@code "a"} and {@code "A"}) collapse to
 * the same lower-cased key. {@code ImmutableMap.Builder#build()} rejects duplicate keys with an
 * {@link IllegalArgumentException}, which would make indexing fail for such a struct and break
 * the by-name and by-id lookups as well. To avoid that, the lower-case index is collected in a
 * mutable map and lower-cased names shared by more than one field are left out of it, so an
 * ambiguous case-insensitive lookup finds no field instead of silently picking one.
 */
private void indexFields() {
  ImmutableMap.Builder<String, NestedField> byNameBuilder = ImmutableMap.builder();
  ImmutableMap.Builder<Integer, NestedField> byIdBuilder = ImmutableMap.builder();
  Map<String, NestedField> byLowerCaseName = new java.util.LinkedHashMap<>();
  java.util.Set<String> ambiguousLowerCaseNames = new java.util.HashSet<>();

  for (NestedField field : fields) {
    byNameBuilder.put(field.name(), field);
    byIdBuilder.put(field.fieldId(), field);

    String lowerCaseName = field.name().toLowerCase(Locale.ROOT);
    if (byLowerCaseName.containsKey(lowerCaseName)) {
      // a second field maps to the same lower-cased key: remember it as ambiguous
      ambiguousLowerCaseNames.add(lowerCaseName);
    } else {
      byLowerCaseName.put(lowerCaseName, field);
    }
  }

  // ambiguous keys cannot be resolved to a single field, so they are not indexed at all
  byLowerCaseName.keySet().removeAll(ambiguousLowerCaseNames);

  this.fieldsByName = byNameBuilder.build();
  this.fieldsByLowerCaseName = ImmutableMap.copyOf(byLowerCaseName);
  this.fieldsById = byIdBuilder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,19 @@ public void testBoundExpressionFails() {
@Test
public void testSingleReference() {
Expression expr = not(equal("x", 7));
TestHelpers.assertAllReferencesBound("Single reference", Binder.bind(STRUCT, expr));
TestHelpers.assertAllReferencesBound("Single reference", Binder.bind(STRUCT, expr, true));
}

/**
 * Binding with case sensitivity disabled should resolve the upper-case reference {@code "X"}
 * and leave every reference bound (presumably against a lower-case {@code "x"} field in
 * {@code STRUCT} -- confirm against its definition).
 */
@Test
public void testCaseInsensitiveReference() {
  Expression unbound = not(equal("X", 7));
  Expression bound = Binder.bind(STRUCT, unbound, false);
  TestHelpers.assertAllReferencesBound("Single reference", bound);
}

/**
 * Binding the upper-case reference {@code "X"} with case sensitivity enabled is expected to
 * fail with a {@link ValidationException}.
 */
@Test(expected = ValidationException.class)
public void testCaseSensitiveReference() {
  // the bind call itself must throw; its result is intentionally unused
  Binder.bind(STRUCT, not(equal("X", 7)), true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void testLongToIntegerConversion() {
Assert.assertEquals("Less than or equal below min should be alwaysFalse",
Expressions.alwaysFalse(), lteqMin.bind(struct));

Expression ltExpr = new UnboundPredicate<>(LT, ref("i"), (long) Integer.MAX_VALUE).bind(struct);
Expression ltExpr = new UnboundPredicate<>(LT, ref("i"), (long) Integer.MAX_VALUE).bind(struct, true);
BoundPredicate<Integer> ltMax = assertAndUnwrap(ltExpr);
Assert.assertEquals("Should translate bound to Integer",
(Integer) Integer.MAX_VALUE, ltMax.literal().value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testIdentityProjection() {
UnboundPredicate<?> projected = assertAndUnwrapUnbound(expr);

// check inclusive the bound predicate to ensure the types are correct
BoundPredicate<?> bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct()));
BoundPredicate<?> bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct(), true));

Assert.assertEquals("Field name should match partition struct field",
"id", projected.ref().name());
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testStrictIdentityProjection() {
UnboundPredicate<?> projected = assertAndUnwrapUnbound(expr);

// check inclusive the bound predicate to ensure the types are correct
BoundPredicate<?> bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct()));
BoundPredicate<?> bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct(), true));

Assert.assertEquals("Field name should match partition struct field",
"id", projected.ref().name());
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/netflix/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public TableScan select(Collection<String> columns) {

// all of the filter columns are required
requiredFieldIds.addAll(
Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter)));
Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter), true));

// all of the projection columns are required
requiredFieldIds.addAll(TypeUtil.getProjectedIds(table.schema().select(columns)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private EvalVisitor visitor() {
public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound));
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public <T> FilterPredicate predicate(BoundPredicate<T> pred) {
}

protected Expression bind(UnboundPredicate<?> pred) {
return pred.bind(schema.asStruct());
return pred.bind(schema.asStruct(), true);
}

@Override
Expand Down Expand Up @@ -189,7 +189,7 @@ private ConvertColumnFilterToParquet(Schema schema, String column) {

protected Expression bind(UnboundPredicate<?> pred) {
// instead of binding the predicate using the top-level schema, bind it to the partition data
return pred.bind(partitionStruct);
return pred.bind(partitionStruct, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private MetricsEvalVisitor visitor() {
public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound));
this.expr = Binder.bind(struct, rewriteNot(unbound), true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private static com.netflix.iceberg.expressions.Expression filter(

public static Expression convert(com.netflix.iceberg.expressions.Expression filter,
Schema schema) {
return visit(Binder.bind(schema.asStruct(), filter), new ExpressionToSpark(schema));
return visit(Binder.bind(schema.asStruct(), filter, true), new ExpressionToSpark(schema));
}

private static class ExpressionToSpark extends ExpressionVisitors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static Schema prune(Schema schema, StructType requestedType) {
* @throws IllegalArgumentException if the Spark type does not match the Schema
*/
public static Schema prune(Schema schema, StructType requestedType, List<Expression> filters) {
Set<Integer> filterRefs = Binder.boundReferences(schema.asStruct(), filters);
Set<Integer> filterRefs = Binder.boundReferences(schema.asStruct(), filters, true);
return new Schema(visit(schema, new PruneColumnsWithoutReordering(requestedType, filterRefs))
.asNestedType()
.asStructType()
Expand All @@ -225,7 +225,7 @@ public static Schema prune(Schema schema, StructType requestedType, List<Express
* @throws IllegalArgumentException if the Spark type does not match the Schema
*/
public static Schema prune(Schema schema, StructType requestedType, Expression filter) {
Set<Integer> filterRefs = Binder.boundReferences(schema.asStruct(), Collections.singletonList(filter));
Set<Integer> filterRefs = Binder.boundReferences(schema.asStruct(), Collections.singletonList(filter), true);
return new Schema(visit(schema, new PruneColumnsWithoutReordering(requestedType, filterRefs))
.asNestedType()
.asStructType()
Expand Down