Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,16 @@ public void write(T row) throws IOException {
*
* @param key has the same columns with the equality fields.
*/
private void internalPosDelete(StructLike key) {
private boolean internalPosDelete(StructLike key) {
PathOffset previous = insertedRowMap.remove(key);

if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
return true;
}

return false;
}

/**
Expand All @@ -151,9 +154,9 @@ private void internalPosDelete(StructLike key) {
* @param row the given row to delete.
*/
public void delete(T row) throws IOException {
internalPosDelete(structProjection.wrap(asStructLike(row)));

eqDeleteWriter.write(row);
if (!internalPosDelete(structProjection.wrap(asStructLike(row)))) {
eqDeleteWriter.write(row);
}
}

/**
Expand All @@ -163,9 +166,9 @@ public void delete(T row) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
internalPosDelete(asStructLike(key));

eqDeleteWriter.write(key);
if (!internalPosDelete(asStructLike(key))) {
eqDeleteWriter.write(key);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,20 +209,16 @@ public void testUpsertSameRow() throws IOException {

WriteResult result = deltaWriter.complete();
Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 2, result.deleteFiles().length);
Assert.assertEquals("Should have a pos-delete file.", 1, result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have an expected record", expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));

// Check records in the data file.
DataFile dataFile = result.dataFiles()[0];
Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(table.schema(), dataFile.path()));

// Check records in the eq-delete file.
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(record), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));

// Check records in the pos-delete file.
DeleteFile posDeleteFile = result.deleteFiles()[1];
DeleteFile posDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(
posRecord.copy("file_path", dataFile.path(), "pos", 0L)
), readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.path()));
Expand Down Expand Up @@ -305,7 +301,6 @@ public void testUpsertData() throws IOException {
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
keyFunc.apply("aaa"),
keyFunc.apply("aaa"),
keyFunc.apply("ccc"),
keyFunc.apply("bbb")
Expand Down Expand Up @@ -389,7 +384,6 @@ public void testUpsertDataWithFullRowSchema() throws IOException {
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
createRecord(3, "aaa"),
createRecord(5, "aaa"),
createRecord(4, "ccc"),
createRecord(2, "bbb")
), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private void testCdcEvents(boolean partitioned) throws IOException {

WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);

Assert.assertEquals("Should have expected records.", expectedRowSet(
Expand Down Expand Up @@ -301,13 +301,13 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));

writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
writer.write(createDelete(2, "aaa")); // 1 pos-delete.

WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
Assert.assertEquals(2, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content(), result.deleteFiles()[1].content()));
Assert.assertEquals(1, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);

Assert.assertEquals("Should have expected records", expectedRowSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void testCdcEvents(boolean partitioned) throws IOException {

WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);

Assert.assertEquals("Should have expected records.", expectedRowSet(
Expand Down Expand Up @@ -302,13 +302,13 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));

writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
writer.write(createDelete(2, "aaa")); // 1 pos-delete.

WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
Assert.assertEquals(2, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content(), result.deleteFiles()[1].content()));
Assert.assertEquals(1, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary whitespace change, and this is not the correct indentation.

commitTransaction(result);

Assert.assertEquals("Should have expected records", expectedRowSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void testCdcEvents(boolean partitioned) throws IOException {

WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);

Assert.assertEquals("Should have expected records.", expectedRowSet(
Expand Down Expand Up @@ -302,13 +302,13 @@ public void testPartitionedTableWithDataAndIdAsKey() throws IOException {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));

writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
writer.write(createDelete(2, "aaa")); // 1 pos-delete.

WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
Assert.assertEquals(2, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content(), result.deleteFiles()[1].content()));
Assert.assertEquals(1, result.deleteFiles().length);
Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);

Assert.assertEquals("Should have expected records", expectedRowSet(
Expand Down