diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 8c78f9b17028..28da572c5538 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -248,6 +248,6 @@ private TableProperties() { public static final String MERGE_DISTRIBUTION_MODE = "write.merge.distribution-mode"; - public static final String UPSERT_MODE_ENABLE = "write.upsert.enable"; - public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false; + public static final String UPSERT_ENABLED = "write.upsert.enabled"; + public static final boolean UPSERT_ENABLED_DEFAULT = false; } diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 055c6b56fe62..867f97d1b2be 100644 --- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -59,8 +59,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE; -import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT; +import static org.apache.iceberg.TableProperties.UPSERT_ENABLED; +import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; @@ -222,11 +222,11 @@ public Builder writeParallelism(int newWriteParallelism) { * a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the * new row that located in partition-B. * - * @param enable indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT. + * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT. * @return {@link Builder} to connect the iceberg table. */ - public Builder upsert(boolean enable) { - this.upsert = enable; + public Builder upsert(boolean enabled) { + this.upsert = enabled; return this; } @@ -364,7 +364,7 @@ private SingleOutputStreamOperator appendWriter(DataStream // Fallback to use upsert mode parsed from table properties if don't specify in job level. boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(), - UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT); + UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT); // Validate the equality fields and partition fields if we enable the upsert mode. if (upsertMode) { diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 055c6b56fe62..867f97d1b2be 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -59,8 +59,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE; -import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT; +import static org.apache.iceberg.TableProperties.UPSERT_ENABLED; +import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; @@ -222,11 +222,11 @@ public Builder writeParallelism(int newWriteParallelism) { * a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the * new row that located in partition-B. * - * @param enable indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT. + * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT. * @return {@link Builder} to connect the iceberg table. */ - public Builder upsert(boolean enable) { - this.upsert = enable; + public Builder upsert(boolean enabled) { + this.upsert = enabled; return this; } @@ -364,7 +364,7 @@ private SingleOutputStreamOperator appendWriter(DataStream // Fallback to use upsert mode parsed from table properties if don't specify in job level. boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(), - UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT); + UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT); // Validate the equality fields and partition fields if we enable the upsert mode. if (upsertMode) {