diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java index 33e5ca936800..6a0844df6a8c 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java @@ -67,6 +67,11 @@ public BooleanConfParser defaultValue(boolean value) { return self(); } + public BooleanConfParser defaultValue(String value) { + this.defaultValue = Boolean.parseBoolean(value); + return self(); + } + public boolean parse() { Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); return parse(Boolean::parseBoolean, defaultValue); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 08b3fbee7590..0d0687465d37 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -103,6 +103,14 @@ public String overwriteMode() { return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null; } + public boolean wapEnabled() { + return confParser + .booleanConf() + .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED) + .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT) + .parse(); + } + public String wapId() { return sessionConf.get("spark.wap.id", null); } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 3ba40bc88582..8d955bdd21e8 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -51,7 +51,6 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -98,6 +97,7 @@ class SparkWrite { private final String queryId; private final FileFormat format; private final String applicationId; + private final boolean wapEnabled; private final String wapId; private final long targetFileSize; private final Schema writeSchema; @@ -120,6 +120,7 @@ class SparkWrite { this.queryId = writeInfo.queryId(); this.format = writeConf.dataFileFormat(); this.applicationId = applicationId; + this.wapEnabled = writeConf.wapEnabled(); this.wapId = writeConf.wapId(); this.targetFileSize = writeConf.targetDataFileSize(); this.writeSchema = writeSchema; @@ -156,15 +157,6 @@ StreamingWrite asStreamingOverwrite() { return new StreamingOverwrite(); } - private boolean isWapTable() { - return Boolean.parseBoolean( - table - .properties() - .getOrDefault( - TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, - TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)); - } - // the writer factory works for both batch and streaming private WriterFactory createWriterFactory() { // broadcast the table metadata as the writer factory will be sent to executors @@ -188,7 +180,7 @@ private void commitOperation(SnapshotUpdate operation, String description) { CommitMetadata.commitProperties().forEach(operation::set); } - if (isWapTable() && wapId != null) { + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);