Merged
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -222,4 +222,7 @@ private TableProperties() {

public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;

public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
Comment on lines +226 to +227
Contributor:
Two questions, one that's somewhat unrelated:

  1. Is this only used in streaming mode now? Or does this work with Flink batch sink as well?
  2. (Somewhat unrelated / thinking out loud) If we have this new write.upsert.enabled flag, could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

Member:
Is this only used in streaming mode now? Or does this work with Flink batch sink as well?

Yes, it's only used in streaming mode right now. The batch upsert semantics are already implemented correctly by the MERGE INTO clause.

could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

In theory, it's possible to add CDC support for Spark Structured Streaming, though Spark Structured Streaming does not support CDC events natively (I mean Flink supports INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events natively, while Spark streaming doesn't unless we add an extra field to indicate the operation type). I think @XuQianJin-Stars and @chenjunjiedada's team are working on this issue in their own repo.
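
As a small illustration of the difference described above (a sketch, not part of this PR): in Flink the change type travels with each record as a RowKind, while a Spark Structured Streaming row would need an explicit, user-defined operation column to carry the same information.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class RowKindIllustration {
  public static void main(String[] args) {
    // Flink: the change type is part of the record itself.
    RowData updated = GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1L, StringData.fromString("alice"));
    System.out.println(updated.getRowKind()); // UPDATE_AFTER

    // Spark Structured Streaming (hypothetical): rows carry no RowKind, so a CDC-aware
    // sink would need an extra column such as "op" ("I" / "U" / "D") to recover the
    // operation type; the column name here is purely illustrative.
  }
}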

}
@@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
FileFormat format,
Expand All @@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.upsert = upsert;
}

abstract RowDataDeltaWriter route(RowData row);
@@ -70,11 +73,19 @@ public void write(RowData row) throws IOException {
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
}
Contributor Author:
I think we only need to delete the row on INSERT. I don't think there is a situation where only an UPDATE_AFTER row arrives and the UPDATE_BEFORE is lost. @openinx please check this.

Member:
For an update operation in Flink, the UPDATE_AFTER event will always be emitted to the downstream, while UPDATE_BEFORE is a best-effort or configurable behavior of the upstream Flink source. You can take a look at GroupAggFunction: if the Flink source is configured to produce UPDATE_AFTER only, it won't emit any UPDATE_BEFORE to the downstream.

For the downstream Iceberg sink, we need to handle every UPDATE_AFTER as an UPSERT. That also means we need to do nothing for UPDATE_BEFORE, because we will remove the previous key in the next UPDATE_AFTER event.

Contributor Author:
I think we can delete the row only on UPDATE_AFTER and do nothing on UPDATE_BEFORE, to prevent deleting one row twice.

Contributor:
By the name, I thought INSERT means "add a new row", in which case we wouldn't need to add a delete for it. But I guess it actually means "append a row (new or updated)".

Contributor Author:
As @openinx mentioned in #1996 (comment), we need to transform INSERT/UPDATE_AFTER into UPSERT (delete + insert). If we don't add a delete for an INSERT row when upsert mode is enabled, we will get duplicate rows for the same primary key.

writer.write(row);
break;

case DELETE:
case UPDATE_BEFORE:
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
Comment on lines +83 to +85
Contributor:
Non-blocking question:

Are there possible concerns with events coming out of order for some reason? I guess since the table commits are serializable, this isn't a concern as the same row for these equality fields shouldn't be updated twice in the same commit?

Member:
It's a good question, @kbendick! Let's describe the out-of-order concern in two dimensions:

  1. Is it possible to produce disordered events within a single Iceberg transaction? First, if we want to maintain the correct data semantics between the source table and the Iceberg sink table, the records consumed from the source table must arrive in the correct order. Second, the streaming job needs to shuffle based on the equality fields so that records with the same key are dispatched to the same parallel task; otherwise out-of-order writes happen when different tasks write records with the same equality fields to the Iceberg table. In this way, the order within a single transaction is guaranteed.

  2. The out-of-order issue between two consecutive transactions. In our Flink streaming integration, we have guaranteed the exact commit order even if a failover happens. For Spark streaming, I think this issue will need more consideration.

Hopefully, I've answered your question, @kbendick :-)
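
A minimal sketch of the shuffle described in point 1, assuming a DataStream<RowData> whose first field is the single equality column (the field position and key type are illustrative assumptions, not code from this PR):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class EqualityFieldShuffle {
  // Key the stream by the equality field so that all events carrying the same key are
  // routed to the same writer subtask, which preserves their relative order.
  public static DataStream<RowData> shuffleByEqualityKey(DataStream<RowData> input) {
    return input.keyBy((KeySelector<RowData, Long>) row -> row.getLong(0));
  }
}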

writer.delete(row);
break;
case DELETE:
writer.delete(row);
break;

46 changes: 43 additions & 3 deletions flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -41,6 +41,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
@@ -58,6 +59,8 @@

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
@@ -125,6 +128,7 @@ public static class Builder {
private boolean overwrite = false;
private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;

@@ -212,6 +216,20 @@ public Builder writeParallelism(int newWriteParallelism) {
return this;
}

/**
* All INSERT/UPDATE_AFTER events from the input stream will be transformed into UPSERT events, which means the
* sink will DELETE the old record and then INSERT the new record. For a partitioned table, the partition fields
* should be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted
* by a new row located in partition-B.
*
* @param enable indicates whether all INSERT/UPDATE_AFTER events should be transformed to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enable) {
this.upsert = enable;
return this;
}

/**
* Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
*
@@ -321,7 +339,27 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData>
equalityFieldIds.add(field.fieldId());
}
}
IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);

// Fall back to the upsert mode parsed from the table properties if it is not specified at the job level.
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
Preconditions.checkState(!overwrite,
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Comment on lines +348 to +350
Contributor:
Can you add a test that verifies the builder doesn't allow overwrite and upsert / upsertMode?

Maybe I missed it, but it seems like an important thing to have a test for in case of future code refactors.

Member:
Agreed!

Contributor Author (@Reo-LEI, Sep 9, 2021):
I have added a unit test to cover this, thanks for your reminder! :) @kbendick
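
For reference, a hypothetical sketch of what such a check could look like (not the test added in this PR; the input stream and table loader are assumed to come from the surrounding test harness):

import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.junit.Assert;
import org.junit.Test;

public class TestUpsertOverwriteConflict {
  private DataStream<RowData> rowDataInput; // assumed to be provided by the test harness
  private TableLoader tableLoader;          // assumed to be provided by the test harness

  @Test
  public void testUpsertAndOverwriteAreExclusive() {
    // Combining overwrite(true) with upsert(true) should trip the precondition in appendWriter().
    Assert.assertThrows(IllegalStateException.class, () ->
        FlinkSink.forRowData(rowDataInput)
            .tableLoader(tableLoader)
            .equalityFieldColumns(Collections.singletonList("id"))
            .overwrite(true)
            .upsert(true)
            .append());
  }
}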

Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
Contributor:
For my own learning, must the partition field be included in the equality fields?

e.g., we could have an equality field (like user_id) and the table could be partitioned by hour. Would that be a valid scenario?

Contributor Author:
* should be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted

As @openinx commented above, we should restrict the partition fields to be a subset of the equality fields, to ensure we can delete the old data in the same partition.

e.g., we could have an equality field (like user_id) and the table could be partitioned by hour. Would that be a valid scenario?

I think that is not a valid scenario; keeping user_id unique across all the different hour partitions makes no sense.

Member:
If we have a table with user_id and hour, the business primary key is user_id, which means the table should have at most one row for each given user_id. Now let's talk about the partition strategy.

If we just partition the table by the hour field, two different hour partitions may contain the same user_id, because people may insert the user_id in hour=01 and again in hour=02. If we want to keep the primary key semantics, we need to delete the old user_id in hour=01 first and then insert the new user_id in hour=02. But when an INSERT comes, we don't know which partition holds that specific user_id, so we would have to broadcast the DELETE to all partitions, which is quite inefficient.
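
A hedged usage sketch of the passing case (column names, partition layout, and stream setup are illustrative only): with a table partitioned by a transform on user_id, the partition source column is part of the equality fields, so the validation below succeeds.

import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkExample {
  // Append an upsert-mode Iceberg sink, assuming the target table is partitioned by
  // bucket(N, user_id) so the partition source column (user_id) is one of the equality fields.
  public static void appendUpsertSink(DataStream<RowData> input, TableLoader tableLoader) {
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("user_id"))
        .upsert(true)
        .append();
  }
}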

for (PartitionField partitionField : table.spec().fields()) {
Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
"In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
partitionField, equalityFieldColumns);
}
}
}

IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);

int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -390,15 +428,17 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {

static IcebergStreamWriter<RowData> createStreamWriter(Table table,
RowType flinkRowType,
List<Integer> equalityFieldIds) {
List<Integer> equalityFieldIds,
boolean upsert) {
Preconditions.checkArgument(table != null, "Iceberg table should't be null");
Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);

Table serializableTable = SerializableTable.copyOf(table);
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
serializableTable, flinkRowType, targetFileSize,
fileFormat, equalityFieldIds);
fileFormat, equalityFieldIds, upsert);

return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
@@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
upsert);
this.partitionKey = new PartitionKey(spec, schema);
}

@@ -46,6 +46,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final long targetFileSizeBytes;
private final FileFormat format;
private final List<Integer> equalityFieldIds;
private final boolean upsert;
private final FileAppenderFactory<RowData> appenderFactory;

private transient OutputFileFactory outputFileFactory;
@@ -54,7 +55,8 @@ public RowDataTaskWriterFactory(Table table,
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
List<Integer> equalityFieldIds) {
List<Integer> equalityFieldIds,
boolean upsert) {
this.table = table;
this.schema = table.schema();
this.flinkSchema = flinkSchema;
Expand All @@ -63,6 +65,7 @@ public RowDataTaskWriterFactory(Table table,
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -95,10 +98,10 @@ public TaskWriter<RowData> create() {
// Initialize a task writer to write both INSERT and equality DELETE.
if (spec.isUnpartitioned()) {
return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
} else {
return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
}
}
}
@@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
List<Integer> equalityFieldIds) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
List<Integer> equalityFieldIds,
boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
upsert);
this.writer = new RowDataDeltaWriter(null);
}

@@ -77,7 +77,8 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption
flinkSchema,
Long.MAX_VALUE,
format,
null);
null,
false);
}

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
@@ -334,6 +334,6 @@ private StructLikeSet actualRowSet(String... columns) throws IOException {
private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
return new RowDataTaskWriterFactory(
SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()),
128 * 1024 * 1024, format, equalityFieldIds);
128 * 1024 * 1024, format, equalityFieldIds, false);
}
}