2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -140,6 +140,8 @@ subprojects {
}
})

maxHeapSize = "1500m"

testLogging {
events "failed"
exceptionFormat "full"
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
@@ -115,6 +115,11 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
*/
protected abstract StructLike asStructLike(T data);

/**
* Wrap the passed-in key of a row as a {@link StructLike}.
*/
protected abstract StructLike asStructLikeKey(T key);

public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

@@ -167,7 +172,7 @@ public void delete(T row) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
if (!internalPosDelete(asStructLike(key))) {
if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}
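The new asStructLikeKey hook matters because BaseEqualityDeltaWriter tracks inserted rows in a map keyed by the equality fields, so a key-only record has to be wrapped against the key schema rather than the full row schema. Below is a minimal, self-contained sketch, not part of this diff, that illustrates the lookup semantics with Iceberg's StructLikeMap and StructProjection; the two-column schema and the single "id" equality field are assumptions made purely for illustration.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class StructLikeKeySketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Schema keySchema = TypeUtil.select(schema, Sets.newHashSet(1)); // equality field: id

    // Positions of inserted rows, keyed by the equality fields only.
    StructLikeMap<String> insertedRows = StructLikeMap.create(keySchema.asStruct());

    Record row = GenericRecord.create(schema);
    row.setField("id", 42);
    row.setField("data", "a");
    // Store the row under its key projection (what the writer does internally on write()).
    insertedRows.put(StructProjection.create(schema, keySchema).wrap(row), "file-a, pos 0");

    // A delete key carries only the key fields; wrapping it as-is against the key schema
    // (the role of asStructLikeKey) lets the lookup line up with the projected insert key.
    Record key = GenericRecord.create(keySchema);
    key.setField("id", 42);
    System.out.println(insertedRows.containsKey(key)); // true
  }
}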
Original file line number Diff line number Diff line change
@@ -461,6 +461,7 @@ public void delete(Record row) throws IOException {
deltaWriter.delete(row);
}

// The caller of this method is responsible for passing in a record that contains only the equality key fields.
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
@@ -479,6 +480,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
protected StructLike asStructLike(Record row) {
return row;
}

@Override
protected StructLike asStructLikeKey(Record data) {
return data;
}
}
}

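The comment above pins down the deleteKey contract: the Record must contain only the equality key fields. Here is a small caller-side sketch, not taken from this PR; the KeyDeletingWriter interface and the single "id" equality field are illustrative assumptions.

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

public class DeleteKeyUsageSketch {
  // Hypothetical stand-in for a writer exposing the deleteKey(Record) method added in this diff.
  interface KeyDeletingWriter {
    void deleteKey(Record key) throws IOException;
  }

  // Build a record holding only the equality key fields (a single "id" column is assumed here)
  // and hand it to deleteKey; passing a full row would violate the documented contract.
  static void deleteById(KeyDeletingWriter writer, Schema deleteSchema, int id) throws IOException {
    Record key = GenericRecord.create(deleteSchema);
    key.setField("id", id);
    writer.deleteKey(key);
  }
}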
Original file line number Diff line number Diff line change
@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
}
}
}
Original file line number Diff line number Diff line change
@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
}
}
}
Original file line number Diff line number Diff line change
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper keyWrapper;
private final RowDataProjection keyProjection;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.keyProjection = RowDataProjection.create(schema, deleteSchema);
this.upsert = upsert;
}

@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return keyWrapper.wrap(data);
}
}
}
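BaseDeltaTaskWriter now narrows each upsert row to the equality fields with RowDataProjection before calling deleteKey, and asStructLikeKey then wraps that key-shaped RowData with a wrapper built from the delete schema. A minimal, runnable sketch of the projection step follows; the schema and field names are assumptions, not taken from this diff.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class KeyProjectionSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(1)); // equality field: id

    // Same construction the writer uses: project full rows down to the equality fields.
    RowDataProjection keyProjection = RowDataProjection.create(schema, deleteSchema);

    RowData row = GenericRowData.of(42, StringData.fromString("new-value"));
    RowData key = keyProjection.wrap(row); // a view of the row containing only the "id" field
    System.out.println(key.getArity() + " field, id=" + key.getInt(0)); // 1 field, id=42
  }
}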
Original file line number Diff line number Diff line change
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted with the INSERT row kind, so any column other than the
// primary key fields may differ from the row being deleted. The delete file must contain values that
// are correct for the deleted row, so only the equality delete fields are written.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
// TODO provide the ability to customize the equality-delete row schema.
Review comment (Contributor Author): This comment seems worth removing, as not having the right equality-delete row schema can cause correctness issues.

this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
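A small sketch of the two extra arguments the upsert branch passes to the appender factory: the equality field ids and the key-only row schema selected from the table schema. The schema and field names here are assumptions. Writing only these fields keeps every value in the delete file correct for the deleted row, since the upsert writer never sees the old row's non-key columns.

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;

public class UpsertDeleteSchemaSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    List<Integer> equalityFieldIds = Arrays.asList(1);

    // Mirrors ArrayUtil.toIntArray(equalityFieldIds) and TypeUtil.select(...) in the diff above.
    int[] eqFieldIdArray = ArrayUtil.toIntArray(equalityFieldIds);
    Schema eqDeleteRowSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));

    System.out.println(eqFieldIdArray.length);               // 1
    System.out.println(eqDeleteRowSchema.columns().size());  // 1, only the "id" column remains
  }
}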