Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,23 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData>
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);

// Validate the equality fields and partition fields if we enable the upsert mode.
// `upsert` mode should not be used in Flink 1.12 due to correctness issues.
// As part of a patch release, apache-iceberg-flink-runtime_1.12:0.13.2,
// it has been decided the best course of action would be to log a warning
// asking people to upgrade, as Flink 1.12 has been deprecated in upstream Apache Flink
// for some time as well as will be removed in the next major Iceberg release, Iceberg 0.14.0.
//
// But we allow the configuration given that it's a patch release and the change would be otherwise
// too breaking for a patch release.
//
// See in https://github.com/apache/iceberg/pull/4364 for more information.
if (upsertMode) {
LOG.error(
"This table sink is running in upsert mode. Upsert mode should not be used with Flink 1.12 because " +
"it will write incorrect delete file metadata, which could prevent deletes from being correctly" +
"applied. Upgrading to Flink 1.13+ is recommended. " +
"To safely use Flink 1.12, set manifest metrics to counts only.");

Preconditions.checkState(!overwrite,
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
Expand Down