diff --git a/api/src/main/java/com/netflix/iceberg/Schema.java b/api/src/main/java/com/netflix/iceberg/Schema.java index 5662c12235e9..ee21439d038a 100644 --- a/api/src/main/java/com/netflix/iceberg/Schema.java +++ b/api/src/main/java/com/netflix/iceberg/Schema.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -46,6 +47,7 @@ public class Schema implements Serializable { private transient BiMap aliasToId = null; private transient Map idToField = null; private transient BiMap nameToId = null; + private transient BiMap lowerCaseNameToId = null; public Schema(List columns, Map aliases) { this.struct = Types.StructType.of(columns); @@ -70,6 +72,13 @@ private BiMap lazyNameToId() { return nameToId; } + private BiMap lazyLowerCaseNameToId() { + if (lowerCaseNameToId == null) { + this.lowerCaseNameToId = ImmutableBiMap.copyOf(TypeUtil.indexByLowerCaseName(struct)); + } + return lowerCaseNameToId; + } + public Schema(Types.NestedField... columns) { this(Arrays.asList(columns)); } @@ -207,13 +216,47 @@ public Schema select(String... names) { * @return a projection schema from this schema, by name */ public Schema select(Collection names) { + return internalSelect(names, true); + } + + /** + * Creates a projection schema for a subset of columns, selected by case insensitive name + *

+ * Names that identify nested fields will select part or all of the field's top-level column. + * + * @param names a List of String names for selected columns + * @return a projection schema from this schema, by name + */ + public Schema caseInsensitiveSelect(String... names) { + return caseInsensitiveSelect(Arrays.asList(names)); + } + + /** + * Creates a projection schema for a subset of columns, selected by case insensitive name + *

+ * Names that identify nested fields will select part or all of the field's top-level column. + * + * @param names a List of String names for selected columns + * @return a projection schema from this schema, by name + */ + public Schema caseInsensitiveSelect(Collection names) { + return internalSelect(names, false); + } + + private Schema internalSelect(Collection names, boolean caseSensitive) { if (names.contains(ALL_COLUMNS)) { return this; } Set selected = Sets.newHashSet(); for (String name : names) { - Integer id = lazyNameToId().get(name); + Integer id; + if (caseSensitive) { + id = lazyNameToId().get(name); + } else { + id = lazyLowerCaseNameToId().get(name.toLowerCase(Locale.ROOT)); + } + if (id != null) { selected.add(id); } diff --git a/api/src/main/java/com/netflix/iceberg/TableScan.java b/api/src/main/java/com/netflix/iceberg/TableScan.java index 11df3be609ad..eaab97858ad7 100644 --- a/api/src/main/java/com/netflix/iceberg/TableScan.java +++ b/api/src/main/java/com/netflix/iceberg/TableScan.java @@ -66,6 +66,15 @@ public interface TableScan { */ TableScan project(Schema schema); + /** + * Create a new {@link TableScan} from this that, if data columns where selected + * via {@link #select(java.util.Collection)}, controls whether the match to the schema will be done + * with case sensitivity. + * + * @return a new scan based on this with case sensitivity as stated + */ + TableScan caseSensitive(boolean caseSensitive); + /** * Create a new {@link TableScan} from this that will read the given data columns. This produces * an expected schema that includes all fields that are either selected or used by this scan's @@ -83,8 +92,8 @@ default TableScan select(String... columns) { * an expected schema that includes all fields that are either selected or used by this scan's * filter expression. * - * @param columns column names from the manifest file schema - * @return a new scan based on this with the given manifest columns + * @param columns column names from the table's schema + * @return a new scan based on this with the given projection columns */ TableScan select(Collection columns); @@ -136,4 +145,10 @@ default TableScan select(String... columns) { * @return this scan's filter expression */ Expression filter(); + + /** + * Returns whether this scan should apply column name case sensitiveness as per {@link #caseSensitive(boolean)}. + * @return true if case sensitive, false otherwise. + */ + boolean isCaseSensitive(); } diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Evaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/Evaluator.java index fc6eacdc1a24..dacaba922d64 100644 --- a/api/src/main/java/com/netflix/iceberg/expressions/Evaluator.java +++ b/api/src/main/java/com/netflix/iceberg/expressions/Evaluator.java @@ -47,6 +47,10 @@ public Evaluator(Types.StructType struct, Expression unbound) { this.expr = Binder.bind(struct, unbound, true); } + public Evaluator(Types.StructType struct, Expression unbound, boolean caseSensitive) { + this.expr = Binder.bind(struct, unbound, caseSensitive); + } + public boolean eval(StructLike data) { return visitor().eval(data); } diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java index 6493273a543b..203ae788b095 100644 --- a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java +++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java @@ -53,8 +53,15 @@ private ManifestEvalVisitor visitor() { } public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) { + this(spec, rowFilter, true); + } + + public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter, boolean caseSensitive) { this.struct = spec.partitionType(); - this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)), true); + this.expr = Binder.bind( + struct, + rewriteNot(Projections.inclusive(spec, caseSensitive).project(rowFilter)), + caseSensitive); } /** diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveMetricsEvaluator.java index 54cc0be7f649..b8b0c48fc2bb 100644 --- a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveMetricsEvaluator.java +++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveMetricsEvaluator.java @@ -44,6 +44,7 @@ public class InclusiveMetricsEvaluator { private final Schema schema; private final StructType struct; private final Expression expr; + private final boolean caseSensitive; private transient ThreadLocal visitors = null; private MetricsEvalVisitor visitor() { @@ -53,10 +54,15 @@ private MetricsEvalVisitor visitor() { return visitors.get(); } - public InclusiveMetricsEvaluator(Schema schema, Expression unbound) { + InclusiveMetricsEvaluator(Schema schema, Expression unbound) { + this(schema, unbound, true); + } + + public InclusiveMetricsEvaluator(Schema schema, Expression unbound, boolean caseSensitive) { this.schema = schema; this.struct = schema.asStruct(); - this.expr = Binder.bind(struct, rewriteNot(unbound), true); + this.caseSensitive = caseSensitive; + this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive); } /** diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Projections.java b/api/src/main/java/com/netflix/iceberg/expressions/Projections.java index b811e27d9d99..d54a33fdb251 100644 --- a/api/src/main/java/com/netflix/iceberg/expressions/Projections.java +++ b/api/src/main/java/com/netflix/iceberg/expressions/Projections.java @@ -54,7 +54,8 @@ public static abstract class ProjectionEvaluator extends ExpressionVisitor * An evaluator is used to project expressions for a table's data rows into expressions on the * table's partition values. The evaluator returned by this function is inclusive and will build @@ -69,11 +70,32 @@ public static abstract class ProjectionEvaluator extends ExpressionVisitor + * An evaluator is used to project expressions for a table's data rows into expressions on the + * table's partition values. The evaluator returned by this function is inclusive and will build + * expressions with the following guarantee: if the original expression matches a row, then the + * projected expression will match that row's partition. + *

+ * Each predicate in the expression is projected using + * {@link Transform#project(String, BoundPredicate)}. + * + * @param spec a partition spec + * @param caseSensitive whether the Projection should consider case sensitivity on column names or not. + * @return an inclusive projection evaluator for the partition spec + * @see Transform#project(String, BoundPredicate) Inclusive transform used for each predicate + */ + public static ProjectionEvaluator inclusive(PartitionSpec spec, boolean caseSensitive) { + return new InclusiveProjection(spec, caseSensitive); + } + + /** + * Creates a strict {@code ProjectionEvaluator} for the {@link PartitionSpec spec}, defaulting + * to case sensitive mode. *

* An evaluator is used to project expressions for a table's data rows into expressions on the * table's partition values. The evaluator returned by this function is strict and will build @@ -88,14 +110,36 @@ public static ProjectionEvaluator inclusive(PartitionSpec spec) { * @see Transform#projectStrict(String, BoundPredicate) Strict transform used for each predicate */ public static ProjectionEvaluator strict(PartitionSpec spec) { - return new StrictProjection(spec); + return new StrictProjection(spec, true); + } + + /** + * Creates a strict {@code ProjectionEvaluator} for the {@link PartitionSpec spec}. + *

+ * An evaluator is used to project expressions for a table's data rows into expressions on the + * table's partition values. The evaluator returned by this function is strict and will build + * expressions with the following guarantee: if the projected expression matches a partition, + * then the original expression will match all rows in that partition. + *

+ * Each predicate in the expression is projected using + * {@link Transform#projectStrict(String, BoundPredicate)}. + * + * @param spec a partition spec + * @param caseSensitive whether the Projection should consider case sensitivity on column names or not. + * @return a strict projection evaluator for the partition spec + * @see Transform#projectStrict(String, BoundPredicate) Strict transform used for each predicate + */ + public static ProjectionEvaluator strict(PartitionSpec spec, boolean caseSensitive) { + return new StrictProjection(spec, caseSensitive); } private static class BaseProjectionEvaluator extends ProjectionEvaluator { final PartitionSpec spec; + final boolean caseSensitive; - private BaseProjectionEvaluator(PartitionSpec spec) { + private BaseProjectionEvaluator(PartitionSpec spec, boolean caseSensitive) { this.spec = spec; + this.caseSensitive = caseSensitive; } @Override @@ -135,7 +179,7 @@ public Expression or(Expression leftResult, Expression rightResult) { @Override public Expression predicate(UnboundPredicate pred) { - Expression bound = pred.bind(spec.schema().asStruct(), true); + Expression bound = pred.bind(spec.schema().asStruct(), caseSensitive); if (bound instanceof BoundPredicate) { return predicate((BoundPredicate) bound); @@ -146,8 +190,8 @@ public Expression predicate(UnboundPredicate pred) { } private static class InclusiveProjection extends BaseProjectionEvaluator { - private InclusiveProjection(PartitionSpec spec) { - super(spec); + private InclusiveProjection(PartitionSpec spec, boolean caseSensitive) { + super(spec, caseSensitive); } @Override @@ -171,8 +215,8 @@ public Expression predicate(BoundPredicate pred) { } private static class StrictProjection extends BaseProjectionEvaluator { - private StrictProjection(PartitionSpec spec) { - super(spec); + private StrictProjection(PartitionSpec spec, boolean caseSensitive) { + super(spec, caseSensitive); } @Override diff --git a/api/src/main/java/com/netflix/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/ResidualEvaluator.java index 610bdc5a13f9..dff24e7241c6 100644 --- a/api/src/main/java/com/netflix/iceberg/expressions/ResidualEvaluator.java +++ b/api/src/main/java/com/netflix/iceberg/expressions/ResidualEvaluator.java @@ -49,6 +49,7 @@ public class ResidualEvaluator implements Serializable { private final PartitionSpec spec; private final Expression expr; + private final boolean caseSensitive; private transient ThreadLocal visitors = null; private ResidualVisitor visitor() { @@ -58,9 +59,10 @@ private ResidualVisitor visitor() { return visitors.get(); } - public ResidualEvaluator(PartitionSpec spec, Expression expr) { + public ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) { this.spec = spec; this.expr = expr; + this.caseSensitive = caseSensitive; } /** @@ -170,7 +172,7 @@ public Expression predicate(BoundPredicate pred) { .projectStrict(part.name(), pred); if (strictProjection != null) { - Expression bound = strictProjection.bind(spec.partitionType(), true); + Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive); if (bound instanceof BoundPredicate) { // the predicate methods will evaluate and return alwaysTrue or alwaysFalse return super.predicate((BoundPredicate) bound); @@ -184,7 +186,7 @@ public Expression predicate(BoundPredicate pred) { @Override public Expression predicate(UnboundPredicate pred) { - Expression bound = pred.bind(spec.schema().asStruct(), true); + Expression bound = pred.bind(spec.schema().asStruct(), caseSensitive); if (bound instanceof BoundPredicate) { Expression boundResidual = predicate((BoundPredicate) bound); diff --git a/api/src/main/java/com/netflix/iceberg/types/TypeUtil.java b/api/src/main/java/com/netflix/iceberg/types/TypeUtil.java index 6f2ca7886ad8..81bb4227a600 100644 --- a/api/src/main/java/com/netflix/iceberg/types/TypeUtil.java +++ b/api/src/main/java/com/netflix/iceberg/types/TypeUtil.java @@ -24,9 +24,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.netflix.iceberg.Schema; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -79,6 +81,13 @@ public static Map indexByName(Types.StructType struct) { return visit(struct, new IndexByName()); } + public static Map indexByLowerCaseName(Types.StructType struct) { + Map indexByLowerCaseName = Maps.newHashMap(); + indexByName(struct).forEach( (name, integer) -> + indexByLowerCaseName.put(name.toLowerCase(Locale.ROOT), integer)); + return indexByLowerCaseName; + } + public static Map indexById(Types.StructType struct) { return visit(struct, new IndexById()); } diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestEvaluatior.java b/api/src/test/java/com/netflix/iceberg/expressions/TestEvaluatior.java index 4723f83a0977..0c08903f0b80 100644 --- a/api/src/test/java/com/netflix/iceberg/expressions/TestEvaluatior.java +++ b/api/src/test/java/com/netflix/iceberg/expressions/TestEvaluatior.java @@ -20,6 +20,7 @@ package com.netflix.iceberg.expressions; import com.netflix.iceberg.TestHelpers; +import com.netflix.iceberg.exceptions.ValidationException; import com.netflix.iceberg.types.Types; import com.netflix.iceberg.types.Types.StructType; import org.apache.avro.util.Utf8; @@ -145,6 +146,23 @@ public void testNot() { Assert.assertTrue("not(8 == 7) => false", evaluator.eval(TestHelpers.Row.of(8))); } + @Test + public void testCaseInsensitiveNot() { + Evaluator evaluator = new Evaluator(STRUCT, not(equal("X", 7)), false); + Assert.assertFalse("not(7 == 7) => false", evaluator.eval(TestHelpers.Row.of(7))); + Assert.assertTrue("not(8 == 7) => false", evaluator.eval(TestHelpers.Row.of(8))); + } + + @Test + public void testCaseSensitiveNot() { + TestHelpers.assertThrows( + "X != x when case sensitivity is on", + ValidationException.class, + "Cannot find field 'X' in struct", + () -> { new Evaluator(STRUCT, not(equal("X", 7)), true); } + ); + } + @Test public void testCharSeqValue() { StructType struct = StructType.of(required(34, "s", Types.StringType.get())); diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java index f92f70014fe1..b65a61c4a3df 100644 --- a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java +++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java @@ -292,4 +292,35 @@ public void testIntegerNotEqRewritten() { shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE); Assert.assertTrue("Should read: id above upper bound", shouldRead); } + + @Test + public void testCaseInsensitiveIntegerNotEqRewritten() { + boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), false).eval(FILE); + Assert.assertTrue("Should read: id below lower bound", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 29)), false).eval(FILE); + Assert.assertTrue("Should read: id below lower bound", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 30)), false).eval(FILE); + Assert.assertTrue("Should read: id equal to lower bound", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 75)), false).eval(FILE); + Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 79)), false).eval(FILE); + Assert.assertTrue("Should read: id equal to upper bound", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 80)), false).eval(FILE); + Assert.assertTrue("Should read: id above upper bound", shouldRead); + + shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 85)), false).eval(FILE); + Assert.assertTrue("Should read: id above upper bound", shouldRead); + } + + @Test + public void testCaseSensitiveIntegerNotEqRewritten() { + TestHelpers.assertThrows("Should complain about missing column in expression", + ValidationException.class, "Cannot find field 'ID'", + () -> new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), true).eval(FILE)); + } } diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveMetricsEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveMetricsEvaluator.java index bbc96ceb52da..12b8d0d9df24 100644 --- a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveMetricsEvaluator.java +++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveMetricsEvaluator.java @@ -312,4 +312,33 @@ public void testIntegerNotEqRewritten() { shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("id", 85))).eval(FILE); Assert.assertTrue("Should read: id above upper bound", shouldRead); } + + @Test + public void testCaseInsensitiveIntegerNotEqRewritten() { + boolean shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 5)), false).eval(FILE); + Assert.assertTrue("Should read: id below lower bound", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 29)), false).eval(FILE); + Assert.assertTrue("Should read: id below lower bound", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 30)), false).eval(FILE); + Assert.assertTrue("Should read: id equal to lower bound", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 75)), false).eval(FILE); + Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 79)), false).eval(FILE); + Assert.assertTrue("Should read: id equal to upper bound", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 80)), false).eval(FILE); + Assert.assertTrue("Should read: id above upper bound", shouldRead); + + shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 85)), false).eval(FILE); + Assert.assertTrue("Should read: id above upper bound", shouldRead); + } + + @Test(expected = ValidationException.class) + public void testCaseSensitiveIntegerNotEqRewritten() { + boolean shouldRead = new InclusiveMetricsEvaluator(SCHEMA, not(equal("ID", 5)), true).eval(FILE); + } } diff --git a/api/src/test/java/com/netflix/iceberg/transforms/TestProjection.java b/api/src/test/java/com/netflix/iceberg/transforms/TestProjection.java index f08fe3fef53b..546cb853ac14 100644 --- a/api/src/test/java/com/netflix/iceberg/transforms/TestProjection.java +++ b/api/src/test/java/com/netflix/iceberg/transforms/TestProjection.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.netflix.iceberg.Schema; +import com.netflix.iceberg.exceptions.ValidationException; import com.netflix.iceberg.expressions.BoundPredicate; import com.netflix.iceberg.expressions.Expression; import com.netflix.iceberg.expressions.Expressions; @@ -35,6 +36,7 @@ import static com.netflix.iceberg.TestHelpers.assertAndUnwrap; import static com.netflix.iceberg.TestHelpers.assertAndUnwrapUnbound; +import static com.netflix.iceberg.TestHelpers.assertThrows; import static com.netflix.iceberg.expressions.Expressions.and; import static com.netflix.iceberg.expressions.Expressions.equal; import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual; @@ -86,6 +88,56 @@ public void testIdentityProjection() { } } + @Test + public void testCaseInsensitiveIdentityProjection() { + List> predicates = Lists.newArrayList( + Expressions.notNull("ID"), + Expressions.isNull("ID"), + Expressions.lessThan("ID", 100), + Expressions.lessThanOrEqual("ID", 101), + Expressions.greaterThan("ID", 102), + Expressions.greaterThanOrEqual("ID", 103), + Expressions.equal("ID", 104), + Expressions.notEqual("ID", 105) + ); + + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("id") + .build(); + + for (UnboundPredicate predicate : predicates) { + // get the projected predicate + Expression expr = Projections.inclusive(spec, false).project(predicate); + UnboundPredicate projected = assertAndUnwrapUnbound(expr); + + // check inclusive the bound predicate to ensure the types are correct + BoundPredicate bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct(), false)); + + Assert.assertEquals("Field name should match partition struct field", + "id", projected.ref().name()); + Assert.assertEquals("Operation should match", bound.op(), projected.op()); + + if (bound.literal() != null) { + Assert.assertEquals("Literal should be equal", + bound.literal().value(), projected.literal().value()); + } else { + Assert.assertNull("Literal should be null", projected.literal()); + } + } + } + + @Test + public void testCaseSensitiveIdentityProjection() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("id") + .build(); + + assertThrows("X != x when case sensitivity is on", + ValidationException.class, + "Cannot find field 'ID' in struct", + () -> { Projections.inclusive(spec, true).project(Expressions.notNull("ID")); }); + } + @Test public void testStrictIdentityProjection() { List> predicates = Lists.newArrayList( @@ -124,6 +176,57 @@ public void testStrictIdentityProjection() { } } + @Test + public void testCaseInsensitiveStrictIdentityProjection() { + List> predicates = Lists.newArrayList( + Expressions.notNull("ID"), + Expressions.isNull("ID"), + Expressions.lessThan("ID", 100), + Expressions.lessThanOrEqual("ID", 101), + Expressions.greaterThan("ID", 102), + Expressions.greaterThanOrEqual("ID", 103), + Expressions.equal("ID", 104), + Expressions.notEqual("ID", 105) + ); + + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("id") + .build(); + + for (UnboundPredicate predicate : predicates) { + // get the projected predicate + Expression expr = Projections.strict(spec, false).project(predicate); + UnboundPredicate projected = assertAndUnwrapUnbound(expr); + + // check inclusive the bound predicate to ensure the types are correct + BoundPredicate bound = assertAndUnwrap(predicate.bind(spec.schema().asStruct(), false)); + + Assert.assertEquals("Field name should match partition struct field", + "id", projected.ref().name()); + Assert.assertEquals("Operation should match", bound.op(), projected.op()); + + if (bound.literal() != null) { + Assert.assertEquals("Literal should be equal", + bound.literal().value(), projected.literal().value()); + } else { + Assert.assertNull("Literal should be null", projected.literal()); + } + } + } + + @Test + public void testCaseSensitiveStrictIdentityProjection() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("id") + .build(); + + assertThrows( + "X != x when case sensitivity is on", + ValidationException.class, + "Cannot find field 'ID' in struct", + () -> { Projections.strict(spec, true).project(Expressions.notNull("ID")); }); + } + @Test public void testBadSparkPartitionFilter() { // this tests a case that results in a full table scan in Spark with Hive tables. because the diff --git a/api/src/test/java/com/netflix/iceberg/transforms/TestResiduals.java b/api/src/test/java/com/netflix/iceberg/transforms/TestResiduals.java index 27b1c11604a3..ec0cbe7e143f 100644 --- a/api/src/test/java/com/netflix/iceberg/transforms/TestResiduals.java +++ b/api/src/test/java/com/netflix/iceberg/transforms/TestResiduals.java @@ -21,6 +21,7 @@ import com.netflix.iceberg.Schema; import com.netflix.iceberg.TestHelpers.Row; +import com.netflix.iceberg.exceptions.ValidationException; import com.netflix.iceberg.expressions.Expression; import com.netflix.iceberg.expressions.ResidualEvaluator; import com.netflix.iceberg.expressions.UnboundPredicate; @@ -55,7 +56,8 @@ public void testIdentityTransformResiduals() { ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or( and(lessThan("dateint", 20170815), greaterThan("dateint", 20170801)), and(equal("dateint", 20170815), lessThan("hour", 12))), - and(equal("dateint", 20170801), greaterThan("hour", 11))) + and(equal("dateint", 20170801), greaterThan("hour", 11))), + true ); // equal to the upper date bound @@ -80,4 +82,59 @@ public void testIdentityTransformResiduals() { residual = resEval.residualFor(Row.of(20170817)); Assert.assertEquals("Residual should be alwaysFalse", alwaysFalse(), residual); } + + @Test + public void testCaseInsensitiveIdentityTransformResiduals() { + Schema schema = new Schema( + Types.NestedField.optional(50, "dateint", Types.IntegerType.get()), + Types.NestedField.optional(51, "hour", Types.IntegerType.get())); + + PartitionSpec spec = PartitionSpec.builderFor(schema) + .identity("dateint") + .build(); + + ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or( + and(lessThan("DATEINT", 20170815), greaterThan("dateint", 20170801)), + and(equal("dateint", 20170815), lessThan("HOUR", 12))), + and(equal("DateInt", 20170801), greaterThan("hOUr", 11))), + false); + + // equal to the upper date bound + Expression residual = resEval.residualFor(Row.of(20170815)); + UnboundPredicate unbound = assertAndUnwrapUnbound(residual); + Assert.assertEquals("Residual should be hour < 12", LT, unbound.op()); + Assert.assertEquals("Residual should be hour < 12", "HOUR", unbound.ref().name()); + Assert.assertEquals("Residual should be hour < 12", 12, unbound.literal().value()); + + // equal to the lower date bound + residual = resEval.residualFor(Row.of(20170801)); + unbound = assertAndUnwrapUnbound(residual); + Assert.assertEquals("Residual should be hour > 11", GT, unbound.op()); + Assert.assertEquals("Residual should be hour > 11", "hOUr", unbound.ref().name()); + Assert.assertEquals("Residual should be hour > 11", 11, unbound.literal().value()); + + // inside the date range + residual = resEval.residualFor(Row.of(20170812)); + Assert.assertEquals("Residual should be alwaysTrue", alwaysTrue(), residual); + + // outside the date range + residual = resEval.residualFor(Row.of(20170817)); + Assert.assertEquals("Residual should be alwaysFalse", alwaysFalse(), residual); + } + + @Test(expected = ValidationException.class) + public void testCaseSensitiveIdentityTransformResiduals() { + Schema schema = new Schema( + Types.NestedField.optional(50, "dateint", Types.IntegerType.get()), + Types.NestedField.optional(51, "hour", Types.IntegerType.get()) + ); + + PartitionSpec spec = PartitionSpec.builderFor(schema) + .identity("dateint") + .build(); + + ResidualEvaluator resEval = new ResidualEvaluator(spec, lessThan("DATEINT", 20170815), true); + + resEval.residualFor(Row.of(20170815)); + } } diff --git a/api/src/test/java/com/netflix/iceberg/types/TestReadabilityChecks.java b/api/src/test/java/com/netflix/iceberg/types/TestReadabilityChecks.java index 6b743386a6a3..90ccbed44ec0 100644 --- a/api/src/test/java/com/netflix/iceberg/types/TestReadabilityChecks.java +++ b/api/src/test/java/com/netflix/iceberg/types/TestReadabilityChecks.java @@ -373,4 +373,23 @@ public void testStructReadReordering() { List errors = CheckCompatibility.readCompatibilityErrors(read, write); Assert.assertEquals("Should produce no error messages", 0, errors.size()); } + + @Test + public void testCaseInsensitiveSchemaProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()) + ) + )) + ); + + Assert.assertNotNull(schema.caseInsensitiveSelect("ID").findField(0)); + Assert.assertNotNull(schema.caseInsensitiveSelect("loCATIONs").findField(5)); + Assert.assertNotNull(schema.caseInsensitiveSelect("LoCaTiOnS.LaT").findField(1)); + Assert.assertNotNull(schema.caseInsensitiveSelect("locations.LONG").findField(2)); + } } diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java index 7fe5964251fc..b9a107f1f4e5 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java @@ -73,17 +73,22 @@ class BaseTableScan implements TableScan { private final Long snapshotId; private final Schema schema; private final Expression rowFilter; + private final boolean caseSensitive; + private final Collection selectedColumns; BaseTableScan(TableOperations ops, Table table) { - this(ops, table, null, table.schema(), Expressions.alwaysTrue()); + this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, null); } - private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter) { + private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema, + Expression rowFilter, boolean caseSensitive, Collection selectedColumns) { this.ops = ops; this.table = table; this.snapshotId = snapshotId; this.schema = schema; this.rowFilter = rowFilter; + this.caseSensitive = caseSensitive; + this.selectedColumns = selectedColumns; } @Override @@ -97,7 +102,7 @@ public TableScan useSnapshot(long snapshotId) { "Cannot override snapshot, already set to id=%s", snapshotId); Preconditions.checkArgument(ops.current().snapshot(snapshotId) != null, "Cannot find snapshot with ID %s", snapshotId); - return new BaseTableScan(ops, table, snapshotId, schema, rowFilter); + return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, selectedColumns); } @Override @@ -121,28 +126,23 @@ public TableScan asOfTime(long timestampMillis) { } public TableScan project(Schema schema) { - return new BaseTableScan(ops, table, snapshotId, schema, rowFilter); + return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, selectedColumns); } @Override - public TableScan select(Collection columns) { - Set requiredFieldIds = Sets.newHashSet(); - - // all of the filter columns are required - requiredFieldIds.addAll( - Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter), true)); - - // all of the projection columns are required - requiredFieldIds.addAll(TypeUtil.getProjectedIds(table.schema().select(columns))); - - Schema projection = TypeUtil.select(table.schema(), requiredFieldIds); + public TableScan caseSensitive(boolean caseSensitive) { + return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, selectedColumns); + } - return new BaseTableScan(ops, table, snapshotId, projection, rowFilter); + @Override + public TableScan select(Collection columns) { + return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, columns); } @Override public TableScan filter(Expression expr) { - return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr)); + return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), + caseSensitive, selectedColumns); } private final LoadingCache EVAL_CACHE = CacheBuilder @@ -151,7 +151,7 @@ public TableScan filter(Expression expr) { @Override public InclusiveManifestEvaluator load(Integer specId) { PartitionSpec spec = ops.current().spec(specId); - return new InclusiveManifestEvaluator(spec, rowFilter); + return new InclusiveManifestEvaluator(spec, rowFilter, caseSensitive); } }); @@ -167,7 +167,7 @@ public CloseableIterable planFiles() { rowFilter); Listeners.notifyAll( - new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema)); + new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema())); Iterable matchingManifests = Iterables.filter(snapshot.manifests(), manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest)); @@ -176,11 +176,13 @@ public CloseableIterable planFiles() { Iterable> readers = Iterables.transform( matchingManifests, manifest -> { - ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path())); + ManifestReader reader = ManifestReader + .read(ops.io().newInputFile(manifest.path())) + .caseSensitive(caseSensitive); toClose.add(reader); String schemaString = SchemaParser.toJson(reader.spec().schema()); String specString = PartitionSpecParser.toJson(reader.spec()); - ResidualEvaluator residuals = new ResidualEvaluator(reader.spec(), rowFilter); + ResidualEvaluator residuals = new ResidualEvaluator(reader.spec(), rowFilter, caseSensitive); return Iterables.transform( reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS), file -> new BaseFileScanTask(file, schemaString, specString, residuals) @@ -216,7 +218,7 @@ public CloseableIterable planTasks() { @Override public Schema schema() { - return schema; + return lazyColumnProjection(); } @Override @@ -224,12 +226,18 @@ public Expression filter() { return rowFilter; } + @Override + public boolean isCaseSensitive() { + return caseSensitive; + } + @Override public String toString() { return Objects.toStringHelper(this) .add("table", table) - .add("projection", schema.asStruct()) + .add("projection", schema().asStruct()) .add("filter", rowFilter) + .add("caseSensitive", caseSensitive) .toString(); } @@ -241,4 +249,33 @@ private CloseableIterable splitFiles(long splitSize) { // Capture manifests which can be closed after scan planning return CloseableIterable.combine(splitTasks, ImmutableList.of(fileScanTasks)); } + + /** + * To be able to make refinements {@link #select(Collection)} and {@link #caseSensitive(boolean)} in any order, + * we resolve the schema to be projected lazily here. + * + * @return the Schema to project + */ + private Schema lazyColumnProjection() { + if (selectedColumns != null ) { + Set requiredFieldIds = Sets.newHashSet(); + + // all of the filter columns are required + requiredFieldIds.addAll( + Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter), caseSensitive)); + + // all of the projection columns are required + Set selectedIds; + if (caseSensitive) { + selectedIds = TypeUtil.getProjectedIds(table.schema().select(selectedColumns)); + } else { + selectedIds = TypeUtil.getProjectedIds(table.schema().caseInsensitiveSelect(selectedColumns)); + } + requiredFieldIds.addAll(selectedIds); + + return TypeUtil.select(table.schema(), requiredFieldIds); + } + + return schema; + } } diff --git a/core/src/main/java/com/netflix/iceberg/FilteredManifest.java b/core/src/main/java/com/netflix/iceberg/FilteredManifest.java index bea656c9202a..10c68cb0ec9b 100644 --- a/core/src/main/java/com/netflix/iceberg/FilteredManifest.java +++ b/core/src/main/java/com/netflix/iceberg/FilteredManifest.java @@ -36,23 +36,25 @@ public class FilteredManifest implements Filterable { private final Expression partFilter; private final Expression rowFilter; private final Collection columns; + private final boolean caseSensitive; // lazy state private Evaluator lazyEvaluator = null; private InclusiveMetricsEvaluator lazyMetricsEvaluator = null; FilteredManifest(ManifestReader reader, Expression partFilter, Expression rowFilter, - Collection columns) { + Collection columns, boolean caseSensitive) { Preconditions.checkNotNull(reader, "ManifestReader cannot be null"); this.reader = reader; this.partFilter = partFilter; this.rowFilter = rowFilter; this.columns = columns; + this.caseSensitive = caseSensitive; } @Override public FilteredManifest select(Collection columns) { - return new FilteredManifest(reader, partFilter, rowFilter, columns); + return new FilteredManifest(reader, partFilter, rowFilter, columns, caseSensitive); } @Override @@ -60,16 +62,18 @@ public FilteredManifest filterPartitions(Expression expr) { return new FilteredManifest(reader, Expressions.and(partFilter, expr), rowFilter, - columns); + columns, + caseSensitive); } @Override public FilteredManifest filterRows(Expression expr) { - Expression projected = Projections.inclusive(reader.spec()).project(expr); + Expression projected = Projections.inclusive(reader.spec(), caseSensitive).project(expr); return new FilteredManifest(reader, Expressions.and(partFilter, projected), Expressions.and(rowFilter, expr), - columns); + columns, + caseSensitive); } Iterable allEntries() { @@ -128,9 +132,9 @@ public Iterator iterator() { private Evaluator evaluator() { if (lazyEvaluator == null) { if (partFilter != null) { - this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter); + this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter, caseSensitive); } else { - this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), Expressions.alwaysTrue()); + this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), Expressions.alwaysTrue(), caseSensitive); } } return lazyEvaluator; @@ -140,10 +144,10 @@ private InclusiveMetricsEvaluator metricsEvaluator() { if (lazyMetricsEvaluator == null) { if (rowFilter != null) { this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator( - reader.spec().schema(), rowFilter); + reader.spec().schema(), rowFilter, caseSensitive); } else { this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator( - reader.spec().schema(), Expressions.alwaysTrue()); + reader.spec().schema(), Expressions.alwaysTrue(), caseSensitive); } } return lazyMetricsEvaluator; diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java index f1128fc59ea6..9baaf0c3e21b 100644 --- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java +++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java @@ -61,20 +61,33 @@ public class ManifestReader extends CloseableGroup implements Filterable metadata; private final PartitionSpec spec; private final Schema schema; + private final boolean caseSensitive; // lazily initialized private List adds = null; private List deletes = null; - private ManifestReader(InputFile file) { + private ManifestReader(InputFile file, boolean caseSensitive) { this.file = file; + this.caseSensitive = caseSensitive; try { try (AvroIterable headerReader = Avro.read(file) @@ -94,6 +107,27 @@ private ManifestReader(InputFile file) { this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec")); } + private ManifestReader(InputFile file, Map metadata, + PartitionSpec spec, Schema schema, boolean caseSensitive) { + this.file = file; + this.metadata = metadata; + this.spec = spec; + this.schema = schema; + this.caseSensitive = caseSensitive; + } + + /** + * Returns a new {@link ManifestReader} that, if filtered via {@link #select(java.util.Collection)}, + * {@link #filterPartitions(Expression)} or {@link #filterRows(Expression)}, will apply the specified + * case sensitivity for column name matching. + * + * @param caseSensitive whether column name matching should have case sensitivity + * @return a manifest reader with case sensitivity as stated + */ + public ManifestReader caseSensitive(boolean caseSensitive) { + return new ManifestReader(file, metadata, spec, schema, caseSensitive); + } + public InputFile file() { return file; } @@ -113,17 +147,21 @@ public Iterator iterator() { @Override public FilteredManifest select(Collection columns) { - return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), Lists.newArrayList(columns)); + return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), Lists.newArrayList(columns), caseSensitive); } @Override public FilteredManifest filterPartitions(Expression expr) { - return new FilteredManifest(this, expr, alwaysTrue(), ALL_COLUMNS); + return new FilteredManifest(this, expr, alwaysTrue(), ALL_COLUMNS, caseSensitive); } @Override public FilteredManifest filterRows(Expression expr) { - return new FilteredManifest(this, Projections.inclusive(spec).project(expr), expr, ALL_COLUMNS); + return new FilteredManifest(this, + Projections.inclusive(spec, caseSensitive).project(expr), + expr, + ALL_COLUMNS, + caseSensitive); } public List addedFiles() { diff --git a/core/src/test/java/com/netflix/iceberg/TestBaseTableScan.java b/core/src/test/java/com/netflix/iceberg/TestBaseTableScan.java new file mode 100644 index 000000000000..ae0a841e74d1 --- /dev/null +++ b/core/src/test/java/com/netflix/iceberg/TestBaseTableScan.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg; + +import com.netflix.iceberg.types.Types; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +import static com.netflix.iceberg.types.Types.NestedField.required; +import static org.junit.Assert.assertEquals; + +public class TestBaseTableScan { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + private final Schema schema = new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()) + ); + private File tableDir = null; + + @Before + public void setupTableDir() throws IOException { + this.tableDir = temp.newFolder(); + } + + @After + public void cleanupTables() { + TestTables.clearTables(); + } + + @Test + public void testTableScanHonorsSelect() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = TestTables.create(tableDir, "test", schema, spec); + + TableScan scan = table.newScan().select("id"); + + Schema expectedSchema = new Schema( + required(1, "id", Types.IntegerType.get()) + ); + + assertEquals("A tableScan.select() should prune the schema", + expectedSchema.asStruct(), + scan.schema().asStruct()); + } + + @Test + public void testTableScanHonorsSelectWithoutCaseSensitivity() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = TestTables.create(tableDir, "test", schema, spec); + + TableScan scan1 = table.newScan().caseSensitive(false).select("ID"); + // order of refinements shouldn't matter + TableScan scan2 = table.newScan().select("ID").caseSensitive(false); + + Schema expectedSchema = new Schema( + required(1, "id", Types.IntegerType.get()) + ); + + assertEquals("A tableScan.select() should prune the schema without case sensitivity", + expectedSchema.asStruct(), + scan1.schema().asStruct()); + + assertEquals("A tableScan.select() should prune the schema regardless of scan refinement order", + expectedSchema.asStruct(), + scan2.schema().asStruct()); + } + +} diff --git a/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java b/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java index cafff003e758..22b0ae511db2 100644 --- a/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java +++ b/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java @@ -66,6 +66,13 @@ public void testFilterFilesUnpartitionedTable() { testFilterFiles(table); } + @Test + public void testCaseInsensitiveFilterFilesUnpartitionedTable() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = TestTables.create(tableDir, "test", schema, spec); + testCaseInsensitiveFilterFiles(table); + } + @Test public void testFilterFilesPartitionedTable() { PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build(); @@ -73,6 +80,13 @@ public void testFilterFilesPartitionedTable() { testFilterFiles(table); } + @Test + public void testCaseInsensitiveFilterFilesPartitionedTable() { + PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build(); + Table table = TestTables.create(tableDir, "test", schema, spec); + testCaseInsensitiveFilterFiles(table); + } + private void testFilterFiles(Table table) { Map lowerBounds = new HashMap<>(); Map upperBounds = new HashMap<>(); @@ -98,4 +112,30 @@ private void testFilterFiles(Table table) { TableScan nonEmptyScan = table.newScan().filter(Expressions.equal("id", 1)); assertEquals(1, Iterables.size(nonEmptyScan.planFiles())); } + + private void testCaseInsensitiveFilterFiles(Table table) { + Map lowerBounds = new HashMap<>(); + Map upperBounds = new HashMap<>(); + lowerBounds.put(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)); + upperBounds.put(1, Conversions.toByteBuffer(Types.IntegerType.get(), 2)); + + Metrics metrics = new Metrics(2L, Maps.newHashMap(), Maps.newHashMap(), + Maps.newHashMap(), lowerBounds, upperBounds); + + DataFile file = DataFiles.builder(table.spec()) + .withPath("/path/to/file.parquet") + .withFileSizeInBytes(0) + .withMetrics(metrics) + .build(); + + table.newAppend().appendFile(file).commit(); + + table.refresh(); + + TableScan emptyScan = table.newScan().caseSensitive(false).filter(Expressions.equal("ID", 5)); + assertEquals(0, Iterables.size(emptyScan.planFiles())); + + TableScan nonEmptyScan = table.newScan().caseSensitive(false).filter(Expressions.equal("ID", 1)); + assertEquals(1, Iterables.size(nonEmptyScan.planFiles())); + } } diff --git a/data/src/main/java/com/netflix/iceberg/data/IcebergGenerics.java b/data/src/main/java/com/netflix/iceberg/data/IcebergGenerics.java index 587ed00ea57c..941da26f4c79 100644 --- a/data/src/main/java/com/netflix/iceberg/data/IcebergGenerics.java +++ b/data/src/main/java/com/netflix/iceberg/data/IcebergGenerics.java @@ -44,6 +44,7 @@ public static class ScanBuilder { private Expression where = Expressions.alwaysTrue(); private List columns = ImmutableList.of("*"); private boolean reuseContainers = false; + private boolean caseSensitive = true; public ScanBuilder(Table table) { this.table = table; @@ -59,13 +60,25 @@ public ScanBuilder where(Expression rowFilter) { return this; } + public ScanBuilder caseInsensitive() { + this.caseSensitive = false; + return this; + } + public ScanBuilder select(String... columns) { this.columns = ImmutableList.copyOf(columns); return this; } public Iterable build() { - return new TableScanIterable(table.newScan().filter(where).select(columns), reuseContainers); + return new TableScanIterable( + table + .newScan() + .filter(where) + .caseSensitive(caseSensitive) + .select(columns), + reuseContainers + ); } } } diff --git a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java index 4f59556ebb64..b48b057dfc98 100644 --- a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java +++ b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java @@ -58,6 +58,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable tasks; TableScanIterable(TableScan scan, boolean reuseContainers) { @@ -66,6 +67,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable iterator() { - ScanIterator iter = new ScanIterator(tasks); + ScanIterator iter = new ScanIterator(tasks, caseSensitive); addCloseable(iter); return iter; } @@ -121,11 +123,13 @@ public void close() throws IOException { private class ScanIterator implements Iterator, Closeable { private final Iterator tasks; + private final boolean caseSensitive; private Closeable currentCloseable = null; private Iterator currentIterator = emptyIterator(); - private ScanIterator(Iterable tasks) { + private ScanIterator(Iterable tasks, boolean caseSensitive) { this.tasks = Lists.newArrayList(concat(transform(tasks, CombinedScanTask::files))).iterator(); + this.caseSensitive = caseSensitive; } @Override @@ -148,7 +152,7 @@ public boolean hasNext() { this.currentCloseable = reader; if (task.residual() != null && task.residual() != Expressions.alwaysTrue()) { - Evaluator filter = new Evaluator(projection.asStruct(), task.residual()); + Evaluator filter = new Evaluator(projection.asStruct(), task.residual(), caseSensitive); this.currentIterator = filter(reader, filter::eval).iterator(); } else { this.currentIterator = reader.iterator(); diff --git a/spark/src/main/java/com/netflix/iceberg/spark/SparkSchemaUtil.java b/spark/src/main/java/com/netflix/iceberg/spark/SparkSchemaUtil.java index bfe9390b7aee..6b193af562db 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/SparkSchemaUtil.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/SparkSchemaUtil.java @@ -224,8 +224,10 @@ public static Schema prune(Schema schema, StructType requestedType, List filterRefs = Binder.boundReferences(schema.asStruct(), Collections.singletonList(filter), true); + public static Schema prune(Schema schema, StructType requestedType, Expression filter, boolean caseSensitive) { + Set filterRefs = + Binder.boundReferences(schema.asStruct(), Collections.singletonList(filter), caseSensitive); + return new Schema(visit(schema, new PruneColumnsWithoutReordering(requestedType, filterRefs)) .asNestedType() .asStructType() diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 1991d29cbbe5..bacd173081e8 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -20,6 +20,7 @@ package com.netflix.iceberg.spark.source; import com.google.common.base.Preconditions; +import com.netflix.iceberg.ConfigProperties; import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; @@ -59,7 +60,9 @@ public String shortName() { public DataSourceReader createReader(DataSourceOptions options) { Configuration conf = new Configuration(lazyBaseConf()); Table table = getTableAndResolveHadoopConfiguration(options, conf); - return new Reader(table); + String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive", "true"); + + return new Reader(table, Boolean.valueOf(caseSensitive)); } @Override diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java index 85628deee67f..3f7cb436200a 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java @@ -98,6 +98,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD private final Table table; private final FileIO fileIo; private final EncryptionManager encryptionManager; + private final boolean caseSensitive; private StructType requestedSchema = null; private List filterExpressions = null; private Filter[] pushedFilters = NO_FILTERS; @@ -107,11 +108,12 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD private StructType type = null; // cached because Spark accesses it multiple times private List tasks = null; // lazy cache of tasks - Reader(Table table) { + Reader(Table table, boolean caseSensitive) { this.table = table; this.schema = table.schema(); this.fileIo = table.io(); this.encryptionManager = table.encryption(); + this.caseSensitive = caseSensitive; } private Schema lazySchema() { @@ -144,7 +146,8 @@ public List> planInputPartitions() { List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { - readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager)); + readTasks.add( + new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager, caseSensitive)); } return readTasks; @@ -208,7 +211,10 @@ public Statistics estimateStatistics() { private List tasks() { if (tasks == null) { - TableScan scan = table.newScan().project(lazySchema()); + TableScan scan = table + .newScan() + .caseSensitive(caseSensitive) + .project(lazySchema()); if (filterExpressions != null) { for (Expression filter : filterExpressions) { @@ -229,8 +235,8 @@ private List tasks() { @Override public String toString() { return String.format( - "IcebergScan(table=%s, type=%s, filters=%s)", - table, lazySchema().asStruct(), filterExpressions); + "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)", + table, lazySchema().asStruct(), filterExpressions, caseSensitive); } private static class ReadTask implements InputPartition, Serializable { @@ -239,23 +245,26 @@ private static class ReadTask implements InputPartition, Serializab private final String expectedSchemaString; private final FileIO fileIo; private final EncryptionManager encryptionManager; + private final boolean caseSensitive; private transient Schema tableSchema = null; private transient Schema expectedSchema = null; private ReadTask( CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo, - EncryptionManager encryptionManager) { + EncryptionManager encryptionManager, boolean caseSensitive) { this.task = task; this.tableSchemaString = tableSchemaString; this.expectedSchemaString = expectedSchemaString; this.fileIo = fileIo; this.encryptionManager = encryptionManager; + this.caseSensitive = caseSensitive; } @Override public InputPartitionReader createPartitionReader() { - return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo, encryptionManager); + return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo, + encryptionManager, caseSensitive); } private Schema lazyTableSchema() { @@ -284,13 +293,14 @@ private static class TaskDataReader implements InputPartitionReader private final Schema expectedSchema; private final FileIO fileIo; private final Map inputFiles; + private final boolean caseSensitive; private Iterator currentIterator = null; private Closeable currentCloseable = null; private InternalRow current = null; public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo, - EncryptionManager encryptionManager) { + EncryptionManager encryptionManager, boolean caseSensitive) { this.fileIo = fileIo; this.tasks = task.files().iterator(); this.tableSchema = tableSchema; @@ -305,6 +315,7 @@ public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expected this.inputFiles = inputFileBuilder.build(); // open last because the schemas and fileIo must be set this.currentIterator = open(tasks.next()); + this.caseSensitive = caseSensitive; } @Override @@ -349,7 +360,7 @@ private Iterator open(FileScanTask task) { Set idColumns = spec.identitySourceIds(); // schema needed for the projection and filtering - Schema requiredSchema = prune(tableSchema, convert(finalSchema), task.residual()); + Schema requiredSchema = prune(tableSchema, convert(finalSchema), task.residual(), caseSensitive); boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size(); diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java index 2340cd68247c..6c2eafce71e4 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java @@ -218,6 +218,37 @@ public void testUnpartitionedIDFilters() { } } + @Test + public void testUnpartitionedCaseInsensitiveIDFilters() { + DataSourceOptions options = new DataSourceOptions(ImmutableMap.of( + "path", unpartitioned.toString()) + ); + + // set spark.sql.caseSensitive to false + String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive"); + TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false"); + + try { + IcebergSource source = new IcebergSource(); + + for (int i = 0; i < 10; i += 1) { + DataSourceReader reader = source.createReader(options); + + pushFilters(reader, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match + + List> tasks = reader.planInputPartitions(); + Assert.assertEquals("Should only create one task for a small file", 1, tasks.size()); + + // validate row filtering + assertEqualsSafe(SCHEMA.asStruct(), expected(i), + read(unpartitioned.toString(), "id = " + i)); + } + } finally { + // return global conf to previous state + TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest); + } + } + @Test public void testUnpartitionedTimestampFilter() { DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(