diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index ab7728583ac8..215966c52e47 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -649,6 +649,8 @@ public <T> String predicate(UnboundPredicate<T> pred) {
           return pred.ref().name() + " != " + sqlString(pred.literal());
         case STARTS_WITH:
           return pred.ref().name() + " LIKE '" + pred.literal() + "%'";
+        case NOT_STARTS_WITH:
+          return pred.ref().name() + " NOT LIKE '" + pred.literal() + "%'";
         case IN:
           return pred.ref().name() + " IN (" + sqlString(pred.literals()) + ")";
         case NOT_IN:
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index a74882d42033..d51fd3c4e8eb 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -40,6 +41,7 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.data.GenericsHelpers;
 import org.apache.iceberg.transforms.Transform;
@@ -60,6 +62,7 @@
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.StringStartsWith;
 import org.apache.spark.sql.types.IntegerType$;
 import org.apache.spark.sql.types.LongType$;
@@ -421,6 +424,19 @@ public void testPartitionedByDataStartsWithFilter() {
     Assert.assertEquals(1, scan.planInputPartitions().length);
   }
 
+  @Test
+  public void testPartitionedByDataNotStartsWithFilter() {
+    Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
+    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
+
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    pushFilters(builder, new Not(new StringStartsWith("data", "junc")));
+    Batch scan = builder.build().toBatch();
+
+    Assert.assertEquals(9, scan.planInputPartitions().length);
+  }
+
   @Test
   public void testPartitionedByIdStartsWith() {
     Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, "id_ident", "id");
@@ -437,6 +453,22 @@ public void testPartitionedByIdStartsWith() {
     Assert.assertEquals(1, scan.planInputPartitions().length);
   }
 
+  @Test
+  public void testPartitionedByIdNotStartsWith() {
+    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, "id_ident", "id");
+
+    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
+        "path", table.location())
+    );
+
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    pushFilters(builder, new Not(new StringStartsWith("data", "junc")));
+    Batch scan = builder.build().toBatch();
+
+    Assert.assertEquals(9, scan.planInputPartitions().length);
+  }
+
   @Test
   public void testUnpartitionedStartsWith() {
     Dataset<Row> df = spark.read()
@@ -453,6 +485,27 @@ public void testUnpartitionedStartsWith() {
     Assert.assertEquals("junction", matchedData.get(0));
   }
 
+  @Test
+  public void testUnpartitionedNotStartsWith() {
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
+        .load(unpartitioned.toString());
+
+    List<String> matchedData = df.select("data")
+        .where("data NOT LIKE 'jun%'")
+        .as(Encoders.STRING())
+        .collectAsList();
+
+    List<String> expected = testRecords(SCHEMA).stream()
+        .map(r -> r.getField("data").toString())
+        .filter(d -> !d.startsWith("jun"))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(9, matchedData.size());
+    Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(matchedData));
+  }
+
   private static Record projectFlat(Schema projection, Record record) {
     Record result = GenericRecord.create(projection);
     List<Types.NestedField> fields = projection.asStruct().fields();
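A quick way to sanity-check the new NOT_STARTS_WITH branch outside the test suite is to describe an unbound predicate and inspect the rendered SQL. The snippet below is a minimal sketch, not part of this patch: it assumes Expressions.notStartsWith(...) is available in org.apache.iceberg.expressions.Expressions and that Spark3Util.describe(...) dispatches to the DescribeExpressionVisitor patched above; the class name DescribeNotStartsWithExample is made up for illustration.

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.Spark3Util;

public class DescribeNotStartsWithExample {
  public static void main(String[] args) {
    // Build an unbound NOT_STARTS_WITH predicate on the "data" column.
    Expression expr = Expressions.notStartsWith("data", "junc");

    // With the Spark3Util change above, this should print the predicate as:
    //   data NOT LIKE 'junc%'
    // mirroring how the existing STARTS_WITH branch renders LIKE 'junc%'.
    System.out.println(Spark3Util.describe(expr));
  }
}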