22 changes: 15 additions & 7 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
*/
protected abstract StructLike asStructLike(T data);

/**
* Wrap the passed-in key of a row as a {@link StructLike}.
*/
protected abstract StructLike asStructLikeKey(T key);

public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

@@ -136,13 +141,16 @@ public void write(T row) throws IOException {
*
* @param key has the same columns with the equality fields.
*/
private void internalPosDelete(StructLike key) {
private boolean internalPosDelete(StructLike key) {
PathOffset previous = insertedRowMap.remove(key);

if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
return true;
}

return false;
}

/**
@@ -152,9 +160,9 @@ private void internalPosDelete(StructLike key) {
* @param row the given row to delete.
*/
public void delete(T row) throws IOException {
internalPosDelete(structProjection.wrap(asStructLike(row)));

eqDeleteWriter.write(row);
if (!internalPosDelete(structProjection.wrap(asStructLike(row)))) {
eqDeleteWriter.write(row);
}
}

/**
@@ -164,9 +172,9 @@ public void deleteKey(T key) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
internalPosDelete(asStructLike(key));

eqDeleteWriter.write(key);
if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}

@Override
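Taken together, the changes above mean an equality delete is only written when the key does not match a row that this writer produced itself. A minimal usage sketch of the new behavior, reusing helper names that appear in testUpsertSameRow and the generic writer test further down (the writer setup is assumed and omitted, so this is illustrative rather than a verbatim test excerpt):

    deltaWriter.write(createRecord(1, "aaa"));        // row is tracked in insertedRowMap
    deltaWriter.deleteKey(keyFunc.apply("aaa"));      // key was written by this task -> position delete only
    deltaWriter.write(createRecord(2, "aaa"));        // write the new version of the row

    WriteResult result = deltaWriter.complete();
    // With this change: one data file and one position-delete file, and no equality-delete file,
    // because the deleted row was local to this writer. Previously the same sequence also
    // produced an equality-delete file.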
@@ -209,20 +209,16 @@ public void testUpsertSameRow() throws IOException {

WriteResult result = deltaWriter.complete();
Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 2, result.deleteFiles().length);
Assert.assertEquals("Should have a pos-delete file.", 1, result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have an expected record", expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(table.schema(), dataFile.path()));

// Check records in the eq-delete file.
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(record), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));

// Check records in the pos-delete file.
DeleteFile posDeleteFile = result.deleteFiles()[1];
DeleteFile posDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(
posRecord.copy("file_path", dataFile.path(), "pos", 0L)
), readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.path()));
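For context on the assertion above: a position-delete row under DeleteSchemaUtil.pathPosSchema() carries just the target data file path and the 0-based row offset. A minimal sketch of building the expected record (using GenericRecord is an assumption about how posRecord is created; the copy call mirrors the assertion above):

    Schema pathPosSchema = DeleteSchemaUtil.pathPosSchema();
    Record posRecord = GenericRecord.create(pathPosSchema);
    Record expected = posRecord.copy("file_path", dataFile.path(), "pos", 0L);  // first row of the data file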
@@ -305,7 +301,6 @@ public void testUpsertData() throws IOException {
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
keyFunc.apply("aaa"),
keyFunc.apply("aaa"),
keyFunc.apply("ccc"),
keyFunc.apply("bbb")
@@ -389,7 +384,6 @@ public void testUpsertDataWithFullRowSchema() throws IOException {
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
createRecord(3, "aaa"),
createRecord(5, "aaa"),
createRecord(4, "ccc"),
createRecord(2, "bbb")
), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
@@ -467,6 +461,7 @@ public void delete(Record row) throws IOException {
deltaWriter.delete(row);
}

// The caller of this function is responsible for passing in a record with only the key fields
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
@@ -485,6 +480,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
protected StructLike asStructLike(Record row) {
return row;
}

@Override
protected StructLike asStructLikeKey(Record data) {
return data;
}
}
}

@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return wrapper.wrap(data);
Contributor Author:

@kbendick should this throw an exception like in

throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");

or does it even matter what we return here for Flink 1.12?

Contributor:

I would keep it this way upon initial inspection.

It needs to be implemented for the API and it will possibly get called. Keeping it the same as asStructLike seems safest for backwards compatibility.

}
}
}
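For readers skimming the thread above, the two alternatives discussed would look roughly like this; only the wrapper-based variant is what the Flink 1.12 diff actually keeps, and the exception variant was raised in review but not adopted:

    // Variant kept in this diff: treat the key exactly like a full row.
    @Override
    protected StructLike asStructLikeKey(RowData data) {
      return wrapper.wrap(data);
    }

    // Variant raised in review (not adopted): fail loudly if the key path is ever hit.
    // @Override
    // protected StructLike asStructLikeKey(RowData data) {
    //   throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
    // }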
@@ -135,7 +135,7 @@ private void testCdcEvents(boolean partitioned) throws IOException {

WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);

Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -305,13 +305,13 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));

writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
writer.write(createDelete(2, "aaa")); // 1 pos-delete.

WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
Assert.assertEquals(2, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content(), result.deleteFiles()[1].content()));
Assert.assertEquals(1, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);

Assert.assertEquals("Should have expected records", expectedRowSet(
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper keyWrapper;
private final RowDataProjection keyProjection;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -58,6 +62,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.upsert = upsert;
this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.keyProjection = RowDataProjection.create(schema, deleteSchema);
}

abstract RowDataDeltaWriter route(RowData row);
@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return keyWrapper.wrap(data);
}
}
}
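Putting the new pieces together: in upsert mode the incoming row is narrowed to its equality columns by keyProjection before deleteKey() is called, and keyWrapper (built from deleteSchema) turns that projected key into the StructLike the base writer compares against insertedRowMap. A small sketch of the projection step, using an illustrative two-column schema (the column names and field ids are assumptions, not taken from this PR):

    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(1));          // -> { id }

    RowDataProjection keyProjection = RowDataProjection.create(schema, deleteSchema);
    RowData row = GenericRowData.of(5, StringData.fromString("aaa"));

    RowData key = keyProjection.wrap(row);   // exposes only the id column
    // writer.deleteKey(key) then reaches asStructLikeKey(key) -> keyWrapper.wrap(key)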
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted with the INSERT row kind, so any non-key column of the
// inserted row may differ from the deleted row, and the delete file must contain values that are correct
// for the deleted row. Therefore, write only the equality delete fields.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
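The practical effect of the upsert branch above is on the equality-delete row schema handed to FlinkAppenderFactory. With the same illustrative schema as the sketch earlier (id and data columns, equalityFieldIds = [1]), the two branches differ only in which schema the delete rows use:

    // upsert == true  -> equality-delete rows contain only the key column(s): { id }
    Schema upsertDeleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));

    // upsert == false -> equality-delete rows contain every column of the table: { id, data }
    Schema fullDeleteSchema = schema;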