SparkSQLProperties.java
@@ -67,6 +67,18 @@ private SparkSQLProperties() {}
// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";

// Controls the compression codec for write operations
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";

// Controls the compression level for write operations
public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";

// Controls the compression strategy for write operations
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";

// Controls the advisory partition size for write operations
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";

// Controls the Spark input split size.
public static final String SPLIT_SIZE = "spark.sql.iceberg.split-size";

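For illustration, these session properties apply to every Iceberg write in the session once set; a minimal sketch, assuming a local session and illustrative codec and level values (not defaults asserted by this change):

import org.apache.spark.sql.SparkSession;

// Usage sketch (not part of this diff): session-wide compression settings.
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.sql.iceberg.compression-codec", "zstd")  // SparkSQLProperties.COMPRESSION_CODEC
    .config("spark.sql.iceberg.compression-level", "3")     // SparkSQLProperties.COMPRESSION_LEVEL
    .getOrCreate();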
SparkWriteConf.java
@@ -21,6 +21,18 @@
import static org.apache.iceberg.DistributionMode.HASH;
import static org.apache.iceberg.DistributionMode.NONE;
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.util.Locale;
import java.util.Map;
@@ -384,4 +396,197 @@ public String branch() {

return branch;
}

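// Resolves file-format-specific compression settings for data and delete
// files; writer factories carry this map to executors when creating writers.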
public Map<String, String> writeProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
writeProperties.putAll(dataWriteProperties());
writeProperties.putAll(deleteWriteProperties());
return writeProperties;
}

private Map<String, String> dataWriteProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
FileFormat dataFormat = dataFileFormat();

switch (dataFormat) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}
break;

case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;

default:
// no compression properties for other file formats
}

return writeProperties;
}

private Map<String, String> deleteWriteProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
FileFormat deleteFormat = deleteFileFormat();

switch (deleteFormat) {
case PARQUET:
writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec());
String deleteParquetCompressionLevel = deleteParquetCompressionLevel();
if (deleteParquetCompressionLevel != null) {
writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec());
String deleteAvroCompressionLevel = deleteAvroCompressionLevel();
if (deleteAvroCompressionLevel != null) {
writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel);
}
break;

case ORC:
writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec());
writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy());
break;

default:
// no compression properties for other file formats
}

return writeProperties;
}

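// Each value below is resolved in precedence order: write option first,
// then Spark session conf, then table property, then the default.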
private String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.PARQUET_COMPRESSION)
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
.parse();
}

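// Delete-file settings default to the resolved data-file setting when no
// delete-specific table property is present.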
private String deleteParquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(DELETE_PARQUET_COMPRESSION)
.defaultValue(parquetCompressionCodec())
.parse();
}

private String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

private String deleteParquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
.defaultValue(parquetCompressionLevel())
.parseOptional();
}

private String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.AVRO_COMPRESSION)
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
.parse();
}

private String deleteAvroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(DELETE_AVRO_COMPRESSION)
.defaultValue(avroCompressionCodec())
.parse();
}

private String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

private String deleteAvroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
.defaultValue(avroCompressionLevel())
.parseOptional();
}

private String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.ORC_COMPRESSION)
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
.parse();
}

private String deleteOrcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(DELETE_ORC_COMPRESSION)
.defaultValue(orcCompressionCodec())
.parse();
}

private String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
.parse();
}

private String deleteOrcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
.defaultValue(orcCompressionStrategy())
.parse();
}
}
SparkWriteOptions.java
@@ -85,4 +85,15 @@ private SparkWriteOptions() {}

// Isolation level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Controls write compression options
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";

// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size";

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";
}
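The new options can also be supplied per write; a sketch using the DataFrameWriterV2 API, assuming the `spark` session from the earlier sketch (table names and option values are illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Usage sketch (not part of this diff): per-write compression overrides take
// precedence over session conf and table properties.
Dataset<Row> df = spark.table("db.source");
df.writeTo("db.target")
    .option("compression-codec", "zstd")  // SparkWriteOptions.COMPRESSION_CODEC
    .option("compression-level", "3")     // SparkWriteOptions.COMPRESSION_LEVEL
    .append();                            // throws NoSuchTableException if db.target is missing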
SparkPositionDeletesRewrite.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String fileSetId;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

/**
* Constructs a {@link SparkPositionDeletesRewrite}.
@@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
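// resolved compression settings, captured on the driver and handed to the writer factory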
this.writeProperties = writeConf.writeProperties();
}

@Override
@@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
writeSchema,
dsSchema,
specId,
- partition);
+ partition,
+ writeProperties);
}

@Override
@@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
private final StructType dsSchema;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

PositionDeletesWriterFactory(
Broadcast<Table> tableBroadcast,
@@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
Schema writeSchema,
StructType dsSchema,
int specId,
- StructLike partition) {
+ StructLike partition,
+ Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.queryId = queryId;
this.format = format;
@@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
this.dsSchema = dsSchema;
this.specId = specId;
this.partition = partition;
this.writeProperties = writeProperties;
}

@Override
SparkPositionDeltaWrite.java
@@ -102,6 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering
private final Map<String, String> extraSnapshotMetadata;
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;

@@ -130,6 +131,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.requiredDistribution = requiredDistribution;
this.requiredOrdering = requiredOrdering;
this.writeProperties = writeConf.writeProperties();
}

@Override
@@ -154,7 +156,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
- return new PositionDeltaWriteFactory(tableBroadcast, command, context);
+ return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
}

@Override
@@ -331,11 +333,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
private final Broadcast<Table> tableBroadcast;
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;

- PositionDeltaWriteFactory(Broadcast<Table> tableBroadcast, Command command, Context context) {
+ PositionDeltaWriteFactory(
+     Broadcast<Table> tableBroadcast,
+     Command command,
+     Context context,
+     Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
}

@Override