diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index f2982b9b3506..dfafe48b23c2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -68,6 +68,11 @@ public BooleanConfParser defaultValue(boolean value) {
       return self();
     }
 
+    public BooleanConfParser defaultValue(String value) {
+      this.defaultValue = Boolean.parseBoolean(value);
+      return self();
+    }
+
     public boolean parse() {
       Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Boolean::parseBoolean, defaultValue);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 4e575fc8104c..523ff6e58e83 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -100,6 +100,13 @@ public String overwriteMode() {
     return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
   }
 
+  public boolean wapEnabled() {
+    return confParser.booleanConf()
+        .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED)
+        .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)
+        .parse();
+  }
+
   public String wapId() {
     return sessionConf.get("spark.wap.id", null);
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 77703e9d99aa..19c4392d7f57 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -42,7 +42,6 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -100,6 +99,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final String queryId;
   private final FileFormat format;
   private final String applicationId;
+  private final boolean wapEnabled;
   private final String wapId;
   private final long targetFileSize;
   private final Schema writeSchema;
@@ -118,6 +118,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.queryId = writeInfo.queryId();
     this.format = writeConf.dataFileFormat();
     this.applicationId = applicationId;
+    this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
     this.targetFileSize = writeConf.targetDataFileSize();
     this.writeSchema = writeSchema;
@@ -166,11 +167,6 @@ StreamingWrite asStreamingOverwrite() {
     return new StreamingOverwrite();
   }
 
-  private boolean isWapTable() {
-    return Boolean.parseBoolean(table.properties().getOrDefault(
-        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
-  }
-
   // the writer factory works for both batch and streaming
   private WriterFactory createWriterFactory() {
     // broadcast the table metadata as the writer factory will be sent to executors
@@ -188,7 +184,7 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
-    if (isWapTable() && wapId != null) {
+    if (wapEnabled && wapId != null) {
       // write-audit-publish is enabled for this table and job
      // stage the changes without changing the current snapshot
       operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
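
Note: the diff above is a behavior-preserving refactor. The WAP-enabled lookup moves from an inline `Boolean.parseBoolean(...getOrDefault(...))` in `SparkWrite` to `SparkWriteConf` via `SparkConfParser`, which is why `BooleanConfParser` gains a `defaultValue(String)` overload (`WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT` is a `String` constant). Below is a minimal standalone sketch of the resolution the new `wapEnabled()` performs; the property name and default mirror `TableProperties`, but the class and helper here are hypothetical, for illustration only:

```java
import java.util.Map;

// Hypothetical sketch of the lookup behind wapEnabled(): a table property
// parsed as a boolean, falling back to a String-typed default, matching
// the new BooleanConfParser.defaultValue(String) overload in the diff.
public class WapEnabledSketch {

  // mirror TableProperties.WRITE_AUDIT_PUBLISH_ENABLED / _DEFAULT
  static final String WAP_ENABLED = "write.wap.enabled";
  static final String WAP_ENABLED_DEFAULT = "false";

  // equivalent of confParser.booleanConf().tableProperty(...).defaultValue(...).parse()
  static boolean wapEnabled(Map<String, String> tableProperties) {
    String value = tableProperties.getOrDefault(WAP_ENABLED, WAP_ENABLED_DEFAULT);
    // Boolean.parseBoolean never throws: anything but "true" (case-insensitive) is false
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    System.out.println(wapEnabled(Map.of()));                    // false (default)
    System.out.println(wapEnabled(Map.of(WAP_ENABLED, "true"))); // true
  }
}
```

As the commit path shows, staging a WAP snapshot still requires both settings: the `write.wap.enabled` table property turns the feature on, and the session-level `spark.wap.id` supplies the id recorded under `SnapshotSummary.STAGED_WAP_ID_PROP`.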