@@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
FileFormat format,
@@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.upsert = upsert;
}

abstract RowDataDeltaWriter route(RowData row);
@@ -70,6 +73,9 @@ public void write(RowData row) throws IOException {
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
Reviewer comment (Contributor): It seems like this should only happen for the INSERT case because UPDATE_AFTER implies that there was an UPDATE_BEFORE that will perform the delete. This would delete the same row twice in that case, causing more equality deletes to be written for the row.

writer.delete(row);
}
writer.write(row);
break;
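A minimal sketch (not part of this diff) of the variant the reviewer suggests above: emit the extra equality delete only for INSERT, assuming every UPDATE_AFTER is paired with an UPDATE_BEFORE that already deletes the old row.

      case INSERT:
        if (upsert) {
          // Upsert semantics: remove any previous row with the same equality key before writing.
          writer.delete(row);
        }
        writer.write(row);
        break;

      case UPDATE_AFTER:
        // The paired UPDATE_BEFORE already emitted the equality delete, so only write the new row.
        writer.write(row);
        break;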

47 changes: 43 additions & 4 deletions flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -39,6 +39,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -115,6 +116,7 @@ public static class Builder {
private TableSchema tableSchema;
private boolean overwrite = false;
private Integer writeParallelism = null;
private boolean upsert = false;
private List<String> equalityFieldColumns = null;

private Builder() {
@@ -172,6 +174,20 @@ public Builder writeParallelism(int newWriteParallelism) {
return this;
}

/**
* All INSERT/UPDATE_AFTER events from the input stream will be transformed into UPSERT events, which means the sink
* will DELETE the old record and then INSERT the new record. For a partitioned table, the partition fields should be
* a subset of the equality fields; otherwise an old row located in partition-A could not be deleted by a new row
* located in partition-B.
Reviewer comment (Contributor): Does anything validate this constraint?

*
* @param enable indicates whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enable) {
this.upsert = enable;
return this;
}
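As a usage sketch, mirroring the test setup later in this diff (the forRow/table/tableLoader calls are assumed from the existing builder API and are not part of this change):

    // Hypothetical wiring: write a CDC stream of Row events in upsert mode, keyed by 'id'.
    FlinkSink.forRow(input, SimpleDataUtil.FLINK_SCHEMA)
        .table(table)
        .tableLoader(tableLoader)
        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
        .equalityFieldColumns(ImmutableList.of("id"))
        .upsert(true)   // every INSERT/UPDATE_AFTER becomes an equality DELETE followed by an INSERT
        .build();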

/**
* Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
*
@@ -209,7 +225,22 @@ public DataStreamSink<RowData> build() {
}
}

IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
// Convert the iceberg schema to flink's RowType.
RowType flinkSchema = convertToRowType(table, tableSchema);

// Convert the INSERT stream to be an UPSERT stream if needed.
if (upsert) {
Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField : table.spec().fields()) {
Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
"Partition field '%s' is not included in equality fields: '%s'", partitionField, equalityFieldColumns);
}
}
}
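This is the check the reviewer asked about in the javadoc above. A hypothetical illustration of a configuration it rejects (SCHEMA, rowDataInput, tableLoader and the column names are placeholders): a table partitioned by identity('data') but keyed only on 'id' would put an updated row into a different partition than the old one, so the equality delete could never reach the old copy.

    // Hypothetical failing setup: partition column 'data' is not covered by the equality fields.
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
    FlinkSink.forRowData(rowDataInput)
        .table(table)                  // table created with the spec above
        .tableLoader(tableLoader)
        .equalityFieldColumns(ImmutableList.of("id"))
        .upsert(true)
        .build();                      // -> IllegalStateException: ... is not included in equality fields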

IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkSchema, equalityFieldIds, upsert);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);

this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
@@ -227,8 +258,7 @@ public DataStreamSink<RowData> build() {
}
}

static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
List<Integer> equalityFieldIds) {
private static RowType convertToRowType(Table table, TableSchema requestedSchema) {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

RowType flinkSchema;
@@ -246,13 +276,22 @@ static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema
flinkSchema = FlinkSchemaUtil.convert(table.schema());
}

return flinkSchema;
}

static IcebergStreamWriter<RowData> createStreamWriter(Table table,
RowType flinkSchema,
List<Integer> equalityFieldIds,
boolean upsert) {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);

TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props,
equalityFieldIds);
equalityFieldIds, upsert);

return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
@@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
upsert);
this.partitionKey = new PartitionKey(spec, schema);
}

@@ -49,6 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final long targetFileSizeBytes;
private final FileFormat format;
private final List<Integer> equalityFieldIds;
private final boolean upsert;
private final FileAppenderFactory<RowData> appenderFactory;

private transient OutputFileFactory outputFileFactory;
@@ -62,7 +63,8 @@ public RowDataTaskWriterFactory(Schema schema,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> tableProperties,
List<Integer> equalityFieldIds) {
List<Integer> equalityFieldIds,
boolean upsert) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.spec = spec;
@@ -72,6 +74,7 @@ public RowDataTaskWriterFactory(Schema schema,
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, tableProperties, spec);
@@ -104,10 +107,10 @@ public TaskWriter<RowData> create() {
// Initialize a task writer to write both INSERT and equality DELETE.
if (spec.isUnpartitioned()) {
return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
} else {
return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
}
}
}
@@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
upsert);
this.writer = new RowDataDeltaWriter(null);
}

@@ -81,7 +81,8 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption
Long.MAX_VALUE,
format,
table.properties(),
null);
null,
false);
}

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) {
@@ -333,6 +333,6 @@ private StructLikeSet actualRowSet(String... columns) throws IOException {
private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
return new RowDataTaskWriterFactory(table.schema(), FlinkSchemaUtil.convert(table.schema()),
table.spec(), table.locationProvider(), table.io(), table.encryption(), 128 * 1024 * 1024,
format, table.properties(), equalityFieldIds);
format, table.properties(), equalityFieldIds, false);
}
}
@@ -31,6 +31,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
@@ -132,6 +133,7 @@ private List<Snapshot> findValidSnapshots(Table table) {

private void testChangeLogs(List<String> equalityFieldColumns,
KeySelector<Row, Object> keySelector,
boolean insertAsUpsert,
List<List<Row>> elementsPerCheckpoint,
List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
@@ -145,6 +147,7 @@ private void testChangeLogs(List<String> equalityFieldColumns,
.tableSchema(SimpleDataUtil.FLINK_SCHEMA)
.writeParallelism(parallelism)
.equalityFieldColumns(equalityFieldColumns)
.upsert(insertAsUpsert)
.build();

// Execute the program.
@@ -207,7 +210,8 @@ public void testChangeLogOnIdKey() throws Exception {
ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
);

testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), elementsPerCheckpoint, expectedRecords);
testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
elementsPerCheckpoint, expectedRecords);
}

@Test
@@ -238,7 +242,8 @@ public void testChangeLogOnDataKey() throws Exception {
ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
);

testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), elementsPerCheckpoint, expectedRecords);
testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), false,
elementsPerCheckpoint, expectedRecords);
}

@Test
@@ -269,7 +274,7 @@ public void testChangeLogOnIdDataKey() throws Exception {
);

testChangeLogs(ImmutableList.of("data", "id"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
elementsPerCheckpoint, expectedRecords);
false, elementsPerCheckpoint, expectedRecords);
}

@Test
@@ -307,9 +312,103 @@ public void testChangeLogOnSameKey() throws Exception {
);

testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
false, elementsPerCheckpoint, expectedRecords);
}

@Test
public void testUpsertOnIdKey() throws Exception {
List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("+U", 1, "bbb")
),
ImmutableList.of(
row("+I", 1, "ccc")
),
ImmutableList.of(
row("+U", 1, "ddd"),
row("+I", 1, "eee")
)
);

List<List<Record>> expectedRecords = ImmutableList.of(
ImmutableList.of(record(1, "bbb")),
ImmutableList.of(record(1, "ccc")),
ImmutableList.of(record(1, "eee"))
);

if (!partitioned) {
testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
elementsPerCheckpoint, expectedRecords);
} else {
AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
IllegalStateException.class, "not included in equality fields",
() -> {
testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
expectedRecords);
return null;
});
}
}

@Test
public void testUpsertOnDataKey() throws Exception {
List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("+I", 2, "aaa"),
row("+I", 3, "bbb")
),
ImmutableList.of(
row("+U", 4, "aaa"),
row("-U", 3, "bbb"),
row("+U", 5, "bbb")
),
ImmutableList.of(
row("+I", 6, "aaa"),
row("+U", 7, "bbb")
)
);

List<List<Record>> expectedRecords = ImmutableList.of(
ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
ImmutableList.of(record(6, "aaa"), record(7, "bbb"))
);

testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), true,
elementsPerCheckpoint, expectedRecords);
}

@Test
public void testUpsertOnIdDataKey() throws Exception {
List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("+U", 1, "aaa"),
row("+I", 2, "bbb")
),
ImmutableList.of(
row("+I", 1, "aaa"),
row("-D", 2, "bbb"),
row("+I", 2, "ccc")
),
ImmutableList.of(
row("-U", 1, "aaa"),
row("+U", 1, "bbb")
)
);

List<List<Record>> expectedRecords = ImmutableList.of(
ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
ImmutableList.of(record(1, "bbb"), record(2, "ccc"))
);

testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
true, elementsPerCheckpoint, expectedRecords);
}

private StructLikeSet expectedRowSet(Record... records) {
return SimpleDataUtil.expectedRowSet(table, records);
}
@@ -32,6 +32,7 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -337,7 +338,8 @@ private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStr

private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema) throws Exception {
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema, null);
RowType rowType = (RowType) flinkSchema.toRowDataType().getLogicalType();
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, rowType, null, false);
OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
streamWriter, 1, 1, 0);

@@ -239,7 +239,7 @@ private TaskWriter<RowData> createTaskWriter(long targetFileSize) {
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(),
(RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(), table.spec(),
table.locationProvider(), table.io(), table.encryption(),
targetFileSize, format, table.properties(), null);
targetFileSize, format, table.properties(), null, false);
taskWriterFactory.initialize(1, 1);
return taskWriterFactory.create();
}