9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -42,4 +42,13 @@ public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
* @return this for method chaining
*/
RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);

/**
* Add a rewrite that replaces one set of deletes with another that contains the same deleted rows.
*
* @param deletesToDelete files that will be replaced, cannot be null or empty.
* @param deletesToAdd files that will be added, cannot be null or empty.
* @return this for method chaining
*/
RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);
Member

Before we start replacing equality deletes with position deletes, I think we need to refactor the RewriteFiles API to cover more cases:

  1. Rewrite data files and remove all the deleted rows. The files to delete will be a set of data files and a set of delete files, and the files to add will be a set of data files.
  2. Replace equality deletes with position deletes. The files to delete will be a set of equality delete files (do we need to ensure that all of them are equality delete files?), and the files to add will be a set of position delete files.
  3. Merge small delete files into bigger delete files. The files to delete will be a set of equality/position delete files, and the files to add will be a set of equality/position delete files.
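
The three cases above could share one plan shape with four sets: data files to delete, delete files to delete, data files to add, delete files to add. The following is only a minimal sketch under that assumption. `RewritePlanSketch`, `DeleteRef`, `Plan`, and the factory method names are hypothetical stand-ins, data files are modeled as plain path strings, and none of this is the API in the patch.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not Iceberg's DataFile/DeleteFile.
final class RewritePlanSketch {
  enum DeleteKind { EQUALITY, POSITION }

  static final class DeleteRef {
    final String path;
    final DeleteKind kind;

    DeleteRef(String path, DeleteKind kind) {
      this.path = path;
      this.kind = kind;
    }
  }

  // One shape for all three cases: data files are plain paths, and any single set may be empty.
  static final class Plan {
    final Set<String> dataToDelete;
    final Set<DeleteRef> deletesToDelete;
    final Set<String> dataToAdd;
    final Set<DeleteRef> deletesToAdd;

    Plan(Set<String> dataToDelete, Set<DeleteRef> deletesToDelete,
         Set<String> dataToAdd, Set<DeleteRef> deletesToAdd) {
      // A rewrite that replaces nothing is rejected.
      if (dataToDelete.isEmpty() && deletesToDelete.isEmpty()) {
        throw new IllegalArgumentException("Nothing to replace");
      }
      this.dataToDelete = dataToDelete;
      this.deletesToDelete = deletesToDelete;
      this.dataToAdd = dataToAdd;
      this.deletesToAdd = deletesToAdd;
    }
  }

  // Case 1: rewrite data files and drop the delete files that were applied to them.
  static Plan compactData(Set<String> oldData, Set<DeleteRef> appliedDeletes, Set<String> newData) {
    return new Plan(oldData, appliedDeletes, newData, Collections.emptySet());
  }

  // Case 2: equality deletes in, position deletes out; the kind of every file is checked.
  static Plan equalityToPosition(Set<DeleteRef> eqDeletes, Set<DeleteRef> posDeletes) {
    requireKind(eqDeletes, DeleteKind.EQUALITY);
    requireKind(posDeletes, DeleteKind.POSITION);
    return new Plan(Collections.emptySet(), eqDeletes, Collections.emptySet(), posDeletes);
  }

  // Case 3: merge small delete files of either kind into bigger ones.
  static Plan mergeDeletes(Set<DeleteRef> small, Set<DeleteRef> merged) {
    return new Plan(Collections.emptySet(), small, Collections.emptySet(), merged);
  }

  private static void requireKind(Set<DeleteRef> files, DeleteKind expected) {
    for (DeleteRef file : files) {
      if (file.kind != expected) {
        throw new IllegalArgumentException("Expected " + expected + " delete file: " + file.path);
      }
    }
  }
}
```

With a shape like this, the question in case 2 (whether all files to delete must be equality deletes) becomes a check in one factory method rather than a constraint on the general API.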

Collaborator Author

That makes sense to me. I think we could parallelize the API refactoring and the implementation.

}
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file

return this;
}

@Override
public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
"Files to delete cannot be null or empty");
Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
Member

This check is incorrect: if none of the equality deletes match any rows in the data files, there will be no position deletes to produce.

Member

I suggest adding a unit test for this.

Collaborator Author

I understand your concern. The check is meant to discard an invalid rewrite: we don't want to continue the rewrite if no position deletes are produced, do we?

Member

This kind of rewrite is actually valid, because it replaces all the useless equality delete files with empty position delete files. After the rewrite action, the normal read path no longer has to filter against the useless equality deletes, which is a great performance improvement. So we still have to commit the RewriteFiles transaction here.
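
One reading of this point is that only the set of files to delete must be non-empty, while the set of files to add may be empty as long as it is not null. The following is a minimal sketch of such a relaxed check, not the patch itself. `RewriteDeletesSketch` is a hypothetical class that models delete files as plain path strings and records the calls instead of touching a table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: delete files are plain path strings, not Iceberg DeleteFile objects.
final class RewriteDeletesSketch {
  final List<String> removed = new ArrayList<>();
  final List<String> added = new ArrayList<>();

  RewriteDeletesSketch rewriteDeletes(Set<String> deletesToDelete, Set<String> deletesToAdd) {
    // There must be something to replace, otherwise the rewrite is a no-op.
    if (deletesToDelete == null || deletesToDelete.isEmpty()) {
      throw new IllegalArgumentException("Files to delete cannot be null or empty");
    }

    // The replacement set may be empty: equality deletes that match no rows
    // produce no position deletes, and removing them is still worthwhile.
    if (deletesToAdd == null) {
      throw new IllegalArgumentException("Files to add cannot be null");
    }

    removed.addAll(deletesToDelete);
    added.addAll(deletesToAdd);
    return this;
  }
}
```

A unit test for this case would pass a non-empty set of equality deletes together with an empty set of position deletes and assert that the call succeeds.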

Collaborator Author

Makes sense to me! I will update it then.

"Files to add can not be null or empty");

for (DeleteFile toDelete : deletesToDelete) {
delete(toDelete);
}

for (DeleteFile toAdd : deletesToAdd) {
add(toAdd);
}

return this;
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;

@@ -104,4 +105,13 @@ public EncryptedOutputFile newOutputFile(PartitionKey key) {
OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
return encryptionManager.encrypt(rawOutputFile);
}

/**
* Generates EncryptedOutputFile for PartitionedWriter.
*/
public EncryptedOutputFile newOutputFile(StructLike partition) {
String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
return encryptionManager.encrypt(rawOutputFile);
}
}
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -37,7 +37,7 @@
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;

class SortedPosDeleteWriter<T> implements Closeable {
public class SortedPosDeleteWriter<T> implements Closeable {
private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L;

private final Map<CharSequenceWrapper, List<PosRow<T>>> posDeletes = Maps.newHashMap();
@@ -48,15 +48,15 @@ class SortedPosDeleteWriter<T> implements Closeable {
private final FileAppenderFactory<T> appenderFactory;
private final OutputFileFactory fileFactory;
private final FileFormat format;
private final PartitionKey partition;
private final StructLike partition;
private final long recordsNumThreshold;

private int records = 0;

SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
FileFormat format,
PartitionKey partition,
StructLike partition,
long recordsNumThreshold) {
this.appenderFactory = appenderFactory;
this.fileFactory = fileFactory;
@@ -65,10 +65,10 @@ class SortedPosDeleteWriter<T> implements Closeable {
this.recordsNumThreshold = recordsNumThreshold;
}

SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
FileFormat format,
PartitionKey partition) {
public SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory,
FileFormat format,
StructLike partition) {
this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD);
}

42 changes: 42 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import java.util.List;
import java.util.function.Predicate;

public class ChainOrFilter<T> extends Filter<T> {
private final List<Predicate<T>> filters;

public ChainOrFilter(List<Predicate<T>> filters) {
this.filters = filters;
}

@Override
protected boolean shouldKeep(T item) {
for (Predicate<T> filter : filters) {
if (filter.test(item)) {
return true;
}
}

return false;
}
}
40 changes: 39 additions & 1 deletion data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -48,6 +49,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ChainOrFilter;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.Preconditions;
@@ -110,7 +113,42 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
Contributor

Looks like this method is mostly the same as applyEqDeletes except for the predicate evaluation. Do we want to factor out the common logic?
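
One way to factor out the shared part, sketched here with plain strings standing in for records and `Set<String>` standing in for the equality delete sets. `EqDeleteSketch` and its method bodies are hypothetical and only mirror the names in the diff. The two public methods then differ only in whether the shared predicate is negated.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: rows are strings and each delete set is a Set<String>.
final class EqDeleteSketch {
  // Shared step: a row is deleted if any of the delete sets contains it.
  static Predicate<String> isDeleted(List<Set<String>> deleteSets) {
    return row -> deleteSets.stream().anyMatch(set -> set.contains(row));
  }

  // Keeps the rows that survive the deletes (the normal read path).
  static List<String> applyEqDeletes(List<String> rows, List<Set<String>> deleteSets) {
    return rows.stream().filter(isDeleted(deleteSets).negate()).collect(Collectors.toList());
  }

  // Keeps only the rows that the deletes match (the input for writing position deletes).
  static List<String> matchEqDeletes(List<String> rows, List<Set<String>> deleteSets) {
    return rows.stream().filter(isDeleted(deleteSets)).collect(Collectors.toList());
  }
}
```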

if (eqDeletes.isEmpty()) {
return records;
}

Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
for (DeleteFile delete : eqDeletes) {
filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
}

List<Predicate<T>> deleteSetFilters = Lists.newArrayList();
for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
Set<Integer> ids = entry.getKey();
Iterable<DeleteFile> deletes = entry.getValue();

Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

// a projection to select and reorder fields of the file schema to match the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);

Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
delete -> openDeletes(delete, deleteSchema));
StructLikeSet deleteSet = Deletes.toEqualitySet(
// copy the delete records because they will be held in a set
CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
deleteSchema.asStruct());

Predicate<T> predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
deleteSetFilters.add(predicate);
}

Filter<T> findDeleteRows = new ChainOrFilter<>(deleteSetFilters);
Contributor

Do we need an extra class for this? This seems to be achievable via something like

return CloseableIterable.filter(records, record -> 
    deleteSetFilters.stream().anyMatch(filter -> filter.test(record)));
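
A self-contained illustration of the same idea outside Iceberg, assuming a plain `Iterable` in place of `CloseableIterable`. `AnyMatchSketch` and `keepIfAnyMatch` are hypothetical names. `anyMatch` short-circuits on the first matching predicate and returns false for an empty list, which matches the loop in `ChainOrFilter.shouldKeep`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper for illustration; not part of the patch.
final class AnyMatchSketch {
  // Keeps the items that satisfy at least one of the predicates.
  static <T> List<T> keepIfAnyMatch(Iterable<T> items, List<Predicate<T>> filters) {
    List<T> kept = new ArrayList<>();
    for (T item : items) {
      if (filters.stream().anyMatch(filter -> filter.test(item))) {
        kept.add(item);
      }
    }
    return kept;
  }
}
```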

Member

We've removed the ChainOrFilter in the committed PR #2320. The reviewed patch should have fixed your concern.

return findDeleteRows.filter(records);
}

protected CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
if (eqDeletes.isEmpty()) {
return records;
}
4 changes: 4 additions & 0 deletions spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -82,6 +82,10 @@ public ExpireSnapshotsAction expireSnapshots() {
return new ExpireSnapshotsAction(spark, table);
}

public ReplaceDeleteAction replaceEqDeleteToPosDelete() {
return new ReplaceDeleteAction(spark, table);
}

/**
* Converts the provided table into an Iceberg table in place. The table will no longer be accessible by it's
* previous implementation
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.DeleteFile;

public class DeleteRewriteActionResult {
private List<DeleteFile> posDeletes;
private List<DeleteFile> eqDeletes;

public DeleteRewriteActionResult(List<DeleteFile> eqDeletes, List<DeleteFile> posDeletes) {
this.eqDeletes = eqDeletes;
this.posDeletes = posDeletes;
}

public List<DeleteFile> deletedFiles() {
return eqDeletes;
}

public List<DeleteFile> addedFiles() {
return posDeletes;
}
}