diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 24e23941f6d7..bae585f247e3 100644 --- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -375,8 +375,23 @@ private SingleOutputStreamOperator appendWriter(DataStream boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(), UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT); - // Validate the equality fields and partition fields if we enable the upsert mode. + // `upsert` mode should not be used in Flink 1.12 due to correctness issues. + // As part of a patch release, apache-iceberg-flink-runtime_1.12:0.13.2, + // it has been decided the best course of action would be to log a warning + // asking people to upgrade, as Flink 1.12 has been deprecated in upstream Apache Flink + // for some time as well as will be removed in the next major Iceberg release, Iceberg 0.14.0. + // + // But we allow the configuration given that it's a patch release and the change would be otherwise + // too breaking for a patch release. + // + // See in https://github.com/apache/iceberg/pull/4364 for more information. if (upsertMode) { + LOG.error( + "This table sink is running in upsert mode. Upsert mode should not be used with Flink 1.12 because " + + "it will write incorrect delete file metadata, which could prevent deletes from being correctly" + + "applied. Upgrading to Flink 1.13+ is recommended. " + + "To safely use Flink 1.12, set manifest metrics to counts only."); + Preconditions.checkState(!overwrite, "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); Preconditions.checkState(!equalityFieldIds.isEmpty(),