2 changes: 2 additions & 0 deletions build.gradle
@@ -140,6 +140,8 @@ subprojects {
}
})

maxHeapSize = "1500m"

testLogging {
events "failed"
exceptionFormat "full"
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
*/
protected abstract StructLike asStructLike(T data);

/**
* Wrap the passed-in key of a row, which contains only the equality fields, as a {@link StructLike}.
*/
protected abstract StructLike asStructLikeKey(T key);

public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

@@ -167,7 +172,7 @@ public void delete(T row) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
- if (!internalPosDelete(asStructLike(key))) {
+ if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}
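`deleteKey` now wraps the key with `asStructLikeKey` because the key record carries only the equality columns; in implementations where `asStructLike` wraps against the full row schema (as the Flink writer later in this diff does), reusing it for keys would read the wrong field positions. A minimal sketch of that position shift, with an illustrative schema that is not part of this PR:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class KeySchemaPositions {
  public static void main(String[] args) {
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()),
        Types.NestedField.required(3, "ts", Types.TimestampType.withZone()));

    // Equality delete key on "data" only: the key schema places it at position 0,
    // while a wrapper built for the full row schema expects it at position 1.
    Schema keySchema = TypeUtil.select(tableSchema, Sets.newHashSet(2));

    System.out.println(tableSchema.columns().indexOf(tableSchema.findField("data"))); // 1
    System.out.println(keySchema.columns().indexOf(keySchema.findField("data")));     // 0
  }
}
```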
20 changes: 9 additions & 11 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -25,10 +25,8 @@
import java.util.Set;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
- import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
- import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -64,7 +62,7 @@ public abstract class DeleteFilter<T> {
MetadataColumns.DELETE_FILE_POS);

private final long setFilterThreshold;
- private final DataFile dataFile;
+ private final String filePath;
private final List<DeleteFile> posDeletes;
private final List<DeleteFile> eqDeletes;
private final Schema requiredSchema;
@@ -73,13 +71,13 @@ public abstract class DeleteFilter<T> {
private PositionDeleteIndex deleteRowPositions = null;
private Predicate<T> eqDeleteRows = null;

- protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema requestedSchema) {
this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
- this.dataFile = task.file();
+ this.filePath = filePath;

ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
- for (DeleteFile delete : task.deletes()) {
+ for (DeleteFile delete : deletes) {
switch (delete.content()) {
case POSITION_DELETES:
posDeleteBuilder.add(delete);
@@ -214,7 +212,7 @@ public PositionDeleteIndex deletedRowPositions() {

if (deleteRowPositions == null) {
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
- deleteRowPositions = Deletes.toPositionIndex(dataFile.path(), deletes);
+ deleteRowPositions = Deletes.toPositionIndex(filePath, deletes);
}
return deleteRowPositions;
}
@@ -228,10 +226,10 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
- return Deletes.filter(records, this::pos, Deletes.toPositionIndex(dataFile.path(), deletes));
+ return Deletes.filter(records, this::pos, Deletes.toPositionIndex(filePath, deletes));
}

- return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
+ return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
@@ -255,7 +253,7 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));

if (deleteFile.content() == FileContent.POSITION_DELETES) {
- builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+ builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

return builder.build();
Expand All @@ -267,7 +265,7 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema));

if (deleteFile.content() == FileContent.POSITION_DELETES) {
- orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+ orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

return orcBuilder.build();
@@ -30,7 +30,7 @@ public class GenericDeleteFilter extends DeleteFilter<Record> {
private final InternalRecordWrapper asStructLike;

public GenericDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
- super(task, tableSchema, requestedSchema);
+ super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
this.io = io;
this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
}
@@ -461,6 +461,7 @@ public void delete(Record row) throws IOException {
deltaWriter.delete(row);
}

// The caller of this function is responsible for passing in a record with only the key fields
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
@@ -479,6 +480,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
protected StructLike asStructLike(Record row) {
return row;
}

@Override
protected StructLike asStructLikeKey(Record data) {
return data;
}
}
}
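The identity implementations above are enough here because, per the new comment on `deleteKey`, the caller already supplies a record projected to the equality fields. A hypothetical caller-side sketch (the table, field name, and value are illustrative, not from this PR):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class DeleteKeyRecordSketch {
  static GenericRecord keyFor(long id) {
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));
    Schema keySchema = TypeUtil.select(tableSchema, Sets.newHashSet(1));  // equality field: id

    GenericRecord key = GenericRecord.create(keySchema);
    key.setField("id", id);
    return key;  // hand this to deltaWriter.deleteKey(...); it holds only the key column
  }
}
```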

@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
}
}
}
@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
}
}
}
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper keyWrapper;
private final RowDataProjection keyProjection;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.keyProjection = RowDataProjection.create(schema, deleteSchema);
this.upsert = upsert;
}

@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
- writer.delete(row);
+ writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return keyWrapper.wrap(data);
}
}
}
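In the upsert path above, `keyProjection.wrap(row)` strips the row down to the equality columns before `deleteKey` is called, and `keyWrapper`, built from the delete schema, adapts that projected `RowData` to the `StructLike` that `asStructLikeKey` must return. A standalone sketch of the same flow, with an illustrative schema and values:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class UpsertKeyPath {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(1));  // equality field: id

    RowDataProjection keyProjection = RowDataProjection.create(schema, deleteSchema);
    RowDataWrapper keyWrapper =
        new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());

    RowData row = GenericRowData.of(42L, StringData.fromString("b"));
    RowData keyRow = keyProjection.wrap(row);         // what write(...) passes to deleteKey(...)
    StructLike keyStruct = keyWrapper.wrap(keyRow);   // what asStructLikeKey(...) returns
    System.out.println(keyStruct.get(0, Long.class)); // 42
  }
}
```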
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted with the INSERT row kind, so any column of the inserted row other
// than the equality key fields may differ from the row being deleted, while the delete file must contain values
// that are correct for the deleted row. Therefore, write only the equality delete fields.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
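To make the comment above concrete with made-up values: for a table `(id BIGINT, name STRING)` with equality field `id` that currently holds `(1, "a")`, an upsert arrives only as the new image `(1, "b")`, so a full-schema equality delete could never describe the stored row; the key-only schema avoids that. Sketch (names and values are illustrative, not from this PR):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

class UpsertDeleteSchemaRationale {
  static RowData newImage() {
    // Upsert delivers only the new image (1, "b"); the previous row (1, "a") is never seen here.
    return GenericRowData.of(1L, StringData.fromString("b"));
  }
  // A full-schema equality delete built from newImage() would carry (1, "b") and would not match
  // the stored row (1, "a") because the name column differs. A delete restricted to the equality
  // field carries only id = 1 and removes the old row regardless of its non-key columns, which is
  // why this branch passes TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)) as the
  // equality-delete row schema.
}
```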
@@ -172,7 +172,7 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {

FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
InputFilesDecryptor inputFilesDecryptor) {
- super(task, tableSchema, requestedSchema);
+ super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
this.requiredRowType = FlinkSchemaUtil.convert(requiredSchema());
this.asStructLike = new RowDataWrapper(requiredRowType, requiredSchema().asStruct());
this.inputFilesDecryptor = inputFilesDecryptor;