Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ private static class StructLikeComparator implements Comparator<StructLike> {

private StructLikeComparator(Types.StructType struct) {
this.comparators = struct.fields().stream()
.map(field -> internal(field.type()))
.map(field -> field.isOptional() ?
Comparators.nullsFirst().thenComparing(internal(field.type())) :
internal(field.type())
)
.toArray((IntFunction<Comparator<Object>[]>) Comparator[]::new);
this.classes = struct.fields().stream()
.map(field -> internalClass(field.type()))
Expand All @@ -129,7 +132,10 @@ private static class ListComparator<T> implements Comparator<List<T>> {
private final Comparator<T> elementComparator;

private ListComparator(Types.ListType list) {
this.elementComparator = internal(list.elementType());
Comparator<T> elemComparator = internal(list.elementType());
this.elementComparator = list.isElementOptional() ?
Comparators.<T>nullsFirst().thenComparing(elemComparator) :
elemComparator;
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.expressions.BoundPredicate;
Expand Down Expand Up @@ -125,6 +126,25 @@ public <T> T get(int pos, Class<T> javaClass) {
public <T> void set(int pos, T value) {
// Neither pos nor value is used: this implementation rejects every call unconditionally.
throw new UnsupportedOperationException("Setting values is not supported");
}

/**
 * Compares this row with another object.
 *
 * <p>Two rows are equal when they are instances of exactly the same class and their
 * {@code values} arrays are equal according to {@link Arrays#equals(Object[], Object[])}.
 *
 * @param other the object to compare against, may be null
 * @return true if {@code other} is the same instance or an equal row, false otherwise
 */
@Override
public boolean equals(Object other) {
  // same reference: nothing more to check
  if (other == this) {
    return true;
  }

  // only an object of exactly this class can be equal; null never is
  if (other != null && other.getClass() == getClass()) {
    Row otherRow = (Row) other;
    return Arrays.equals(this.values, otherRow.values);
  }

  return false;
}

/**
 * Computes a hash from the contents of the {@code values} array.
 *
 * @return the result of {@link Arrays#hashCode(Object[])} over this row's values
 */
@Override
public int hashCode() {
  // hash the same array contents that equals() compares
  final int contentHash = Arrays.hashCode(this.values);
  return contentHash;
}
}

public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
Expand Down
101 changes: 93 additions & 8 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.Schema;
Expand All @@ -31,12 +32,15 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.FilterIterator;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;

public class Deletes {
private static final Schema POSITION_DELETE_SCHEMA = new Schema(
Expand All @@ -50,13 +54,63 @@ public class Deletes {
private Deletes() {
}

public static <T> CloseableIterable<T> positionFilter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
return new PositionDeleteFilter<>(rows, rowToPosition, posDeletes);
/**
 * Filters rows against a set of equality delete keys.
 *
 * <p>When the delete set is empty the input iterable is returned as-is. Otherwise the rows are passed
 * through an {@code EqualitySetDeleteFilter}, which keeps a row only if the key extracted from it is not
 * contained in the delete set.
 *
 * @param rows the rows to filter
 * @param rowToDeleteKey extracts the delete key from a row
 * @param deleteSet the keys of rows that should be dropped
 * @param <T> the row type
 * @return {@code rows} itself if there is nothing to delete, otherwise the filtered rows
 */
public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, StructLike> rowToDeleteKey,
                                              StructLikeSet deleteSet) {
  // only wrap the rows when there is at least one delete key to check against
  if (!deleteSet.isEmpty()) {
    return new EqualitySetDeleteFilter<T>(rowToDeleteKey, deleteSet).filter(rows);
  }

  return rows;
}

/**
 * Filters rows against a set of deleted positions.
 *
 * <p>When the delete set is empty the input iterable is returned as-is. Otherwise the rows are passed
 * through a {@code PositionSetDeleteFilter}, which keeps a row only if its position is not contained in
 * the delete set.
 *
 * @param rows the rows to filter
 * @param rowToPosition extracts the position from a row
 * @param deleteSet the positions of rows that should be dropped
 * @param <T> the row type
 * @return {@code rows} itself if there is nothing to delete, otherwise the filtered rows
 */
public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
                                              Set<Long> deleteSet) {
  // only wrap the rows when there is at least one deleted position to check against
  if (!deleteSet.isEmpty()) {
    return new PositionSetDeleteFilter<T>(rowToPosition, deleteSet).filter(rows);
  }

  return rows;
}

/**
 * Collects every equality delete into a {@link StructLikeSet} and closes the source.
 *
 * @param eqDeletes the equality delete keys to collect; closed before this method returns
 * @param eqType the struct type used to create the set
 * @return a set containing all of the delete keys
 * @throws UncheckedIOException if closing {@code eqDeletes} fails
 */
public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
  try (CloseableIterable<StructLike> source = eqDeletes) {
    // the set is created inside the try so the source is still closed if creation fails
    StructLikeSet keys = StructLikeSet.create(eqType);
    for (StructLike key : source) {
      keys.add(key);
    }
    return keys;
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to close equality delete source", e);
  }
}

public static CloseableIterable<Long> deletePositions(String dataLocation, CloseableIterable<StructLike> posDeletes) {
return deletePositions(dataLocation, ImmutableList.of(posDeletes));
/**
 * Convenience overload for a single delete file: wraps it in a one-element list and delegates to the
 * list-based {@code toPositionSet}.
 *
 * @param dataLocation the data file location passed through to the list-based overload
 * @param deleteFile rows of one position delete file
 * @return the set produced by the list-based overload
 */
public static Set<Long> toPositionSet(String dataLocation, CloseableIterable<StructLike> deleteFile) {
  List<CloseableIterable<StructLike>> singleFile = ImmutableList.of(deleteFile);
  return toPositionSet(dataLocation, singleFile);
}

/**
 * Builds a set of deleted positions from several position delete files.
 *
 * <p>Each delete file is run through a {@code DataFileFilter} created for {@code dataLocation}
 * (presumably keeping only deletes that refer to that data file; confirm against DataFileFilter), the
 * position is read from every remaining row with {@code POSITION_ACCESSOR}, and the concatenation of all
 * files is collected by {@code toPositionSet(CloseableIterable)}.
 *
 * @param dataLocation the data file location used to construct the filter
 * @param deleteFiles rows of the position delete files
 * @return the set of positions collected from all delete files
 */
public static Set<Long> toPositionSet(String dataLocation, List<CloseableIterable<StructLike>> deleteFiles) {
  DataFileFilter fileFilter = new DataFileFilter(dataLocation);

  List<CloseableIterable<Long>> perFilePositions = Lists.transform(deleteFiles, fileDeletes -> {
    // filter first, then map each surviving delete row to its position value
    return CloseableIterable.transform(
        fileFilter.filter(fileDeletes),
        deleteRow -> (Long) POSITION_ACCESSOR.get(deleteRow));
  });

  CloseableIterable<Long> allPositions = CloseableIterable.concat(perFilePositions);
  return toPositionSet(allPositions);
}

public static Set<Long> toPositionSet(CloseableIterable<Long> posDeletes) {
try (CloseableIterable<Long> deletes = posDeletes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a static method `toLongSet` (which we could optimize to a primitive long set in the future) in `CloseableIterable` (or somewhere else)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the set methods as you suggested, which should make this a bit cleaner. Thanks!

return Sets.newHashSet(deletes);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close position delete source", e);
}
}

/**
 * Wraps rows in a {@code PositionStreamDeleteFilter} driven by an iterable of deleted positions rather
 * than an in-memory set.
 *
 * <p>NOTE(review): presumably both {@code rows} and {@code posDeletes} are expected in ascending position
 * order, as produced by {@code deletePositions}; confirm against PositionStreamDeleteFilter.
 *
 * @param rows the rows to filter
 * @param rowToPosition extracts the position from a row
 * @param posDeletes the deleted positions
 * @param <T> the row type
 * @return a new {@code PositionStreamDeleteFilter} over the given arguments
 */
public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
                                                       Function<T, Long> rowToPosition,
                                                       CloseableIterable<Long> posDeletes) {
  PositionStreamDeleteFilter<T> streaming = new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
  return streaming;
}

/**
 * Convenience overload for a single delete file: wraps it in a one-element immutable list and delegates
 * to the multi-file {@code deletePositions}.
 *
 * @param dataLocation the data file location passed through to the multi-file overload
 * @param deleteFile rows of one position delete file
 * @return the positions produced by the multi-file overload
 */
public static CloseableIterable<Long> deletePositions(String dataLocation, CloseableIterable<StructLike> deleteFile) {
  ImmutableList<CloseableIterable<StructLike>> singleFile = ImmutableList.of(deleteFile);
  return deletePositions(dataLocation, singleFile);
}

public static CloseableIterable<Long> deletePositions(String dataLocation,
Expand All @@ -68,13 +122,44 @@ public static CloseableIterable<Long> deletePositions(String dataLocation,
return new SortedMerge<>(Long::compare, positions);
}

private static class PositionDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
/**
 * A {@link Filter} that drops rows whose extracted key is contained in a {@link StructLikeSet}.
 *
 * @param <T> the row type
 */
private static class EqualitySetDeleteFilter<T> extends Filter<T> {
  private final StructLikeSet deletes;
  private final Function<T, StructLike> extractEqStruct;

  /**
   * @param keyExtractor extracts the key to look up from a row
   * @param deleteKeys the keys of rows that should be dropped
   */
  protected EqualitySetDeleteFilter(Function<T, StructLike> keyExtractor,
                                    StructLikeSet deleteKeys) {
    this.deletes = deleteKeys;
    this.extractEqStruct = keyExtractor;
  }

  /**
   * @param row the row to test
   * @return true if the row's key is not in the delete set
   */
  @Override
  protected boolean shouldKeep(T row) {
    StructLike key = extractEqStruct.apply(row);
    boolean deleted = deletes.contains(key);
    return !deleted;
  }
}

/**
 * A {@link Filter} that drops rows whose position is contained in a set of deleted positions.
 *
 * @param <T> the row type
 */
private static class PositionSetDeleteFilter<T> extends Filter<T> {
  private final Function<T, Long> rowToPosition;
  private final Set<Long> deleteSet;

  /**
   * @param positionExtractor extracts the position from a row
   * @param deletedPositions the positions of rows that should be dropped
   */
  private PositionSetDeleteFilter(Function<T, Long> positionExtractor, Set<Long> deletedPositions) {
    this.deleteSet = deletedPositions;
    this.rowToPosition = positionExtractor;
  }

  /**
   * @param row the row to test
   * @return true if the row's position is not in the delete set
   */
  @Override
  protected boolean shouldKeep(T row) {
    Long position = rowToPosition.apply(row);
    boolean deleted = deleteSet.contains(position);
    return !deleted;
  }
}

private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
private final Function<T, Long> extractPos;
private final CloseableIterable<Long> deletePositions;

private PositionDeleteFilter(CloseableIterable<T> rows, Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions) {
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions) {
this.rows = rows;
this.extractPos = extractPos;
this.deletePositions = deletePositions;
Expand Down
144 changes: 144 additions & 0 deletions core/src/test/java/org/apache/iceberg/deletes/TestEqualityFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.deletes;

import java.util.List;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the set-based equality delete filter: {@code Deletes.filter} combined with
 * {@code Deletes.toEqualitySet}.
 *
 * <p>The fixture rows mix {@code String} and Avro {@code Utf8} values in the string columns, and the last
 * row has a null description.
 */
public class TestEqualityFilter {
  private static final String FAILURE_MESSAGE = "Filter should produce expected rows";

  private static final Schema ROW_SCHEMA = new Schema(
      NestedField.required(1, "id", Types.LongType.get()),
      NestedField.required(2, "name", Types.StringType.get()),
      NestedField.optional(3, "description", Types.StringType.get()));

  // ids run 0..8 with 3 missing, so a delete for id 3 matches nothing
  private static final CloseableIterable<StructLike> ROWS = CloseableIterable.withNoopClose(Lists.newArrayList(
      Row.of(0L, "a", "panda"),
      Row.of(1L, "b", "koala"),
      Row.of(2L, "c", new Utf8("kodiak")),
      Row.of(4L, new Utf8("d"), "gummy"),
      Row.of(5L, "e", "brown"),
      Row.of(6L, "f", new Utf8("teddy")),
      Row.of(7L, "g", "grizzly"),
      Row.of(8L, "h", null)
  ));

  /** Deleting by the long {@code id} column drops ids 4 and 6; the delete for id 3 matches no row. */
  @Test
  public void testEqualitySetFilterLongColumn() {
    CloseableIterable<StructLike> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(
        Row.of(4L),
        Row.of(3L),
        Row.of(6L)
    ));

    List<StructLike> expected = Lists.newArrayList(
        Row.of(0L, "a", "panda"),
        Row.of(1L, "b", "koala"),
        Row.of(2L, "c", new Utf8("kodiak")),
        Row.of(5L, "e", "brown"),
        Row.of(7L, "g", "grizzly"),
        Row.of(8L, "h", null)
    );

    Types.StructType keyType = ROW_SCHEMA.select("id").asStruct();
    List<StructLike> actual = Lists.newArrayList(
        Deletes.filter(
            ROWS,
            row -> Row.of(row.get(0, Long.class)),
            Deletes.toEqualitySet(deletes, keyType)));

    Assert.assertEquals(FAILURE_MESSAGE, expected, actual);
  }

  /**
   * Deleting by the {@code name} column drops "a", "d" and "h". The fixture stores "d" as a {@code Utf8}
   * while the delete key is a {@code String}, and that row is still expected to be removed.
   */
  @Test
  public void testEqualitySetFilterStringColumn() {
    CloseableIterable<StructLike> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(
        Row.of("a"),
        Row.of("d"),
        Row.of("h")
    ));

    List<StructLike> expected = Lists.newArrayList(
        Row.of(1L, "b", "koala"),
        Row.of(2L, "c", new Utf8("kodiak")),
        Row.of(5L, "e", "brown"),
        Row.of(6L, "f", new Utf8("teddy")),
        Row.of(7L, "g", "grizzly")
    );

    Types.StructType keyType = ROW_SCHEMA.select("name").asStruct();
    List<StructLike> actual = Lists.newArrayList(
        Deletes.filter(
            ROWS,
            row -> Row.of(row.get(1, CharSequence.class)),
            Deletes.toEqualitySet(deletes, keyType)));

    Assert.assertEquals(FAILURE_MESSAGE, expected, actual);
  }

  /** A delete key holding a single null drops only the row whose optional {@code description} is null. */
  @Test
  public void testEqualitySetFilterStringColumnWithNull() {
    CloseableIterable<StructLike> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(
        Row.of(new Object[] { null })
    ));

    List<StructLike> expected = Lists.newArrayList(
        Row.of(0L, "a", "panda"),
        Row.of(1L, "b", "koala"),
        Row.of(2L, "c", new Utf8("kodiak")),
        Row.of(4L, new Utf8("d"), "gummy"),
        Row.of(5L, "e", "brown"),
        Row.of(6L, "f", new Utf8("teddy")),
        Row.of(7L, "g", "grizzly")
    );

    Types.StructType keyType = ROW_SCHEMA.select("description").asStruct();
    List<StructLike> actual = Lists.newArrayList(
        Deletes.filter(
            ROWS,
            row -> Row.of(row.get(2, CharSequence.class)),
            Deletes.toEqualitySet(deletes, keyType)));

    Assert.assertEquals(FAILURE_MESSAGE, expected, actual);
  }

  /**
   * Deleting by the ({@code id}, {@code description}) pair drops (2, "kodiak") and (8, null); the key
   * (3, "care") matches no row.
   */
  @Test
  public void testEqualitySetFilterMultipleColumns() {
    CloseableIterable<StructLike> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(
        Row.of(2L, "kodiak"),
        Row.of(3L, "care"),
        Row.of(8L, null)
    ));

    List<StructLike> expected = Lists.newArrayList(
        Row.of(0L, "a", "panda"),
        Row.of(1L, "b", "koala"),
        Row.of(4L, new Utf8("d"), "gummy"),
        Row.of(5L, "e", "brown"),
        Row.of(6L, "f", new Utf8("teddy")),
        Row.of(7L, "g", "grizzly")
    );

    Types.StructType keyType = ROW_SCHEMA.select("id", "description").asStruct();
    List<StructLike> actual = Lists.newArrayList(
        Deletes.filter(
            ROWS,
            row -> Row.of(row.get(0, Long.class), row.get(2, CharSequence.class)),
            Deletes.toEqualitySet(deletes, keyType)));

    Assert.assertEquals(FAILURE_MESSAGE, expected, actual);
  }
}
Loading