Skip to content

Commit 3bc727c

Browse files
committed
Fix the broken TestFlinkUpsert.
1 parent f485a28 commit 3bc727c

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public RowDataTaskWriterFactory(
9090
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
9191
this.deleteSchema = null;
9292
this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table)
93+
.dataSchema(schema)
9394
.dataFlinkType(flinkSchema)
9495
.build();
9596
} else if (upsert) {
@@ -98,13 +99,15 @@ public RowDataTaskWriterFactory(
9899
// that are correct for the deleted row. Therefore, only write the equality delete fields.
99100
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
100101
this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table)
102+
.dataSchema(schema)
101103
.dataFlinkType(flinkSchema)
102104
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
103105
.equalityDeleteRowSchema(deleteSchema)
104106
.build();
105107
} else {
106108
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
107109
this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table)
110+
.dataSchema(schema)
108111
.dataFlinkType(flinkSchema)
109112
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
110113
.equalityDeleteRowSchema(schema)

0 commit comments

Comments
 (0)