Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
Expand Down Expand Up @@ -1204,6 +1205,41 @@ public void testDeleteWithMultipleSpecs() {
sql("SELECT * FROM %s ORDER BY id", selectTarget()));
}

/**
 * Checks that a DELETE honors the mode supplied through the Spark SQL session conf {@link
 * SparkSQLProperties#WRITE_DELETE_MODE} when it differs from the mode the table reports.
 */
@Test
public void testOverrideModeInSQLConf() throws NoSuchTableException {
  createAndInitPartitionedTable();
  Table table = validationCatalog.loadTable(tableIdent);

  // The session conf is always set to the opposite of the mode reported for the table.
  RowLevelOperationMode configuredMode = mode(table);
  boolean startsAsCopyOnWrite = configuredMode == COPY_ON_WRITE;
  RowLevelOperationMode overrideMode = startsAsCopyOnWrite ? MERGE_ON_READ : COPY_ON_WRITE;

  if (startsAsCopyOnWrite) {
    // NOTE(review): presumably needed because the override will be merge-on-read, which is
    // expected to require format version 2 -- confirm.
    sql(
        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
        tableName, TableProperties.FORMAT_VERSION, "2");
  }

  append(
      tableName,
      new Employee(1, "hr"),
      new Employee(1, "hardware"),
      new Employee(2, "hardware"),
      new Employee(3, "hr"));
  createBranchIfNeeded();

  ImmutableMap<String, String> sessionOverrides =
      ImmutableMap.of(SparkSQLProperties.WRITE_DELETE_MODE, overrideMode.modeName());
  withSQLConf(
      sessionOverrides,
      () -> {
        sql("DELETE FROM %s WHERE id = 1", commitTarget());
      });

  table.refresh();
  Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);

  // The produced snapshot must match the overriding mode rather than the table's own mode.
  if (overrideMode != COPY_ON_WRITE) {
    validateMergeOnRead(latest, "2", "2", null);
  } else {
    validateCopyOnWrite(latest, "2", "2", "2");
  }
}

@Test
public void testDeleteToWapBranch() throws NoSuchTableException {
Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ private SparkSQLProperties() {}
// Controls write distribution mode
public static final String DISTRIBUTION_MODE = "spark.sql.iceberg.distribution-mode";

// Controls write delete/update/merge mode (copy-on-write or merge-on-read).
// One session conf key per row-level command: DELETE, UPDATE and MERGE respectively.
// NOTE(review): presumably these are the session-level counterparts of the table-level
// delete/update/merge mode properties -- confirm precedence in the write conf parser.
public static final String WRITE_DELETE_MODE = "spark.sql.iceberg.write.delete.mode";
public static final String WRITE_UPDATE_MODE = "spark.sql.iceberg.write.update.mode";
public static final String WRITE_MERGE_MODE = "spark.sql.iceberg.write.merge.mode";

// Controls the WAP ID used for write-audit-publish workflow.
// When set, new snapshots will be staged with this ID in snapshot summary.
public static final String WAP_ID = "spark.wap.id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,33 @@ private boolean ignoreTableDistributionAndOrdering() {
.parse();
}

/**
 * Resolves the name of the row-level operation mode to use for DELETE commands.
 *
 * <p>The value is looked up from the {@link SparkSQLProperties#WRITE_DELETE_MODE} session conf
 * and the {@link TableProperties#DELETE_MODE} table property, with {@link
 * TableProperties#DELETE_MODE_DEFAULT} as the default value. NOTE(review): the precedence among
 * these sources is decided by the conf parser -- confirm there.
 *
 * @return the resolved mode name
 */
public String writeDeleteMode() {
  String deleteModeName =
      confParser
          .stringConf()
          .sessionConf(SparkSQLProperties.WRITE_DELETE_MODE)
          .tableProperty(TableProperties.DELETE_MODE)
          .defaultValue(TableProperties.DELETE_MODE_DEFAULT)
          .parse();
  return deleteModeName;
}

/**
 * Resolves the name of the row-level operation mode to use for UPDATE commands.
 *
 * <p>The value is looked up from the {@link SparkSQLProperties#WRITE_UPDATE_MODE} session conf
 * and the {@link TableProperties#UPDATE_MODE} table property, with {@link
 * TableProperties#UPDATE_MODE_DEFAULT} as the default value. NOTE(review): the precedence among
 * these sources is decided by the conf parser -- confirm there.
 *
 * @return the resolved mode name
 */
public String writeUpdateMode() {
  String updateModeName =
      confParser
          .stringConf()
          .sessionConf(SparkSQLProperties.WRITE_UPDATE_MODE)
          .tableProperty(TableProperties.UPDATE_MODE)
          .defaultValue(TableProperties.UPDATE_MODE_DEFAULT)
          .parse();
  return updateModeName;
}

/**
 * Resolves the name of the row-level operation mode to use for MERGE commands.
 *
 * <p>The value is looked up from the {@link SparkSQLProperties#WRITE_MERGE_MODE} session conf
 * and the {@link TableProperties#MERGE_MODE} table property, with {@link
 * TableProperties#MERGE_MODE_DEFAULT} as the default value. NOTE(review): the precedence among
 * these sources is decided by the conf parser -- confirm there.
 *
 * @return the resolved mode name
 */
public String writeMergeMode() {
  String mergeModeName =
      confParser
          .stringConf()
          .sessionConf(SparkSQLProperties.WRITE_MERGE_MODE)
          .tableProperty(TableProperties.MERGE_MODE)
          .defaultValue(TableProperties.MERGE_MODE_DEFAULT)
          .parse();
  return mergeModeName;
}

public Long validateFromSnapshotId() {
return confParser
.longConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@

import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;

import java.util.Map;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.RowLevelOperation;
import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
Expand All @@ -56,7 +52,7 @@ class SparkRowLevelOperationBuilder implements RowLevelOperationBuilder {
this.table = table;
this.branch = branch;
this.info = info;
this.mode = mode(table.properties(), info.command());
this.mode = mode(new SparkWriteConf(spark, table, branch, ImmutableMap.of()), info.command());
this.isolationLevel = isolationLevel(table.properties(), info.command());
}

Expand All @@ -72,18 +68,18 @@ public RowLevelOperation build() {
}
}

private RowLevelOperationMode mode(Map<String, String> properties, Command command) {
private RowLevelOperationMode mode(SparkWriteConf conf, Command command) {
String modeName;

switch (command) {
case DELETE:
modeName = properties.getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
modeName = conf.writeDeleteMode();
break;
case UPDATE:
modeName = properties.getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
modeName = conf.writeUpdateMode();
break;
case MERGE:
modeName = properties.getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
modeName = conf.writeMergeMode();
break;
default:
throw new IllegalArgumentException("Unsupported command: " + command);
Expand Down