
Commit 89ac392

SongChujun authored and ebyhr committed
Support setting compression_codec table property in Iceberg
Makes compression_codec a table property and drops support for the compression_codec session property for simplicity
1 parent 0a39998 commit 89ac392

File tree: 12 files changed (+432, -170 lines)


docs/src/main/sphinx/connector/iceberg.md

Lines changed: 11 additions & 5 deletions
@@ -973,8 +973,13 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
   - Description
 * - `format`
   - Optionally specifies the format of table data files; either `PARQUET`,
-    `ORC`, or `AVRO`. Defaults to the value of the `iceberg.file-format` catalog
-    configuration property, which defaults to `PARQUET`.
+    `ORC`, or `AVRO`. Defaults to the value of the `iceberg.file-format`
+    catalog configuration property, which defaults to `PARQUET`.
+* - `compression_codec`
+  - Optionally specifies the compression codec used for writing the table;
+    either `NONE`, `ZSTD`, `SNAPPY`, `LZ4`, or `GZIP`. Defaults to the value
+    of the `iceberg.compression-codec` catalog configuration property, which
+    defaults to `ZSTD`.
 * - `partitioning`
   - Optionally specifies table partitioning. If a table is partitioned by
     columns `c1` and `c2`, the partitioning property is `partitioning =
@@ -1032,9 +1037,9 @@ WITH (
     location = '/var/example_tables/test_table');
 ```
 
-The table definition below specifies to use ORC files, bloom filter index by columns
-`c1` and `c2`, fpp is 0.05, and a file system location of
-`/var/example_tables/test_table`:
+The table definition below specifies to use ORC files with compression_codec
+SNAPPY, bloom filter index by columns `c1` and `c2`, fpp is 0.05, and a file
+system location of `/var/example_tables/test_table`:
 
 ```sql
 CREATE TABLE test_table (
@@ -1043,6 +1048,7 @@ CREATE TABLE test_table (
     c3 DOUBLE)
 WITH (
     format = 'ORC',
+    compression_codec = 'SNAPPY',
    location = '/var/example_tables/test_table',
    orc_bloom_filter_columns = ARRAY['c1', 'c2'],
    orc_bloom_filter_fpp = 0.05);
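
Since the commit also registers `compression_codec` as an updatable table property (see the IcebergMetadata change below), the codec can be changed on an existing table, not only at creation time. A minimal sketch, assuming a table named `test_table` in the current catalog and schema:

```sql
-- Switch future writes for this table to GZIP-compressed files;
-- already-written data files are not rewritten.
ALTER TABLE test_table SET PROPERTIES compression_codec = 'GZIP';
```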

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java

Lines changed: 4 additions & 0 deletions
@@ -35,6 +35,8 @@
 import static io.trino.plugin.iceberg.IcebergAvroDataConversion.toIcebergRecords;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
+import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
@@ -63,6 +65,8 @@ public IcebergAvroFileWriter(
         this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
         this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
 
+        validateCompression(AVRO, Optional.of(hiveCompressionCodec));
+
         try {
             avroWriter = Avro.write(file)
                     .schema(icebergSchema)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java

Lines changed: 24 additions & 9 deletions
@@ -31,6 +31,7 @@
 import io.trino.parquet.writer.ParquetWriterOptions;
 import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
 import io.trino.plugin.hive.HiveCompressionCodec;
+import io.trino.plugin.hive.HiveCompressionOption;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
@@ -60,7 +61,9 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
-import static io.trino.plugin.iceberg.IcebergSessionProperties.getCompressionCodec;
+import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
+import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
+import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcStringStatisticsLimit;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxDictionaryMemory;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxRowGroupRows;
@@ -74,6 +77,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageValueCount;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp;
 import static io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns;
@@ -97,19 +101,22 @@ public class IcebergFileWriterFactory
     private final NodeVersion nodeVersion;
     private final FileFormatDataSourceStats readStats;
     private final OrcWriterStats orcWriterStats = new OrcWriterStats();
+    private final HiveCompressionOption hiveCompressionOption;
     private final OrcWriterOptions orcWriterOptions;
 
     @Inject
     public IcebergFileWriterFactory(
             TypeManager typeManager,
             NodeVersion nodeVersion,
             FileFormatDataSourceStats readStats,
+            IcebergConfig icebergConfig,
             OrcWriterConfig orcWriterConfig)
     {
         checkArgument(!orcWriterConfig.isUseLegacyVersion(), "the ORC writer shouldn't be configured to use a legacy version");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
         this.readStats = requireNonNull(readStats, "readStats is null");
+        this.hiveCompressionOption = icebergConfig.getCompressionCodec();
         this.orcWriterOptions = orcWriterConfig.toOrcWriterOptions();
     }
 
@@ -132,7 +139,7 @@ public IcebergFileWriter createDataFileWriter(
             // TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
             case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
             case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties);
         };
     }
 
@@ -146,7 +153,7 @@ public IcebergFileWriter createPositionDeleteWriter(
         return switch (fileFormat) {
             case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
             case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
         };
     }
 
@@ -178,7 +185,9 @@ private IcebergFileWriter createParquetWriter(
                     .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
                     .build();
 
-            HiveCompressionCodec hiveCompressionCodec = toCompressionCodec(getCompressionCodec(session));
+            HiveCompressionCodec compressionCodec = getHiveCompressionCodec(PARQUET, storageProperties)
+                    .orElse(toCompressionCodec(hiveCompressionOption));
+
             return new IcebergParquetFileWriter(
                     metricsConfig,
                     outputFile,
@@ -189,8 +198,8 @@ private IcebergFileWriter createParquetWriter(
                     makeTypeMap(fileColumnTypes, fileColumnNames),
                     parquetWriterOptions,
                     IntStream.range(0, fileColumnNames.size()).toArray(),
-                    hiveCompressionCodec.getParquetCompressionCodec()
-                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(hiveCompressionCodec))),
+                    compressionCodec.getParquetCompressionCodec()
+                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
                     nodeVersion.toString());
         }
         catch (IOException | UncheckedIOException e) {
@@ -234,6 +243,9 @@ private IcebergFileWriter createOrcWriter(
                 });
             }
 
+            HiveCompressionCodec compressionCodec = getHiveCompressionCodec(ORC, storageProperties)
+                    .orElse(toCompressionCodec(hiveCompressionOption));
+
             return new IcebergOrcFileWriter(
                     metricsConfig,
                     icebergSchema,
@@ -242,7 +254,7 @@ private IcebergFileWriter createOrcWriter(
                     fileColumnNames,
                     fileColumnTypes,
                     toOrcType(icebergSchema),
-                    toCompressionCodec(getCompressionCodec(session)).getOrcCompressionKind(),
+                    compressionCodec.getOrcCompressionKind(),
                     withBloomFilterOptions(orcWriterOptions, storageProperties)
                             .withStripeMinSize(getOrcWriterMinStripeSize(session))
                             .withStripeMaxSize(getOrcWriterMaxStripeSize(session))
@@ -287,19 +299,22 @@ private IcebergFileWriter createAvroWriter(
             TrinoFileSystem fileSystem,
             Location outputPath,
             Schema icebergSchema,
-            ConnectorSession session)
+            Map<String, String> storageProperties)
     {
         Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);
 
         List<Type> columnTypes = icebergSchema.columns().stream()
                 .map(column -> toTrinoType(column.type(), typeManager))
                 .collect(toImmutableList());
 
+        HiveCompressionCodec compressionCodec = getHiveCompressionCodec(AVRO, storageProperties)
+                .orElse(toCompressionCodec(hiveCompressionOption));
+
         return new IcebergAvroFileWriter(
                 new ForwardingOutputFile(fileSystem, outputPath),
                 rollbackAction,
                 icebergSchema,
                 columnTypes,
-                toCompressionCodec(getCompressionCodec(session)));
+                compressionCodec);
     }
 }

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 38 additions & 3 deletions
@@ -43,6 +43,7 @@
 import io.trino.plugin.base.filter.UtcConstraintExtractor;
 import io.trino.plugin.base.projection.ApplyProjectionUtil;
 import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
+import io.trino.plugin.hive.HiveCompressionCodec;
 import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveWrittenPartitions;
 import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
@@ -273,6 +274,8 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_UNSUPPORTED_VIEW_DIALECT;
+import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
+import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.PARTITION;
@@ -295,6 +298,7 @@
 import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName;
 import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
 import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
+import static io.trino.plugin.iceberg.IcebergTableProperties.COMPRESSION_CODEC;
 import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
@@ -307,6 +311,7 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
 import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
 import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -318,7 +323,9 @@
 import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshotAfter;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnMetadatas;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionPropertyName;
 import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
@@ -426,6 +433,7 @@ public class IcebergMetadata
             .add(EXTRA_PROPERTIES_PROPERTY)
             .add(FILE_FORMAT_PROPERTY)
             .add(FORMAT_VERSION_PROPERTY)
+            .add(COMPRESSION_CODEC)
             .add(MAX_COMMIT_RETRY)
             .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
             .add(DATA_LOCATION_PROPERTY)
@@ -2497,10 +2505,13 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             }
         }
 
+        IcebergFileFormat oldFileFormat = getFileFormat(icebergTable.properties());
+        IcebergFileFormat newFileFormat = oldFileFormat;
+
         if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
-            IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
+            newFileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
                     .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
-            updateProperties.defaultFormat(fileFormat.toIceberg());
+            updateProperties.defaultFormat(newFileFormat.toIceberg());
         }
 
         if (properties.containsKey(FORMAT_VERSION_PROPERTY)) {
@@ -2510,6 +2521,14 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
         }
 
+        Map<String, String> propertiesForCompression = calculateTableCompressionProperties(oldFileFormat, newFileFormat, icebergTable.properties(), properties.entrySet().stream()
+                .filter(e -> e.getValue().isPresent())
+                .collect(toImmutableMap(
+                        Map.Entry::getKey,
+                        e -> e.getValue().get())));
+
+        propertiesForCompression.forEach(updateProperties::set);
+
         if (properties.containsKey(MAX_COMMIT_RETRY)) {
             int maxCommitRetry = (int) properties.get(MAX_COMMIT_RETRY)
                     .orElseThrow(() -> new IllegalArgumentException("The max_commit_retry property cannot be empty"));
@@ -2565,6 +2584,22 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
         commitTransaction(transaction, "set table properties");
     }
 
+    public static Map<String, String> calculateTableCompressionProperties(IcebergFileFormat oldFileFormat, IcebergFileFormat newFileFormat, Map<String, String> existingProperties, Map<String, Object> inputProperties)
+    {
+        ImmutableMap.Builder<String, String> newCompressionProperties = ImmutableMap.builder();
+
+        Optional<HiveCompressionCodec> oldCompressionCodec = getHiveCompressionCodec(oldFileFormat, existingProperties);
+        Optional<HiveCompressionCodec> newCompressionCodec = IcebergTableProperties.getCompressionCodec(inputProperties);
+
+        Optional<HiveCompressionCodec> compressionCodec = newCompressionCodec.or(() -> oldCompressionCodec);
+
+        validateCompression(newFileFormat, compressionCodec);
+
+        compressionCodec.ifPresent(hiveCompressionCodec -> newCompressionProperties.put(getCompressionPropertyName(newFileFormat), hiveCompressionCodec.name()));
+
+        return newCompressionProperties.buildOrThrow();
+    }
+
     private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
     {
         UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec();
@@ -4051,7 +4086,7 @@ public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session
         IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;
         IcebergFileFormat storageFormat = getFileFormat(tableHandle.getStorageProperties());
 
-        return storageFormat == IcebergFileFormat.ORC || storageFormat == IcebergFileFormat.PARQUET;
+        return storageFormat == ORC || storageFormat == PARQUET;
     }
 
     @Override
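
The new `calculateTableCompressionProperties` helper re-resolves the codec whenever `format` and/or `compression_codec` appear in a SET PROPERTIES statement: an explicitly supplied codec wins over the one already stored on the table, the result is validated against the (possibly new) file format, and it is written under that format's compression property. A hedged usage sketch, with hypothetical catalog, schema, and table names:

```sql
-- Change the write format and the codec in one statement; the codec is
-- validated against PARQUET and stored under the Parquet compression property.
ALTER TABLE iceberg.example_schema.example_table
SET PROPERTIES format = 'PARQUET', compression_codec = 'ZSTD';
```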

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java

Lines changed: 0 additions & 13 deletions
@@ -19,7 +19,6 @@
 import io.airlift.units.Duration;
 import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
 import io.trino.plugin.base.session.SessionPropertiesProvider;
-import io.trino.plugin.hive.HiveCompressionOption;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
@@ -65,7 +64,6 @@ public final class IcebergSessionProperties
         implements SessionPropertiesProvider
 {
     public static final String SPLIT_SIZE = "experimental_split_size";
-    private static final String COMPRESSION_CODEC = "compression_codec";
     private static final String USE_FILE_SIZE_FROM_METADATA = "use_file_size_from_metadata";
     private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
     private static final String ORC_MAX_MERGE_DISTANCE = "orc_max_merge_distance";
@@ -131,12 +129,6 @@ public IcebergSessionProperties(
                         // See https://github.com/trinodb/trino/issues/9018#issuecomment-1752929193 for further discussion.
                         null,
                         true))
-                .add(enumProperty(
-                        COMPRESSION_CODEC,
-                        "Compression codec to use when writing files",
-                        HiveCompressionOption.class,
-                        icebergConfig.getCompressionCodec(),
-                        false))
                 .add(booleanProperty(
                         USE_FILE_SIZE_FROM_METADATA,
                         "Use file size stored in Iceberg metadata",
@@ -518,11 +510,6 @@ public static Optional<DataSize> getSplitSize(ConnectorSession session)
         return Optional.ofNullable(session.getProperty(SPLIT_SIZE, DataSize.class));
     }
 
-    public static HiveCompressionOption getCompressionCodec(ConnectorSession session)
-    {
-        return session.getProperty(COMPRESSION_CODEC, HiveCompressionOption.class);
-    }
-
     public static boolean isUseFileSizeFromMetadata(ConnectorSession session)
     {
         return session.getProperty(USE_FILE_SIZE_FROM_METADATA, Boolean.class);
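
Because the `compression_codec` session property is removed rather than deprecated, statements that previously tuned the codec per session now fail, and the table property shown above is the replacement. A sketch, assuming a catalog named `iceberg`:

```sql
-- Worked before this commit; now rejected because the session property no longer exists.
SET SESSION iceberg.compression_codec = 'GZIP';
```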
