diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index 4e4e2769a60b..e768d366693c 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -104,7 +104,10 @@ private static class StructLikeComparator implements Comparator { private StructLikeComparator(Types.StructType struct) { this.comparators = struct.fields().stream() - .map(field -> internal(field.type())) + /* Optional fields get their type comparator chained behind Comparators.nullsFirst(); required fields keep the plain type comparator. */ .map(field -> field.isOptional() ? + Comparators.nullsFirst().thenComparing(internal(field.type())) : + internal(field.type()) + ) .toArray((IntFunction[]>) Comparator[]::new); this.classes = struct.fields().stream() .map(field -> internalClass(field.type())) @@ -129,7 +132,10 @@ private static class ListComparator implements Comparator> { private final Comparator elementComparator; private ListComparator(Types.ListType list) { - this.elementComparator = internal(list.elementType()); + /* Resolve the element comparator once; it is chained behind Comparators.nullsFirst() below only when the list declares optional elements. */ Comparator elemComparator = internal(list.elementType()); + this.elementComparator = list.isElementOptional() ? 
+ Comparators.nullsFirst().thenComparing(elemComparator) : + elemComparator; } @Override diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index b4881b3e256a..062a2d5d0257 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -25,6 +25,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.iceberg.expressions.BoundPredicate; @@ -125,6 +126,25 @@ public T get(int pos, Class javaClass) { public void set(int pos, T value) { throw new UnsupportedOperationException("Setting values is not supported"); } + + /** Two rows are equal when they have the same runtime class and their value arrays match element by element (Arrays.equals); lets tests compare lists of rows directly. */ @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + Row that = (Row) other; + + return Arrays.equals(values, that.values); + } + + /** Hash of the value array, computed with Arrays.hashCode so it agrees with equals. */ @Override + public int hashCode() { + return Arrays.hashCode(values); + } } public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary { diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 2b85cd08578d..2a181984b67b 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.function.Function; import org.apache.iceberg.Accessor; import org.apache.iceberg.Schema; @@ -31,12 +32,15 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.FilterIterator; import org.apache.iceberg.util.SortedMerge; +import org.apache.iceberg.util.StructLikeSet; public class Deletes { private static final Schema POSITION_DELETE_SCHEMA = new Schema( @@ -50,13 +54,63 @@ public class Deletes { private Deletes() { } - public static CloseableIterable positionFilter(CloseableIterable rows, Function rowToPosition, - CloseableIterable posDeletes) { - return new PositionDeleteFilter<>(rows, rowToPosition, posDeletes); + /** Equality-delete filter: returns rows unchanged when deleteSet is empty; otherwise keeps only the rows whose key, as produced by rowToDeleteKey, is not contained in deleteSet. */ public static CloseableIterable filter(CloseableIterable rows, Function rowToDeleteKey, + StructLikeSet deleteSet) { + if (deleteSet.isEmpty()) { + return rows; + } + + EqualitySetDeleteFilter equalityFilter = new EqualitySetDeleteFilter<>(rowToDeleteKey, deleteSet); + return equalityFilter.filter(rows); + } + + /** Position-delete filter backed by an in-memory set: returns rows unchanged when deleteSet is empty; otherwise keeps only the rows whose position, as produced by rowToPosition, is not contained in deleteSet. */ public static CloseableIterable filter(CloseableIterable rows, Function rowToPosition, + Set deleteSet) { + if (deleteSet.isEmpty()) { + return rows; + } + + PositionSetDeleteFilter filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet); + return filter.filter(rows); + } + + /** Adds every struct from eqDeletes to a StructLikeSet created for eqType and returns the set. eqDeletes is closed before returning; an IOException raised while closing is rethrown as UncheckedIOException. */ public static StructLikeSet toEqualitySet(CloseableIterable eqDeletes, Types.StructType eqType) { + try (CloseableIterable deletes = eqDeletes) { + StructLikeSet deleteSet = StructLikeSet.create(eqType); + Iterables.addAll(deleteSet, deletes); + return deleteSet; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close equality delete source", e); + } } - public static CloseableIterable deletePositions(String dataLocation, CloseableIterable posDeletes) { - return deletePositions(dataLocation, ImmutableList.of(posDeletes)); + /** Single-file convenience overload: wraps deleteFile in a one-element list and delegates to the list-based toPositionSet. */ public static Set 
toPositionSet(String dataLocation, CloseableIterable deleteFile) { + return toPositionSet(dataLocation, ImmutableList.of(deleteFile)); + } + + /** Builds the set of deleted positions for dataLocation from several delete files: each file is passed through a DataFileFilter for dataLocation, the position is read from each remaining row with POSITION_ACCESSOR, and the concatenated positions are collected into a set. NOTE(review): DataFileFilter is not visible here; presumably it keeps only rows that reference dataLocation (the tests mix file_a and file_b rows) - confirm. */ public static Set toPositionSet(String dataLocation, List> deleteFiles) { + DataFileFilter locationFilter = new DataFileFilter(dataLocation); + List> positions = Lists.transform(deleteFiles, deletes -> + CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); + return toPositionSet(CloseableIterable.concat(positions)); + } + + /** Copies all positions from posDeletes into a new hash set. posDeletes is closed before returning; an IOException raised while closing is rethrown as UncheckedIOException. */ public static Set toPositionSet(CloseableIterable posDeletes) { + try (CloseableIterable deletes = posDeletes) { + return Sets.newHashSet(deletes); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close position delete source", e); + } + } + + /** Wraps rows in a PositionStreamDeleteFilter that consumes posDeletes as an iterable instead of materializing a set. NOTE(review): the tests always pass delete positions in ascending order and deletePositions merges with SortedMerge, so sorted input is presumably required - confirm. */ public static CloseableIterable streamingFilter(CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes) { + return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); + } + + /** Single-file convenience overload: wraps deleteFile in a one-element list and delegates to the list-based deletePositions. */ public static CloseableIterable deletePositions(String dataLocation, CloseableIterable deleteFile) { + return deletePositions(dataLocation, ImmutableList.of(deleteFile)); } public static CloseableIterable deletePositions(String dataLocation, @@ -68,13 +122,44 @@ public static CloseableIterable deletePositions(String dataLocation, return new SortedMerge<>(Long::compare, positions); } - private static class PositionDeleteFilter extends CloseableGroup implements CloseableIterable { + /** Filter that drops a row when the struct extracted from it is contained in the equality delete set. */ private static class EqualitySetDeleteFilter extends Filter { + private final StructLikeSet deletes; + private final Function extractEqStruct; + + protected EqualitySetDeleteFilter(Function extractEq, + StructLikeSet deletes) { + this.extractEqStruct = extractEq; + this.deletes = deletes; + } + + @Override + protected boolean shouldKeep(T row) { + return !deletes.contains(extractEqStruct.apply(row)); + } + } + + /** Filter that drops a row when the position extracted from it is contained in the position delete set. */ private static class PositionSetDeleteFilter extends Filter { + private final Function 
rowToPosition; + private final Set deleteSet; + + private PositionSetDeleteFilter(Function rowToPosition, Set deleteSet) { + this.rowToPosition = rowToPosition; + this.deleteSet = deleteSet; + } + + @Override + protected boolean shouldKeep(T row) { + return !deleteSet.contains(rowToPosition.apply(row)); + } + } + + /** Iterable returned by streamingFilter; holds the rows, the position extractor and the iterable of delete positions. */ private static class PositionStreamDeleteFilter extends CloseableGroup implements CloseableIterable { private final CloseableIterable rows; private final Function extractPos; private final CloseableIterable deletePositions; - private PositionDeleteFilter(CloseableIterable rows, Function extractPos, - CloseableIterable deletePositions) { + private PositionStreamDeleteFilter(CloseableIterable rows, Function extractPos, + CloseableIterable deletePositions) { this.rows = rows; this.extractPos = extractPos; this.deletePositions = deletePositions; diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestEqualityFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestEqualityFilter.java new file mode 100644 index 000000000000..7e66d6369391 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/deletes/TestEqualityFilter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.deletes; + +import java.util.List; +import org.apache.avro.util.Utf8; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.junit.Assert; +import org.junit.Test; + +/** Tests for the set-based equality delete filter (Deletes.filter with Deletes.toEqualitySet). The shared ROWS fixture mixes String and Utf8 values and has no row with id 3. */ public class TestEqualityFilter { + private static final Schema ROW_SCHEMA = new Schema( + NestedField.required(1, "id", Types.LongType.get()), + NestedField.required(2, "name", Types.StringType.get()), + NestedField.optional(3, "description", Types.StringType.get())); + + private static final CloseableIterable ROWS = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(0L, "a", "panda"), + Row.of(1L, "b", "koala"), + Row.of(2L, "c", new Utf8("kodiak")), + Row.of(4L, new Utf8("d"), "gummy"), + Row.of(5L, "e", "brown"), + Row.of(6L, "f", new Utf8("teddy")), + Row.of(7L, "g", "grizzly"), + Row.of(8L, "h", null) + )); + + /** Deleting by id: the keys 4 and 6 remove their rows, while the key 3 matches no row in ROWS. */ @Test + public void testEqualitySetFilterLongColumn() { + CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(4L), + Row.of(3L), + Row.of(6L) + )); + + List expected = Lists.newArrayList( + Row.of(0L, "a", "panda"), + Row.of(1L, "b", "koala"), + Row.of(2L, "c", new Utf8("kodiak")), + Row.of(5L, "e", "brown"), + Row.of(7L, "g", "grizzly"), + Row.of(8L, "h", null) + ); + + Assert.assertEquals("Filter should produce expected rows", + expected, + Lists.newArrayList(Deletes.filter(ROWS, + row -> Row.of(row.get(0, Long.class)), + Deletes.toEqualitySet(deletes, ROW_SCHEMA.select("id").asStruct())))); + } + + /** Deleting by name: the String key "d" is expected to remove the row whose name is stored as a Utf8, so this also covers matching across CharSequence implementations. */ @Test + public void testEqualitySetFilterStringColumn() { + CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of("a"), + Row.of("d"), + Row.of("h") + )); + + List expected = Lists.newArrayList( 
+ Row.of(1L, "b", "koala"), + Row.of(2L, "c", new Utf8("kodiak")), + Row.of(5L, "e", "brown"), + Row.of(6L, "f", new Utf8("teddy")), + Row.of(7L, "g", "grizzly") + ); + + Assert.assertEquals("Filter should produce expected rows", + expected, + Lists.newArrayList(Deletes.filter(ROWS, + row -> Row.of(row.get(1, CharSequence.class)), + Deletes.toEqualitySet(deletes, ROW_SCHEMA.select("name").asStruct())))); + } + + /** Deleting by the optional description column with a null key: only the row whose description is null (id 8) is expected to be removed. */ @Test + public void testEqualitySetFilterStringColumnWithNull() { + CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(new Object[] { null }) + )); + + List expected = Lists.newArrayList( + Row.of(0L, "a", "panda"), + Row.of(1L, "b", "koala"), + Row.of(2L, "c", new Utf8("kodiak")), + Row.of(4L, new Utf8("d"), "gummy"), + Row.of(5L, "e", "brown"), + Row.of(6L, "f", new Utf8("teddy")), + Row.of(7L, "g", "grizzly") + ); + + Assert.assertEquals("Filter should produce expected rows", + expected, + Lists.newArrayList(Deletes.filter(ROWS, + row -> Row.of(row.get(2, CharSequence.class)), + Deletes.toEqualitySet(deletes, ROW_SCHEMA.select("id", "description").asStruct())))); + } + + /** Deleting by the (id, description) pair: (2, "kodiak") and (8, null) remove their rows, while (3, "care") matches no row in ROWS. */ @Test + public void testEqualitySetFilterMultipleColumns() { + CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(2L, "kodiak"), + Row.of(3L, "care"), + Row.of(8L, null) + )); + + List expected = Lists.newArrayList( + Row.of(0L, "a", "panda"), + Row.of(1L, "b", "koala"), + Row.of(4L, new Utf8("d"), "gummy"), + Row.of(5L, "e", "brown"), + Row.of(6L, "f", new Utf8("teddy")), + Row.of(7L, "g", "grizzly") + ); + + Assert.assertEquals("Filter should produce expected rows", + expected, + Lists.newArrayList(Deletes.filter(ROWS, + row -> Row.of(row.get(0, Long.class), row.get(2, CharSequence.class)), + Deletes.toEqualitySet(deletes, ROW_SCHEMA.select("id", "description").asStruct())))); + } +} diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java 
b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index 87472baa3383..e6e5703df7cf 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -95,7 +95,7 @@ public void testPositionMerging() { } @Test - public void testPositionRowFilter() { + /* Streaming filter over ten consecutive row positions with a delete list given in ascending order. */ public void testPositionStreamRowFilter() { CloseableIterable rows = CloseableIterable.withNoopClose(Lists.newArrayList( Row.of(0L, "a"), Row.of(1L, "b"), @@ -111,14 +111,14 @@ public void testPositionRowFilter() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.positionFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); } @Test - public void testPositionRowFilterWithDuplicates() { + /* Streaming filter where the delete list repeats positions (0, 7 and 9 appear more than once). */ public void testPositionStreamRowFilterWithDuplicates() { CloseableIterable rows = CloseableIterable.withNoopClose(Lists.newArrayList( Row.of(0L, "a"), Row.of(1L, "b"), @@ -135,14 +135,14 @@ public void testPositionRowFilterWithDuplicates() { CloseableIterable deletes = CloseableIterable.withNoopClose( Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L)); - CloseableIterable actual = Deletes.positionFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); } @Test - public void testPositionRowFilterWithRowGaps() { + public void testPositionStreamRowFilterWithRowGaps() { // test the 
case where row position is greater than the delete position CloseableIterable rows = CloseableIterable.withNoopClose(Lists.newArrayList( Row.of(2L, "c"), @@ -153,14 +153,14 @@ public void testPositionRowFilterWithRowGaps() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.positionFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(2L, 5L, 6L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); } @Test - public void testCombinedPositionRowFilter() { + /* Streaming filter fed by Deletes.deletePositions over two delete lists for file_a.avro. */ public void testCombinedPositionStreamRowFilter() { CloseableIterable positionDeletes1 = CloseableIterable.withNoopClose(Lists.newArrayList( Row.of("file_a.avro", 0L), Row.of("file_a.avro", 3L), @@ -189,7 +189,7 @@ public void testCombinedPositionRowFilter() { Row.of(9L, "j") )); - CloseableIterable actual = Deletes.positionFilter( + CloseableIterable actual = Deletes.streamingFilter( rows, row -> row.get(0, Long.class), Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); @@ -198,4 +198,68 @@ public void testCombinedPositionRowFilter() { Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); } + + /** Set-based position filter: the deletes are first materialized with Deletes.toPositionSet, then applied through Deletes.filter; positions 0, 3, 4, 7 and 9 are expected to be dropped. */ @Test + public void testPositionSetRowFilter() { + CloseableIterable rows = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(0L, "a"), + Row.of(1L, "b"), + Row.of(2L, "c"), + Row.of(3L, "d"), + Row.of(4L, "e"), + Row.of(5L, "f"), + Row.of(6L, "g"), + Row.of(7L, "h"), + Row.of(8L, "i"), + Row.of(9L, "j") + )); + + CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); + + CloseableIterable actual = Deletes.filter( + rows, row -> row.get(0, Long.class), 
+ Deletes.toPositionSet(deletes)); + Assert.assertEquals("Filter should produce expected rows", + Lists.newArrayList(1L, 2L, 5L, 6L, 8L), + Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); + } + + /** Set-based position filter over two delete lists that mix file_a.avro and file_b.avro entries: only the file_a.avro positions are expected to apply, and position 3 appears in both lists. */ @Test + public void testCombinedPositionSetRowFilter() { + CloseableIterable positionDeletes1 = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of("file_a.avro", 0L), + Row.of("file_a.avro", 3L), + Row.of("file_a.avro", 9L), + Row.of("file_b.avro", 5L), + Row.of("file_b.avro", 6L) + )); + + CloseableIterable positionDeletes2 = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of("file_a.avro", 3L), + Row.of("file_a.avro", 4L), + Row.of("file_a.avro", 7L), + Row.of("file_b.avro", 2L) + )); + + CloseableIterable rows = CloseableIterable.withNoopClose(Lists.newArrayList( + Row.of(0L, "a"), + Row.of(1L, "b"), + Row.of(2L, "c"), + Row.of(3L, "d"), + Row.of(4L, "e"), + Row.of(5L, "f"), + Row.of(6L, "g"), + Row.of(7L, "h"), + Row.of(8L, "i"), + Row.of(9L, "j") + )); + + CloseableIterable actual = Deletes.filter( + rows, row -> row.get(0, Long.class), + Deletes.toPositionSet("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); + + Assert.assertEquals("Filter should produce expected rows", + Lists.newArrayList(1L, 2L, 5L, 6L, 8L), + Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); + } }