4 changes: 2 additions & 2 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -347,9 +347,9 @@ Property Name Description

``iceberg.compression-codec`` The compression codec to use when writing files. The ``GZIP`` Yes No, write is not supported yet
available values are ``NONE``, ``SNAPPY``, ``GZIP``,
and ``ZSTD``.
``LZ4``, and ``ZSTD``.

Note: ``ZSTD`` is only available when
Note: ``LZ4`` is only available when
``iceberg.file-format=ORC``.
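
(Illustration, not part of this diff: the per-query counterpart of this table default is the compression_codec catalog session property defined in IcebergSessionProperties further down. A minimal Java sketch in the style of the new tests below, assuming a QueryRunner-backed getSession() and the TPC-H lineitem table; the session and table names here are made up:)

    // Hypothetical usage sketch; not taken from the PR itself.
    Session zstdSession = Session.builder(getSession())
            .setCatalogSessionProperty("iceberg", "compression_codec", "ZSTD")
            .build();
    // Files written under this session should be ZSTD-compressed; for Parquet the
    // created table also carries write.parquet.compression-codec = ZSTD, which the
    // new Iceberg test below verifies through the "$properties" metadata table.
    assertUpdate(zstdSession, "CREATE TABLE test_zstd_codec AS SELECT * FROM lineitem", "SELECT count(*) FROM lineitem");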


@@ -16,6 +16,7 @@
import com.facebook.presto.orc.metadata.CompressionKind;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

@@ -25,19 +26,20 @@
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static java.util.Objects.requireNonNull;

public enum HiveCompressionCodec
{
NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED, f -> true),
SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY, f -> true),
GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP, f -> true),
LZ4(null, CompressionKind.NONE, null, f -> f == PAGEFILE),
ZSTD(null, CompressionKind.ZSTD, null, f -> f == ORC || f == DWRF || f == PAGEFILE);
LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.UNCOMPRESSED, f -> f == PAGEFILE || f == ORC),
ZSTD(null, CompressionKind.ZSTD, CompressionCodecName.ZSTD, f -> f == ORC || f == DWRF || f == PAGEFILE || f == PARQUET);

private final Optional<Class<? extends CompressionCodec>> codec;
private final CompressionKind orcCompressionKind;
private final Optional<CompressionCodecName> parquetCompressionCodec;
private final CompressionCodecName parquetCompressionCodec;
private final Predicate<HiveStorageFormat> supportedStorageFormats;

HiveCompressionCodec(
@@ -48,7 +50,7 @@ public enum HiveCompressionCodec
{
this.codec = Optional.ofNullable(codec);
this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null");
this.parquetCompressionCodec = Optional.ofNullable(parquetCompressionCodec);
this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null");
this.supportedStorageFormats = requireNonNull(supportedStorageFormats, "supportedStorageFormats is null");
}

@@ -62,7 +64,7 @@ public CompressionKind getOrcCompressionKind()
return orcCompressionKind;
}

public Optional<CompressionCodecName> getParquetCompressionCodec()
public CompressionCodecName getParquetCompressionCodec()
{
return parquetCompressionCodec;
}
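
(Illustration, not part of this diff: since getParquetCompressionCodec() now returns a CompressionCodecName directly instead of an Optional, a caller can be sketched roughly as follows; the format check mirrors isSupportedStorageFormat as redefined in this enum.)

    // Hypothetical caller sketch, not taken from the Presto code base.
    HiveCompressionCodec codec = HiveCompressionCodec.ZSTD;
    if (codec.isSupportedStorageFormat(HiveStorageFormat.PARQUET)) {
        // ZSTD now maps to CompressionCodecName.ZSTD (previously Optional.empty()).
        String parquetCodecName = codec.getParquetCompressionCodec().name();
    }
    // LZ4 still fails this check for PARQUET (its predicate only allows PAGEFILE and ORC),
    // so its placeholder CompressionCodecName.UNCOMPRESSED is never handed to a Parquet writer.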
@@ -59,7 +59,7 @@ public final class HiveSessionProperties
private static final String ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL = "orc_optimized_writer_compression_level";
private static final String PAGEFILE_WRITER_MAX_STRIPE_SIZE = "pagefile_writer_max_stripe_size";
public static final String HIVE_STORAGE_FORMAT = "hive_storage_format";
private static final String COMPRESSION_CODEC = "compression_codec";
static final String COMPRESSION_CODEC = "compression_codec";
private static final String ORC_COMPRESSION_CODEC = "orc_compression_codec";
public static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String CREATE_EMPTY_BUCKET_FILES = "create_empty_bucket_files";
@@ -120,7 +120,7 @@ private static void setCompressionProperties(Configuration config, HiveCompressi
config.unset(FileOutputFormat.COMPRESS_CODEC);
}
// For Parquet
compression.getParquetCompressionCodec().ifPresent(codec -> config.set(ParquetOutputFormat.COMPRESSION, codec.name()));
config.set(ParquetOutputFormat.COMPRESSION, compression.getParquetCompressionCodec().name());
// For SequenceFile
config.set(FileOutputFormat.COMPRESS_TYPE, BLOCK.toString());
// For PageFile
@@ -74,6 +74,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN;
@@ -106,6 +107,7 @@
import static com.facebook.presto.hive.HiveQueryRunner.TPCH_SCHEMA;
import static com.facebook.presto.hive.HiveQueryRunner.createBucketedSession;
import static com.facebook.presto.hive.HiveQueryRunner.createMaterializeExchangesSession;
import static com.facebook.presto.hive.HiveSessionProperties.COMPRESSION_CODEC;
import static com.facebook.presto.hive.HiveSessionProperties.FILE_RENAMING_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.MANIFEST_VERIFICATION_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED;
@@ -159,6 +161,7 @@
import static io.airlift.tpch.TpchTable.PART_SUPPLIER;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ROOT;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.assertj.core.api.Assertions.assertThat;
@@ -5267,17 +5270,6 @@ public void testPageFileFormatSmallSplitSize()
assertUpdate("DROP TABLE test_pagefile_small_split");
}

@Test
public void testPageFileCompression()
{
for (HiveCompressionCodec compression : HiveCompressionCodec.values()) {
if (!compression.isSupportedStorageFormat(PAGEFILE)) {
continue;
}
testPageFileCompression(compression.name());
}
}

@Test
public void testPartialAggregatePushdownORC()
{
@@ -5703,31 +5695,35 @@ public void testParquetSelectivePageSourceFails()
assertQueryFails(parquetFilterPushdownSession, "SELECT a FROM test_parquet_filter_pushdoown WHERE b = false", "Parquet reader doesn't support filter pushdown yet");
}

private void testPageFileCompression(String compression)
@DataProvider(name = "testFormatAndCompressionCodecs")
public Object[][] compressionCodecs()
{
Session testSession = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty(catalog, "compression_codec", compression)
.setCatalogSessionProperty(catalog, "pagefile_writer_max_stripe_size", "100B")
.setCatalogSessionProperty(catalog, "max_split_size", "1kB")
.setCatalogSessionProperty(catalog, "max_initial_split_size", "1kB")
.build();

assertUpdate(
testSession,
"CREATE TABLE test_pagefile_compression\n" +
"WITH (\n" +
"format = 'PAGEFILE'\n" +
") AS\n" +
"SELECT\n" +
"*\n" +
"FROM tpch.orders",
"SELECT count(*) FROM orders");

assertQuery(testSession, "SELECT count(*) FROM test_pagefile_compression", "SELECT count(*) FROM orders");

assertQuery(testSession, "SELECT sum(custkey) FROM test_pagefile_compression", "SELECT sum(custkey) FROM orders");
return Stream.of(PARQUET, ORC, PAGEFILE)
.flatMap(format -> Arrays.stream(HiveCompressionCodec.values())
.map(codec -> new Object[] {codec, format}))
.toArray(Object[][]::new);
}

assertUpdate("DROP TABLE test_pagefile_compression");
@Test(dataProvider = "testFormatAndCompressionCodecs")
public void testFormatAndCompressionCodecs(HiveCompressionCodec codec, HiveStorageFormat format)
{
String tableName = "test_" + format.name().toLowerCase(ROOT) + "_compression_codec_" + codec.name().toLowerCase(ROOT);
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", COMPRESSION_CODEC, codec.name()).build();
if (codec.isSupportedStorageFormat(format == PARQUET ? HiveStorageFormat.PARQUET : HiveStorageFormat.ORC)) {
assertUpdate(session,
format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders",
tableName, format.name()),
"SELECT count(*) FROM orders");
assertQuery(format("SELECT count(*) FROM %s", tableName), "SELECT count(*) FROM orders");
assertQuery(format("SELECT sum(custkey) FROM %s", tableName), "SELECT sum(custkey) FROM orders");
assertQuerySucceeds(format("DROP TABLE %s", tableName));
}
else {
assertQueryFails(session, format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders",
tableName, format.name()),
format("%s compression is not supported with %s", codec, format));
}
}

private static Consumer<Plan> assertTableWriterMergeNodeIsPresent()
@@ -83,7 +83,6 @@

import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
import static com.facebook.presto.hive.HiveCompressionCodec.NONE;
import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
@@ -246,9 +245,6 @@ public FormatWriter createFileFormatWriter(
HiveCompressionCodec compressionCodec)
throws IOException
{
if (!compressionCodec.isSupportedStorageFormat(PAGEFILE)) {
compressionCodec = NONE;
}
return new PrestoPageFormatWriter(targetFile, compressionCodec);
}
},
@@ -696,7 +692,7 @@ public PrestoParquetFormatWriter(File targetFile, List<String> columnNames, List
columnNames,
types,
ParquetWriterOptions.builder().build(),
compressionCodec.getParquetCompressionCodec().get().getHadoopCompressionCodecClassName());
compressionCodec.getParquetCompressionCodec().getHadoopCompressionCodecClassName());
}

@Override
@@ -160,7 +160,7 @@ private IcebergFileWriter createParquetWriter(
makeTypeMap(fileColumnTypes, fileColumnNames),
parquetWriterOptions,
IntStream.range(0, fileColumnNames.size()).toArray(),
getCompressionCodec(session).getParquetCompressionCodec().get(),
getCompressionCodec(session).getParquetCompressionCodec(),
outputPath,
hdfsEnvironment,
hdfsContext,
@@ -48,7 +48,6 @@

public final class IcebergSessionProperties
{
private static final String COMPRESSION_CODEC = "compression_codec";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
@@ -61,6 +60,7 @@ public final class IcebergSessionProperties
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
private static final String NESSIE_REFERENCE_NAME = "nessie_reference_name";
private static final String NESSIE_REFERENCE_HASH = "nessie_reference_hash";
static final String COMPRESSION_CODEC = "compression_codec";
public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled";
public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
@@ -1171,7 +1171,7 @@ public static Map<String, String> populateTableProperties(IcebergAbstractMetadat
if (!compressionCodec.isSupportedStorageFormat(HiveStorageFormat.PARQUET)) {
throw new PrestoException(NOT_SUPPORTED, format("Compression codec %s is not supported for Parquet format", compressionCodec));
}
propertiesBuilder.put(PARQUET_COMPRESSION, compressionCodec.getParquetCompressionCodec().get().toString());
propertiesBuilder.put(PARQUET_COMPRESSION, compressionCodec.getParquetCompressionCodec().name());
break;
case ORC:
if (!compressionCodec.isSupportedStorageFormat(HiveStorageFormat.ORC)) {
@@ -1111,9 +1111,9 @@ public Object[][] compressionCodecTestData()
{
return new Object[][] {
// codec, format, shouldSucceed, expectedErrorMessage
{"ZSTD", "PARQUET", false, "Compression codec ZSTD is not supported for Parquet format"},
{"ZSTD", "PARQUET", true, null},
{"LZ4", "PARQUET", false, "Compression codec LZ4 is not supported for Parquet format"},
{"LZ4", "ORC", false, "Compression codec LZ4 is not supported for ORC format"},
{"LZ4", "ORC", true, null},
{"ZSTD", "ORC", true, null},
{"SNAPPY", "ORC", true, null},
{"SNAPPY", "PARQUET", true, null},
@@ -29,7 +29,9 @@
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.s3.HiveS3Config;
@@ -145,8 +147,11 @@
import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileFormat.ORC;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.iceberg.IcebergSessionProperties.COMPRESSION_CODEC;
import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
@@ -169,6 +174,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.nio.file.Files.createTempDirectory;
import static java.util.Locale.ROOT;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.function.Function.identity;
@@ -2857,8 +2863,8 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)

private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFormat> test)
{
test.accept(session, FileFormat.PARQUET);
test.accept(session, FileFormat.ORC);
test.accept(session, PARQUET);
test.accept(session, ORC);
}

private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
@@ -3076,6 +3082,36 @@ public void testStatisticsFileCacheInvalidationProcedure()
getQueryRunner().execute("DROP TABLE test_statistics_file_cache_procedure");
}

@DataProvider(name = "testFormatAndCompressionCodecs")
public Object[][] compressionCodecs()
{
return Stream.of(PARQUET, ORC)
.flatMap(format -> Arrays.stream(HiveCompressionCodec.values())
.map(codec -> new Object[] {codec, format}))
.toArray(Object[][]::new);
}

@Test(dataProvider = "testFormatAndCompressionCodecs")
public void testFormatAndCompressionCodecs(HiveCompressionCodec codec, FileFormat format)
{
String tableName = "test_" + format.name().toLowerCase(ROOT) + "_compression_codec_" + codec.name().toLowerCase(ROOT);
Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", COMPRESSION_CODEC, codec.name()).build();
if (codec.isSupportedStorageFormat(format == PARQUET ? HiveStorageFormat.PARQUET : HiveStorageFormat.ORC)) {
String codecName = format == PARQUET ? codec.getParquetCompressionCodec().name() : codec.getOrcCompressionKind().name();
assertQuerySucceeds(session, format("CREATE TABLE %s WITH (\"write.format.default\" = '%s') as select * from lineitem with no data", tableName, format.name()));
assertQuery(session, format("SELECT value FROM \"%s$properties\" WHERE key = 'write.%s.compression-codec'", tableName, format.name().toLowerCase(ROOT)), format("VALUES '%s'", codecName));
assertQuery(session, format("SELECT value FROM \"%s$properties\" WHERE key = 'write.format.default'", tableName), format("VALUES '%s'", format.name()));
assertUpdate(session, format("INSERT INTO %s SELECT * from lineitem", tableName), "select count(*) from lineitem");
assertQuery(session, format("SELECT * FROM %s", tableName), "select * from lineitem");
assertQuerySucceeds(format("DROP TABLE %s", tableName));
}
else {
assertQueryFails(session, format("CREATE TABLE %s WITH (\"write.format.default\" = '%s') as select * from lineitem with no data", tableName, format.name()),
format("Compression codec %s is not supported for .*", codec));
}
}

@DataProvider(name = "sortedTableWithSortTransform")
public static Object[][] sortedTableWithSortTransform()
{