Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,6 @@ private TableProperties() {

public static final String MERGE_DISTRIBUTION_MODE = "write.merge.distribution-mode";

public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
public static final String UPSERT_ENABLED = "write.upsert.enabled";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is a breaking change, should we allow both for at least one release?

Especially given that this could be set in a number of places such as:

  • table properties
  • at the job level
  • anything at the job level could be set in the clusters config file

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have any released version since the PR got merged: #2863. I think it's OK to rename it to enabled.

I think it's my mistake because I did not notice that all the iceberg switch are named as xxx.enabled before. Thanks for the fix before we have a release, so that we don't have to maintain this inconsistent name for at least one release.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think we can just rename it as it hasn't been released.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's great. If it hasn't been released then no worries. Retroactive +1.

public static final boolean UPSERT_ENABLED_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
Expand Down Expand Up @@ -222,11 +222,11 @@ public Builder writeParallelism(int newWriteParallelism) {
* a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the
* new row that located in partition-B.
*
* @param enable indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enable) {
this.upsert = enable;
public Builder upsert(boolean enabled) {
this.upsert = enabled;
return this;
}

Expand Down Expand Up @@ -364,7 +364,7 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData>

// Fallback to use upsert mode parsed from table properties if don't specify in job level.
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a corresponding change for the job level check? The one that the variable upsert comes from?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean it's also good to name those variables to enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed, @openinx.


// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
Expand Down Expand Up @@ -222,11 +222,11 @@ public Builder writeParallelism(int newWriteParallelism) {
* a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the
* new row that located in partition-B.
*
* @param enable indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enable) {
this.upsert = enable;
public Builder upsert(boolean enabled) {
this.upsert = enabled;
return this;
}

Expand Down Expand Up @@ -364,7 +364,7 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData>

// Fallback to use upsert mode parsed from table properties if don't specify in job level.
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);

// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
Expand Down