diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 8169bbafe..a653a112b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -67,6 +67,18 @@ private SparkSQLProperties() {}
 
   // Controls whether to report locality information to Spark while allocating input partitions
   public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
 
+  // Controls compression codec for write operations
+  public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
+
+  // Controls compression level for write operations
+  public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";
+
+  // Controls compression strategy for write operations
+  public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";
+
+  // Controls advisory partition size for write operations
+  public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";
+
   // Controls the spark input split size.
   public static final String SPLIT_SIZE = "spark.sql.iceberg.split-size";
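Note: the session-level properties above apply to every Iceberg write in a Spark session. A minimal usage sketch follows; the class and table names (SessionCompressionExample, demo.db.events) are illustrative and not part of this patch, and a per-write option still takes precedence over these confs (see SparkWriteOptions below).

import org.apache.spark.sql.SparkSession;

public class SessionCompressionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("compression-conf").getOrCreate();

    // Applies to all subsequent Iceberg writes in this session, for whichever
    // file format each table resolves to (Parquet by default).
    spark.conf().set("spark.sql.iceberg.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.compression-level", "3");

    spark.sql("INSERT INTO demo.db.events SELECT * FROM demo.db.staged_events");
  }
}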
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 28b4d7db6..7cb1d16e2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -21,6 +21,18 @@
 import static org.apache.iceberg.DistributionMode.HASH;
 import static org.apache.iceberg.DistributionMode.NONE;
 import static org.apache.iceberg.DistributionMode.RANGE;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 
 import java.util.Locale;
 import java.util.Map;
@@ -384,4 +396,197 @@ public String branch() {
 
     return branch;
   }
+
+  public Map<String, String> writeProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    writeProperties.putAll(dataWriteProperties());
+    writeProperties.putAll(deleteWriteProperties());
+    return writeProperties;
+  }
+
+  private Map<String, String> dataWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat dataFormat = dataFileFormat();
+
+    switch (dataFormat) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
+        String parquetCompressionLevel = parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+        }
+        break;
+
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
+        String avroCompressionLevel = avroCompressionLevel();
+        if (avroCompressionLevel != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+        }
+        break;
+
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private Map<String, String> deleteWriteProperties() {
+    Map<String, String> writeProperties = Maps.newHashMap();
+    FileFormat deleteFormat = deleteFileFormat();
+
+    switch (deleteFormat) {
+      case PARQUET:
+        writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec());
+        String deleteParquetCompressionLevel = deleteParquetCompressionLevel();
+        if (deleteParquetCompressionLevel != null) {
+          writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel);
+        }
+        break;
+
+      case AVRO:
+        writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec());
+        String deleteAvroCompressionLevel = deleteAvroCompressionLevel();
+        if (deleteAvroCompressionLevel != null) {
+          writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel);
+        }
+        break;
+
+      case ORC:
+        writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec());
+        writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy());
+        break;
+
+      default:
+        // skip
+    }
+
+    return writeProperties;
+  }
+
+  private String parquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  private String deleteParquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_PARQUET_COMPRESSION)
+        .defaultValue(parquetCompressionCodec())
+        .parse();
+  }
+
+  private String parquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
+        .parseOptional();
+  }
+
+  private String deleteParquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
+        .defaultValue(parquetCompressionLevel())
+        .parseOptional();
+  }
+
+  private String avroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.AVRO_COMPRESSION)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  private String deleteAvroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_AVRO_COMPRESSION)
+        .defaultValue(avroCompressionCodec())
+        .parse();
+  }
+
+  private String avroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
+        .parseOptional();
+  }
+
+  private String deleteAvroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_LEVEL)
+        .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
+        .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
+        .defaultValue(avroCompressionLevel())
+        .parseOptional();
+  }
+
+  private String orcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.ORC_COMPRESSION)
+        .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  private String deleteOrcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_CODEC)
+        .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
+        .tableProperty(DELETE_ORC_COMPRESSION)
+        .defaultValue(orcCompressionCodec())
+        .parse();
+  }
+
+  private String orcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_STRATEGY)
+        .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
+        .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
+        .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
+        .parse();
+  }
+
+  private String deleteOrcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.COMPRESSION_STRATEGY)
+        .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
+        .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
+        .defaultValue(orcCompressionStrategy())
+        .parse();
+  }
 }
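The confParser chains above resolve every setting in a fixed order: per-write option first, then Spark session conf, then table property, then the default. Note that each delete* accessor passes the resolved data-file value as its default, so delete files inherit the data compression settings unless a delete-specific table property is set. The following sketch mirrors that precedence; it is illustrative only, not Iceberg API:

import java.util.Map;

public class ResolutionOrderSketch {
  // Hypothetical helper showing the fallback chain used by the accessors above.
  static String resolve(
      Map<String, String> writeOptions,
      Map<String, String> sessionConfs,
      Map<String, String> tableProperties,
      String optionKey,
      String sessionKey,
      String tableKey,
      String defaultValue) {
    if (writeOptions.containsKey(optionKey)) {
      return writeOptions.get(optionKey); // e.g. option("compression-codec", ...)
    }
    if (sessionConfs.containsKey(sessionKey)) {
      return sessionConfs.get(sessionKey); // e.g. spark.sql.iceberg.compression-codec
    }
    if (tableProperties.containsKey(tableKey)) {
      return tableProperties.get(tableKey); // e.g. write.parquet.compression-codec
    }
    return defaultValue;
  }
}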
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index d1fecf10f..cab919b20 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -85,4 +85,15 @@ private SparkWriteOptions() {}
 
   // Isolation Level for DataFrame calls. Currently supported by overwritePartitions
   public static final String ISOLATION_LEVEL = "isolation-level";
+
+  // Controls write compress options
+  public static final String COMPRESSION_CODEC = "compression-codec";
+  public static final String COMPRESSION_LEVEL = "compression-level";
+  public static final String COMPRESSION_STRATEGY = "compression-strategy";
+
+  // Overrides the advisory partition size
+  public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size";
+
+  // Overrides the delete granularity
+  public static final String DELETE_GRANULARITY = "delete-granularity";
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 0aebb6bdb..7f95f92ae 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write {
   private final String fileSetId;
   private final int specId;
   private final StructLike partition;
+  private final Map<String, String> writeProperties;
 
   /**
    * Constructs a {@link SparkPositionDeletesRewrite}.
@@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
     this.fileSetId = writeConf.rewrittenFileSetId();
     this.specId = specId;
     this.partition = partition;
+    this.writeProperties = writeConf.writeProperties();
  }
 
   @Override
@@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
         writeSchema,
         dsSchema,
         specId,
-        partition);
+        partition,
+        writeProperties);
   }
 
   @Override
@@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
     private final StructType dsSchema;
     private final int specId;
     private final StructLike partition;
+    private final Map<String, String> writeProperties;
 
     PositionDeletesWriterFactory(
         Broadcast<Table> tableBroadcast,
@@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
         Schema writeSchema,
         StructType dsSchema,
         int specId,
-        StructLike partition) {
+        StructLike partition,
+        Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.queryId = queryId;
       this.format = format;
@@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
       this.dsSchema = dsSchema;
       this.specId = specId;
       this.partition = partition;
+      this.writeProperties = writeProperties;
     }
 
     @Override
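With the new options registered above, compression can also be overridden for a single write through the DataFrameWriterV2 API. A hedged sketch, with illustrative class and table names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WriteOptionExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();
    Dataset<Row> df = spark.table("demo.db.staged_events");

    // Write options have the highest precedence: they override both the
    // spark.sql.iceberg.* session confs and any table-level compression properties.
    df.writeTo("demo.db.events")
        .option("compression-codec", "gzip")
        .option("compression-level", "9")
        .append();
  }
}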
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 577e94a82..b0b8316fe 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -102,6 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Map<String, String> extraSnapshotMetadata;
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
+  private final Map<String, String> writeProperties;
 
   private boolean cleanupOnAbort = true;
 
@@ -130,6 +131,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.requiredDistribution = requiredDistribution;
     this.requiredOrdering = requiredOrdering;
+    this.writeProperties = writeConf.writeProperties();
   }
 
   @Override
@@ -154,7 +156,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     // broadcast the table metadata as the writer factory will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-    return new PositionDeltaWriteFactory(tableBroadcast, command, context);
+    return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
   }
 
   @Override
@@ -331,11 +333,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Broadcast<Table> tableBroadcast;
     private final Command command;
     private final Context context;
+    private final Map<String, String> writeProperties;
 
-    PositionDeltaWriteFactory(Broadcast<Table> tableBroadcast, Command command, Context context) {
+    PositionDeltaWriteFactory(
+        Broadcast<Table> tableBroadcast,
+        Command command,
+        Context context,
+        Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.command = command;
       this.context = context;
+      this.writeProperties = writeProperties;
     }
 
     @Override
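Delete files can still be tuned independently through the delete-specific table properties consumed by the DELETE_* accessors in SparkWriteConf. A sketch, assuming the standard Iceberg property name write.delete.parquet.compression-codec and an illustrative table name:

import org.apache.spark.sql.SparkSession;

public class DeleteCompressionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Position delete files written by DELETE/UPDATE/MERGE use zstd here, while
    // data files keep whatever write.parquet.compression-codec resolves to.
    spark.sql(
        "ALTER TABLE demo.db.events SET TBLPROPERTIES ("
            + "'write.delete.parquet.compression-codec' = 'zstd')");
  }
}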