Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/DeleteSetter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.deletes;

/**
 * Reusable holder for a single row, with subclass-defined hooks for reading and setting the
 * deleted state of that row.
 *
 * <p>An instance references only the row most recently passed to {@link #wrap(Object)}; wrapping
 * another row replaces the previous one.
 *
 * @param <T> the row type
 */
public abstract class DeleteSetter<T> {

  // The row most recently passed to wrap(); null until wrap() is first called.
  private T wrapped;

  /**
   * Returns the row currently held by this setter.
   *
   * @return the last row passed to {@link #wrap(Object)}, or null if none was wrapped yet
   */
  protected T row() {
    return wrapped;
  }

  /**
   * Points this setter at a new row.
   *
   * @param newRow the row that subsequent calls should operate on
   * @return this setter, so calls can be chained
   */
  public DeleteSetter<T> wrap(T newRow) {
    wrapped = newRow;
    return this;
  }

  /**
   * Reports the deleted state; the meaning is defined by the subclass.
   *
   * @return true if the subclass considers the wrapped row deleted (presumably; confirm against
   *     implementations)
   */
  public abstract boolean isDeleted();

  /**
   * Sets the deleted state; the behavior is defined by the subclass.
   *
   * @return a row of type {@code T}; whether it is the wrapped instance or a copy is up to the
   *     implementation
   */
  public abstract T setDeleted();
}
85 changes: 61 additions & 24 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.Accessor;
Expand All @@ -32,13 +33,13 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FilterIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DeleteMarker;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
Expand Down Expand Up @@ -67,14 +68,16 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
return equalityFilter.filter(rows);
}

public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
Set<Long> deleteSet) {
public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
Set<Long> deleteSet,
DeleteSetter<T> mutationWrapper) {
if (deleteSet.isEmpty()) {
return rows;
}

PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet);
return filter.filter(rows);
PositionSetDeleteMarker<T> iterable = new PositionSetDeleteMarker<>(rowToPosition, deleteSet, mutationWrapper);
return iterable.setDeleted(rows);
}

public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
Expand Down Expand Up @@ -107,10 +110,11 @@ public static Set<Long> toPositionSet(CloseableIterable<Long> posDeletes) {
}
}

public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
public static <T> CloseableIterable<T> streamingDeleteMarker(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
DeleteSetter<T> deleteSetter) {
return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, deleteSetter);
}

public static CloseableIterable<Long> deletePositions(CharSequence dataLocation,
Expand Down Expand Up @@ -143,31 +147,38 @@ protected boolean shouldKeep(T row) {
}
}

private static class PositionSetDeleteFilter<T> extends Filter<T> {
private static class PositionSetDeleteMarker<T> extends DeleteMarker<T> {
private final Function<T, Long> rowToPosition;
private final Set<Long> deleteSet;

private PositionSetDeleteFilter(Function<T, Long> rowToPosition, Set<Long> deleteSet) {
private PositionSetDeleteMarker(Function<T, Long> rowToPosition,
Set<Long> deleteSet,
DeleteSetter<T> deleteSetter) {
super(deleteSetter);
this.rowToPosition = rowToPosition;
this.deleteSet = deleteSet;
}

@Override
protected boolean shouldKeep(T row) {
return !deleteSet.contains(rowToPosition.apply(row));
protected boolean shouldDelete(T row) {
return deleteSet.contains(rowToPosition.apply(row));
}
}

private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
private static class PositionStreamDeleteMarker<T> extends CloseableGroup implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
private final Function<T, Long> extractPos;
private final CloseableIterable<Long> deletePositions;
private final DeleteSetter<T> deleteSetter;

private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions) {
private PositionStreamDeleteMarker(CloseableIterable<T> rows,
Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions,
DeleteSetter<T> deleteSetter) {
this.rows = rows;
this.extractPos = extractPos;
this.deletePositions = deletePositions;
this.deleteSetter = deleteSetter;
}

@Override
Expand All @@ -176,7 +187,7 @@ public CloseableIterator<T> iterator() {

CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
iter = new PositionDeleteMarkerIterator(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
try {
Expand All @@ -191,21 +202,42 @@ public CloseableIterator<T> iterator() {
return iter;
}

private class PositionFilterIterator extends FilterIterator<T> {
private class PositionDeleteMarkerIterator implements CloseableIterator<T> {
private final CloseableIterator<T> items;
private final CloseableIterator<Long> deletePosIterator;
private long nextDeletePos;

protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
super(items);
protected PositionDeleteMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
this.items = items;
this.deletePosIterator = deletePositions;
this.nextDeletePos = deletePosIterator.next();
}

@Override
protected boolean shouldKeep(T row) {
public boolean hasNext() {
return items.hasNext();
}

/**
 * Returns the next row from the underlying iterator, passing it through the delete setter when
 * {@code shouldDelete} reports a match.
 *
 * <p>A row that the setter already reports as deleted is returned unchanged and is not checked
 * against the delete positions.
 *
 * @return the row produced by {@code deleteSetter.setDeleted()} when the row is newly matched,
 *     otherwise the row as read
 * @throws NoSuchElementException if the underlying iterator has no more rows
 */
@Override
public T next() {
  if (!items.hasNext()) {
    throw new NoSuchElementException();
  }

  T current = items.next();

  // Wrap once: the setter keeps referencing this row for both isDeleted() and setDeleted(),
  // so a second wrap(current) before setDeleted() is unnecessary.
  deleteSetter.wrap(current);
  if (!deleteSetter.isDeleted() && shouldDelete(current)) {
    return deleteSetter.setDeleted();
  }

  return current;
}

private boolean shouldDelete(T row) {
long currentPos = extractPos.apply(row);
if (currentPos < nextDeletePos) {
return true;
return false;
}

// consume delete positions until the next is past the current position
Expand All @@ -218,12 +250,17 @@ protected boolean shouldKeep(T row) {
}
}

return keep;
return !keep;
}

@Override
public void close() {
super.close();
try {
items.close();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close the iterator", e);
}

try {
deletePosIterator.close();
} catch (IOException e) {
Expand Down
73 changes: 73 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/DeleteMarker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.iceberg.deletes.DeleteSetter;
import org.apache.iceberg.io.CloseableIterable;

/**
 * Base class that decorates an iterable so each item matched by {@link #shouldDelete(Object)} is
 * passed through a {@link DeleteSetter} before being returned.
 *
 * <p>No items are dropped: every item of the source iterable is returned, either as read or as
 * the value produced by {@code DeleteSetter.setDeleted()}.
 *
 * @param <T> the item type
 */
public abstract class DeleteMarker<T> {

  // Single setter instance shared by every iterator created from this marker.
  private final DeleteSetter<T> deleteSetter;

  /**
   * Creates a marker that uses the given setter.
   *
   * @param deleteSetter setter used to query and set the deleted state of each item
   */
  public DeleteMarker(DeleteSetter<T> deleteSetter) {
    this.deleteSetter = deleteSetter;
  }

  /**
   * Decides whether an item must be marked as deleted.
   *
   * @param item the item to test
   * @return true if the item should be passed to the setter's {@code setDeleted()}
   */
  protected abstract boolean shouldDelete(T item);

  /**
   * Wraps {@code iterable} so matching items are marked while iterating.
   *
   * @param iterable source of items; it is also handed to {@code CloseableIterable.combine} as
   *     the closeable of the returned iterable
   * @return an iterable over the same items, with matching ones marked as deleted
   */
  public CloseableIterable<T> setDeleted(CloseableIterable<T> iterable) {
    // A fresh marking iterator is created on each iterator() call of the returned iterable.
    Iterable<T> marked = () -> new MarkingIterator(iterable.iterator());
    return CloseableIterable.combine(marked, iterable);
  }

  /** Iterator that applies the delete setter to matching items as they are read. */
  private class MarkingIterator implements Iterator<T> {
    private final Iterator<T> source;

    private MarkingIterator(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public boolean hasNext() {
      return source.hasNext();
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }

      T item = source.next();
      deleteSetter.wrap(item);

      // Items already reported as deleted are returned as-is, without consulting shouldDelete.
      if (deleteSetter.isDeleted() || !shouldDelete(item)) {
        return item;
      }

      return deleteSetter.setDeleted();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testPositionStreamRowFilter() {

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand All @@ -135,7 +135,7 @@ public void testPositionStreamRowFilterWithDuplicates() {
CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(
Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand All @@ -153,7 +153,7 @@ public void testPositionStreamRowFilterWithRowGaps() {

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(2L, 5L, 6L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand Down Expand Up @@ -189,7 +189,7 @@ public void testCombinedPositionStreamRowFilter() {
Row.of(9L, "j")
));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(
CloseableIterable<StructLike> actual = Deletes.streamingDeleteMarker(
rows,
row -> row.get(0, Long.class),
Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
Expand Down
Loading