Skip to content
45 changes: 44 additions & 1 deletion api/src/main/java/com/netflix/iceberg/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -46,6 +47,7 @@ public class Schema implements Serializable {
private transient BiMap<String, Integer> aliasToId = null;
private transient Map<Integer, Types.NestedField> idToField = null;
private transient BiMap<String, Integer> nameToId = null;
private transient BiMap<String, Integer> lowerCaseNameToId = null;

public Schema(List<Types.NestedField> columns, Map<String, Integer> aliases) {
this.struct = Types.StructType.of(columns);
Expand All @@ -70,6 +72,13 @@ private BiMap<String, Integer> lazyNameToId() {
return nameToId;
}

/**
 * Returns the index from lower-cased field name to field id for this schema's struct,
 * building it with {@code TypeUtil.indexByLowerCaseName} and caching it on first use.
 */
private BiMap<String, Integer> lazyLowerCaseNameToId() {
  if (lowerCaseNameToId != null) {
    return lowerCaseNameToId;
  }

  Map<String, Integer> byLowerCaseName = TypeUtil.indexByLowerCaseName(struct);
  this.lowerCaseNameToId = ImmutableBiMap.copyOf(byLowerCaseName);
  return lowerCaseNameToId;
}

/**
* Creates a schema from the given columns by wrapping them in a list and delegating to the
* constructor that accepts the columns as a list.
*
* @param columns the columns for this schema
*/
public Schema(Types.NestedField... columns) {
this(Arrays.asList(columns));
}
Expand Down Expand Up @@ -207,13 +216,47 @@ public Schema select(String... names) {
* @return a projection schema from this schema, by name
*/
public Schema select(Collection<String> names) {
// Delegate to the shared selection logic with case-sensitive name matching (caseSensitive = true).
return internalSelect(names, true);
}

/**
 * Creates a projection schema for a subset of columns, selected by case-insensitive name.
 * <p>
 * Names that identify nested fields will select part or all of the field's top-level column.
 *
 * @param names String names for the selected columns, matched ignoring case
 * @return a projection schema from this schema, by name
 */
public Schema caseInsensitiveSelect(String... names) {
  List<String> nameList = Arrays.asList(names);
  return caseInsensitiveSelect(nameList);
}

/**
 * Creates a projection schema for a subset of columns, selected by case-insensitive name.
 * <p>
 * Names that identify nested fields will select part or all of the field's top-level column.
 *
 * @param names a collection of String names for the selected columns, matched ignoring case
 * @return a projection schema from this schema, by name
 */
public Schema caseInsensitiveSelect(Collection<String> names) {
  final boolean caseSensitive = false;
  return internalSelect(names, caseSensitive);
}

private Schema internalSelect(Collection<String> names, boolean caseSensitive) {
if (names.contains(ALL_COLUMNS)) {
return this;
}

Set<Integer> selected = Sets.newHashSet();
for (String name : names) {
Integer id = lazyNameToId().get(name);
Integer id;
if (caseSensitive) {
id = lazyNameToId().get(name);
} else {
id = lazyLowerCaseNameToId().get(name.toLowerCase(Locale.ROOT));
}

if (id != null) {
selected.add(id);
}
Expand Down
19 changes: 17 additions & 2 deletions api/src/main/java/com/netflix/iceberg/TableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public interface TableScan {
*/
TableScan project(Schema schema);

/**
* Create a new {@link TableScan} from this that, if data columns were selected
* via {@link #select(java.util.Collection)}, controls whether the match to the schema will be done
* with case sensitivity.
*
* @param caseSensitive true to match column names case sensitively, false to ignore case
* @return a new scan based on this with case sensitivity as stated
*/
TableScan caseSensitive(boolean caseSensitive);

/**
* Create a new {@link TableScan} from this that will read the given data columns. This produces
* an expected schema that includes all fields that are either selected or used by this scan's
Expand All @@ -83,8 +92,8 @@ default TableScan select(String... columns) {
* an expected schema that includes all fields that are either selected or used by this scan's
* filter expression.
*
* @param columns column names from the manifest file schema
* @return a new scan based on this with the given manifest columns
* @param columns column names from the table's schema
* @return a new scan based on this with the given projection columns
*/
TableScan select(Collection<String> columns);

Expand Down Expand Up @@ -136,4 +145,10 @@ default TableScan select(String... columns) {
* @return this scan's filter expression
*/
Expression filter();

/**
* Returns whether this scan matches column names case sensitively, as configured by
* {@link #caseSensitive(boolean)}.
*
* @return true if case sensitive, false otherwise.
*/
boolean isCaseSensitive();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public Evaluator(Types.StructType struct, Expression unbound) {
this.expr = Binder.bind(struct, unbound, true);
}

/**
* Creates an evaluator by binding the unbound expression to the given struct with
* {@code Binder.bind}.
*
* @param struct the struct type the expression is bound to
* @param unbound the expression to bind
* @param caseSensitive passed through to {@code Binder.bind}; when false, field names in the
*                      expression are expected to match struct fields regardless of case
*/
public Evaluator(Types.StructType struct, Expression unbound, boolean caseSensitive) {
this.expr = Binder.bind(struct, unbound, caseSensitive);
}

/**
 * Evaluates the given data with this evaluator's visitor.
 *
 * @param data the struct-like value to evaluate
 * @return the boolean result produced by the visitor for {@code data}
 */
public boolean eval(StructLike data) {
  boolean result = visitor().eval(data);
  return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ private ManifestEvalVisitor visitor() {
}

/**
* Creates a manifest evaluator in case-sensitive mode by delegating to the three-argument
* constructor with {@code caseSensitive = true}.
*
* @param spec the partition spec
* @param rowFilter the row filter expression
*/
public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
this(spec, rowFilter, true);
}

/**
 * Creates a manifest evaluator for the given partition spec and row filter.
 * <p>
 * The row filter is projected inclusively onto the spec's partition values, NOT expressions are
 * rewritten, and the result is bound to the spec's partition type.
 *
 * @param spec the partition spec whose partition type the expression is bound to
 * @param rowFilter the row filter expression to project and bind
 * @param caseSensitive whether column names are matched case sensitively, both when projecting
 *                      and when binding
 */
public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter, boolean caseSensitive) {
  this.struct = spec.partitionType();
  // Bind exactly once, honoring the requested case sensitivity. A second, hard-coded
  // case-sensitive bind would reject case-mismatched names before this one could accept them.
  this.expr = Binder.bind(
      struct,
      rewriteNot(Projections.inclusive(spec, caseSensitive).project(rowFilter)),
      caseSensitive);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class InclusiveMetricsEvaluator {
private final Schema schema;
private final StructType struct;
private final Expression expr;
private final boolean caseSensitive;
private transient ThreadLocal<MetricsEvalVisitor> visitors = null;

private MetricsEvalVisitor visitor() {
Expand All @@ -53,10 +54,15 @@ private MetricsEvalVisitor visitor() {
return visitors.get();
}

/**
 * Creates a metrics evaluator in case-sensitive mode by delegating to the three-argument
 * constructor with {@code caseSensitive = true}.
 *
 * @param schema the schema the expression is bound to
 * @param unbound the unbound expression
 */
InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
  this(schema, unbound, true);
}

/**
 * Creates a metrics evaluator by rewriting NOT expressions in {@code unbound} and binding the
 * result to the schema's struct type.
 *
 * @param schema the schema whose struct type the expression is bound to
 * @param unbound the unbound expression
 * @param caseSensitive whether column names are matched case sensitively when binding
 */
public InclusiveMetricsEvaluator(Schema schema, Expression unbound, boolean caseSensitive) {
  this.schema = schema;
  this.struct = schema.asStruct();
  this.caseSensitive = caseSensitive;
  // Single bind using the requested case sensitivity; expr is final and must be assigned once.
  this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}

/**
Expand Down
64 changes: 54 additions & 10 deletions api/src/main/java/com/netflix/iceberg/expressions/Projections.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static abstract class ProjectionEvaluator extends ExpressionVisitor<Expre
}

/**
* Creates an inclusive {@code ProjectionEvaluator} for the {@link PartitionSpec spec}.
* Creates an inclusive {@code ProjectionEvaluator} for the {@link PartitionSpec spec}, defaulting
* to case sensitive mode.
* <p>
* An evaluator is used to project expressions for a table's data rows into expressions on the
* table's partition values. The evaluator returned by this function is inclusive and will build
Expand All @@ -69,11 +70,32 @@ public static abstract class ProjectionEvaluator extends ExpressionVisitor<Expre
* @see Transform#project(String, BoundPredicate) Inclusive transform used for each predicate
*/
public static ProjectionEvaluator inclusive(PartitionSpec spec) {
  // Case-sensitive by default; a single return avoids an unreachable second statement.
  return new InclusiveProjection(spec, true);
}

/**
* Creates an inclusive {@code ProjectionEvaluator} for the {@link PartitionSpec spec}.
* <p>
* An evaluator is used to project expressions for a table's data rows into expressions on the
* table's partition values. The evaluator returned by this function is inclusive and will build
* expressions with the following guarantee: if the original expression matches a row, then the
* projected expression will match that row's partition.
* <p>
* Each predicate in the expression is projected using
* {@link Transform#project(String, BoundPredicate)}.
*
* @param spec a partition spec
* @param caseSensitive whether the projection should match column names case sensitively
* @return an inclusive projection evaluator for the partition spec
* @see Transform#project(String, BoundPredicate) Inclusive transform used for each predicate
*/
public static ProjectionEvaluator inclusive(PartitionSpec spec, boolean caseSensitive) {
return new InclusiveProjection(spec, caseSensitive);
}

/**
* Creates a strict {@code ProjectionEvaluator} for the {@link PartitionSpec spec}, defaulting
* to case sensitive mode.
* <p>
* An evaluator is used to project expressions for a table's data rows into expressions on the
* table's partition values. The evaluator returned by this function is strict and will build
Expand All @@ -88,14 +110,36 @@ public static ProjectionEvaluator inclusive(PartitionSpec spec) {
* @see Transform#projectStrict(String, BoundPredicate) Strict transform used for each predicate
*/
public static ProjectionEvaluator strict(PartitionSpec spec) {
  // Case-sensitive by default; a single return avoids an unreachable second statement.
  return new StrictProjection(spec, true);
}

/**
 * Builds a strict {@code ProjectionEvaluator} for the {@link PartitionSpec spec} with the
 * requested column-name case sensitivity.
 * <p>
 * The evaluator projects expressions on a table's data rows into expressions on the table's
 * partition values. It is strict: if the projected expression matches a partition, then the
 * original expression matches all rows in that partition.
 * <p>
 * Each predicate in the expression is projected using
 * {@link Transform#projectStrict(String, BoundPredicate)}.
 *
 * @param spec a partition spec
 * @param caseSensitive whether the projection should match column names case sensitively
 * @return a strict projection evaluator for the partition spec
 * @see Transform#projectStrict(String, BoundPredicate) Strict transform used for each predicate
 */
public static ProjectionEvaluator strict(PartitionSpec spec, boolean caseSensitive) {
  ProjectionEvaluator strictEvaluator = new StrictProjection(spec, caseSensitive);
  return strictEvaluator;
}

private static class BaseProjectionEvaluator extends ProjectionEvaluator {
final PartitionSpec spec;
final boolean caseSensitive;

/**
 * Stores the partition spec and the case sensitivity used later when binding predicates.
 *
 * @param spec the partition spec to project onto
 * @param caseSensitive whether column names are matched case sensitively when binding
 */
private BaseProjectionEvaluator(PartitionSpec spec, boolean caseSensitive) {
  this.spec = spec;
  this.caseSensitive = caseSensitive;
}

@Override
Expand Down Expand Up @@ -135,7 +179,7 @@ public Expression or(Expression leftResult, Expression rightResult) {

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
Expression bound = pred.bind(spec.schema().asStruct(), true);
Expression bound = pred.bind(spec.schema().asStruct(), caseSensitive);

if (bound instanceof BoundPredicate) {
return predicate((BoundPredicate<?>) bound);
Expand All @@ -146,8 +190,8 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
}

private static class InclusiveProjection extends BaseProjectionEvaluator {
/**
 * Creates an inclusive projection, passing the spec and case sensitivity to the base evaluator.
 *
 * @param spec the partition spec to project onto
 * @param caseSensitive whether column names are matched case sensitively when binding
 */
private InclusiveProjection(PartitionSpec spec, boolean caseSensitive) {
  super(spec, caseSensitive);
}

@Override
Expand All @@ -171,8 +215,8 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
}

private static class StrictProjection extends BaseProjectionEvaluator {
/**
 * Creates a strict projection, passing the spec and case sensitivity to the base evaluator.
 *
 * @param spec the partition spec to project onto
 * @param caseSensitive whether column names are matched case sensitively when binding
 */
private StrictProjection(PartitionSpec spec, boolean caseSensitive) {
  super(spec, caseSensitive);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
public class ResidualEvaluator implements Serializable {
private final PartitionSpec spec;
private final Expression expr;
private final boolean caseSensitive;
private transient ThreadLocal<ResidualVisitor> visitors = null;

private ResidualVisitor visitor() {
Expand All @@ -58,9 +59,10 @@ private ResidualVisitor visitor() {
return visitors.get();
}

/**
 * Creates a residual evaluator for the given partition spec and expression.
 *
 * @param spec the partition spec
 * @param expr the expression to compute residuals for
 * @param caseSensitive whether column names are matched case sensitively when predicates are
 *                      bound
 */
public ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
  this.spec = spec;
  this.expr = expr;
  this.caseSensitive = caseSensitive;
}

/**
Expand Down Expand Up @@ -170,7 +172,7 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
.projectStrict(part.name(), pred);

if (strictProjection != null) {
Expression bound = strictProjection.bind(spec.partitionType(), true);
Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
if (bound instanceof BoundPredicate) {
// the predicate methods will evaluate and return alwaysTrue or alwaysFalse
return super.predicate((BoundPredicate<?>) bound);
Expand All @@ -184,7 +186,7 @@ public <T> Expression predicate(BoundPredicate<T> pred) {

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
Expression bound = pred.bind(spec.schema().asStruct(), true);
Expression bound = pred.bind(spec.schema().asStruct(), caseSensitive);

if (bound instanceof BoundPredicate) {
Expression boundResidual = predicate((BoundPredicate<?>) bound);
Expand Down
9 changes: 9 additions & 0 deletions api/src/main/java/com/netflix/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.iceberg.Schema;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -79,6 +81,13 @@ public static Map<String, Integer> indexByName(Types.StructType struct) {
return visit(struct, new IndexByName());
}

/**
 * Builds a name-to-id index for the struct with every name lower-cased using
 * {@link Locale#ROOT}.
 * <p>
 * Names that differ only by case collapse into a single entry; the entry written last wins.
 *
 * @param struct the struct type to index
 * @return a new mutable map from lower-cased name to id
 */
public static Map<String, Integer> indexByLowerCaseName(Types.StructType struct) {
  Map<String, Integer> lowerCaseIndex = Maps.newHashMap();
  for (Map.Entry<String, Integer> entry : indexByName(struct).entrySet()) {
    String lowerCaseName = entry.getKey().toLowerCase(Locale.ROOT);
    lowerCaseIndex.put(lowerCaseName, entry.getValue());
  }
  return lowerCaseIndex;
}

/**
* Indexes the fields of a struct by visiting it with an {@code IndexById} visitor.
*
* @param struct the struct type to index
* @return a map keyed by integer whose values are the struct's fields (presumably keyed by
*         field id, going by the visitor's name; confirm against {@code IndexById})
*/
public static Map<Integer, Types.NestedField> indexById(Types.StructType struct) {
return visit(struct, new IndexById());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.netflix.iceberg.expressions;

import com.netflix.iceberg.TestHelpers;
import com.netflix.iceberg.exceptions.ValidationException;
import com.netflix.iceberg.types.Types;
import com.netflix.iceberg.types.Types.StructType;
import org.apache.avro.util.Utf8;
Expand Down Expand Up @@ -145,6 +146,23 @@ public void testNot() {
Assert.assertTrue("not(8 == 7) => false", evaluator.eval(TestHelpers.Row.of(8)));
}

@Test
public void testCaseInsensitiveNot() {
  // The predicate references "X"; with caseSensitive = false binding is expected to succeed.
  Evaluator evaluator = new Evaluator(STRUCT, not(equal("X", 7)), false);
  Assert.assertFalse("not(7 == 7) => false", evaluator.eval(TestHelpers.Row.of(7)));
  // The failure message must describe the expected outcome of the assertion (true).
  Assert.assertTrue("not(8 == 7) => true", evaluator.eval(TestHelpers.Row.of(8)));
}

@Test
public void testCaseSensitiveNot() {
// With caseSensitive = true the reference to "X" is expected not to resolve, so constructing
// the evaluator (which binds the expression) must throw a ValidationException.
TestHelpers.assertThrows(
"X != x when case sensitivity is on",
ValidationException.class,
"Cannot find field 'X' in struct",
// Keep the block-bodied lambda: it is void-compatible only.
() -> { new Evaluator(STRUCT, not(equal("X", 7)), true); }
);
}

@Test
public void testCharSeqValue() {
StructType struct = StructType.of(required(34, "s", Types.StringType.get()));
Expand Down
Loading