Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public BooleanConfParser defaultValue(boolean value) {
return self();
}

public BooleanConfParser defaultValue(String value) {
this.defaultValue = Boolean.parseBoolean(value);
return self();
}

public boolean parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(Boolean::parseBoolean, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ public String overwriteMode() {
return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
}

public boolean wapEnabled() {
return confParser.booleanConf()
.tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED)
.defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)
.parse();
}

public String wapId() {
return sessionConf.get("spark.wap.id", null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -100,6 +99,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final String queryId;
private final FileFormat format;
private final String applicationId;
private final boolean wapEnabled;
private final String wapId;
private final long targetFileSize;
private final Schema writeSchema;
Expand All @@ -118,6 +118,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
this.queryId = writeInfo.queryId();
this.format = writeConf.dataFileFormat();
this.applicationId = applicationId;
this.wapEnabled = writeConf.wapEnabled();
this.wapId = writeConf.wapId();
this.targetFileSize = writeConf.targetDataFileSize();
this.writeSchema = writeSchema;
Expand Down Expand Up @@ -166,11 +167,6 @@ StreamingWrite asStreamingOverwrite() {
return new StreamingOverwrite();
}

private boolean isWapTable() {
return Boolean.parseBoolean(table.properties().getOrDefault(
TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
}

// the writer factory works for both batch and streaming
private WriterFactory createWriterFactory() {
// broadcast the table metadata as the writer factory will be sent to executors
Expand All @@ -188,7 +184,7 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
extraSnapshotMetadata.forEach(operation::set);
}

if (isWapTable() && wapId != null) {
if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
Expand Down