From 39fedf0dd82b37fe2a6a8c5698f53d16bb5d1c55 Mon Sep 17 00:00:00 2001 From: wangd Date: Mon, 20 Oct 2025 17:58:42 +0800 Subject: [PATCH] feat: Add support for LZ4 and ZSTD compression codecs These codecs are available in the writers, but don't seem to have been configured correctly. Trying to write tables with these formats previously threw errors. This change enables ZSTD on Parquet and LZ4 on ORC for data writers in Hive and Iceberg --- .../src/main/sphinx/connector/iceberg.rst | 4 +- .../presto/hive/HiveCompressionCodec.java | 12 ++-- .../presto/hive/HiveSessionProperties.java | 2 +- .../presto/hive/util/ConfigurationUtils.java | 2 +- .../hive/TestHiveIntegrationSmokeTest.java | 64 +++++++++---------- .../presto/hive/benchmark/FileFormat.java | 6 +- .../iceberg/IcebergFileWriterFactory.java | 2 +- .../iceberg/IcebergSessionProperties.java | 2 +- .../facebook/presto/iceberg/IcebergUtil.java | 2 +- .../IcebergDistributedSmokeTestBase.java | 4 +- .../iceberg/IcebergDistributedTestBase.java | 40 +++++++++++- .../presto/iceberg/TestIcebergUtil.java | 14 ++-- .../presto/parquet/writer/ParquetWriter.java | 15 +++-- 13 files changed, 100 insertions(+), 69 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 0efa0afa3114c..dcfef634da5eb 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -347,9 +347,9 @@ Property Name Description ``iceberg.compression-codec`` The compression codec to use when writing files. The ``GZIP`` Yes No, write is not supported yet available values are ``NONE``, ``SNAPPY``, ``GZIP``, - and ``ZSTD``. + ``LZ4``, and ``ZSTD``. - Note: ``ZSTD`` is only available when + Note: ``LZ4`` is only available when ``iceberg.file-format=ORC``. 
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCompressionCodec.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCompressionCodec.java index 020007c885382..e352603559517 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCompressionCodec.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCompressionCodec.java @@ -16,6 +16,7 @@ import com.facebook.presto.orc.metadata.CompressionKind; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -25,6 +26,7 @@ import static com.facebook.presto.hive.HiveStorageFormat.DWRF; import static com.facebook.presto.hive.HiveStorageFormat.ORC; import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE; +import static com.facebook.presto.hive.HiveStorageFormat.PARQUET; import static java.util.Objects.requireNonNull; public enum HiveCompressionCodec @@ -32,12 +34,12 @@ public enum HiveCompressionCodec NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED, f -> true), SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY, f -> true), GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP, f -> true), - LZ4(null, CompressionKind.NONE, null, f -> f == PAGEFILE), - ZSTD(null, CompressionKind.ZSTD, null, f -> f == ORC || f == DWRF || f == PAGEFILE); + LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.UNCOMPRESSED, f -> f == PAGEFILE || f == ORC), + ZSTD(null, CompressionKind.ZSTD, CompressionCodecName.ZSTD, f -> f == ORC || f == DWRF || f == PAGEFILE || f == PARQUET); private final Optional> codec; private final CompressionKind orcCompressionKind; - private final Optional parquetCompressionCodec; + private final CompressionCodecName parquetCompressionCodec; private final Predicate 
supportedStorageFormats; HiveCompressionCodec( @@ -48,7 +50,7 @@ public enum HiveCompressionCodec { this.codec = Optional.ofNullable(codec); this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null"); - this.parquetCompressionCodec = Optional.ofNullable(parquetCompressionCodec); + this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null"); this.supportedStorageFormats = requireNonNull(supportedStorageFormats, "supportedStorageFormats is null"); } @@ -62,7 +64,7 @@ public CompressionKind getOrcCompressionKind() return orcCompressionKind; } - public Optional getParquetCompressionCodec() + public CompressionCodecName getParquetCompressionCodec() { return parquetCompressionCodec; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 2a1acfb9f835b..4879d5cb0d4ad 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -59,7 +59,7 @@ public final class HiveSessionProperties private static final String ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL = "orc_optimized_writer_compression_level"; private static final String PAGEFILE_WRITER_MAX_STRIPE_SIZE = "pagefile_writer_max_stripe_size"; public static final String HIVE_STORAGE_FORMAT = "hive_storage_format"; - private static final String COMPRESSION_CODEC = "compression_codec"; + static final String COMPRESSION_CODEC = "compression_codec"; private static final String ORC_COMPRESSION_CODEC = "orc_compression_codec"; public static final String RESPECT_TABLE_FORMAT = "respect_table_format"; private static final String CREATE_EMPTY_BUCKET_FILES = "create_empty_bucket_files"; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/ConfigurationUtils.java 
b/presto-hive/src/main/java/com/facebook/presto/hive/util/ConfigurationUtils.java index cdd38ffdda171..5bacde49d52d8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/ConfigurationUtils.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/ConfigurationUtils.java @@ -120,7 +120,7 @@ private static void setCompressionProperties(Configuration config, HiveCompressi config.unset(FileOutputFormat.COMPRESS_CODEC); } // For Parquet - compression.getParquetCompressionCodec().ifPresent(codec -> config.set(ParquetOutputFormat.COMPRESSION, codec.name())); + config.set(ParquetOutputFormat.COMPRESSION, compression.getParquetCompressionCodec().name()); // For SequenceFile config.set(FileOutputFormat.COMPRESS_TYPE, BLOCK.toString()); // For PageFile diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 0018dcf180516..d7e06a1f926db 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -74,6 +74,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.LongStream; +import java.util.stream.Stream; import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN; @@ -106,6 +107,7 @@ import static com.facebook.presto.hive.HiveQueryRunner.TPCH_SCHEMA; import static com.facebook.presto.hive.HiveQueryRunner.createBucketedSession; import static com.facebook.presto.hive.HiveQueryRunner.createMaterializeExchangesSession; +import static com.facebook.presto.hive.HiveSessionProperties.COMPRESSION_CODEC; import static com.facebook.presto.hive.HiveSessionProperties.FILE_RENAMING_ENABLED; import static com.facebook.presto.hive.HiveSessionProperties.MANIFEST_VERIFICATION_ENABLED; 
import static com.facebook.presto.hive.HiveSessionProperties.OPTIMIZED_PARTITION_UPDATE_SERIALIZATION_ENABLED; @@ -159,6 +161,7 @@ import static io.airlift.tpch.TpchTable.PART_SUPPLIER; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Locale.ROOT; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; import static org.assertj.core.api.Assertions.assertThat; @@ -5267,17 +5270,6 @@ public void testPageFileFormatSmallSplitSize() assertUpdate("DROP TABLE test_pagefile_small_split"); } - @Test - public void testPageFileCompression() - { - for (HiveCompressionCodec compression : HiveCompressionCodec.values()) { - if (!compression.isSupportedStorageFormat(PAGEFILE)) { - continue; - } - testPageFileCompression(compression.name()); - } - } - @Test public void testPartialAggregatePushdownORC() { @@ -5703,31 +5695,35 @@ public void testParquetSelectivePageSourceFails() assertQueryFails(parquetFilterPushdownSession, "SELECT a FROM test_parquet_filter_pushdoown WHERE b = false", "Parquet reader doesn't support filter pushdown yet"); } - private void testPageFileCompression(String compression) + @DataProvider(name = "testFormatAndCompressionCodecs") + public Object[][] compressionCodecs() { - Session testSession = Session.builder(getQueryRunner().getDefaultSession()) - .setCatalogSessionProperty(catalog, "compression_codec", compression) - .setCatalogSessionProperty(catalog, "pagefile_writer_max_stripe_size", "100B") - .setCatalogSessionProperty(catalog, "max_split_size", "1kB") - .setCatalogSessionProperty(catalog, "max_initial_split_size", "1kB") - .build(); - - assertUpdate( - testSession, - "CREATE TABLE test_pagefile_compression\n" + - "WITH (\n" + - "format = 'PAGEFILE'\n" + - ") AS\n" + - "SELECT\n" + - "*\n" + - "FROM tpch.orders", - "SELECT count(*) FROM orders"); - - assertQuery(testSession, "SELECT count(*) FROM test_pagefile_compression", "SELECT 
count(*) FROM orders");
-
-        assertQuery(testSession, "SELECT sum(custkey) FROM test_pagefile_compression", "SELECT sum(custkey) FROM orders");
+        return Stream.of(PARQUET, ORC, PAGEFILE)
+                .flatMap(format -> Arrays.stream(HiveCompressionCodec.values())
+                        .map(codec -> new Object[] {codec, format}))
+                .toArray(Object[][]::new);
+    }
-        assertUpdate("DROP TABLE test_pagefile_compression");
+    @Test(dataProvider = "testFormatAndCompressionCodecs") // runs once per (codec, format) pair over PARQUET, ORC and PAGEFILE
+    public void testFormatAndCompressionCodecs(HiveCompressionCodec codec, HiveStorageFormat format)
+    {
+        String tableName = "test_" + format.name().toLowerCase(ROOT) + "_compression_codec_" + codec.name().toLowerCase(ROOT);
+        Session session = Session.builder(getSession())
+                .setCatalogSessionProperty(catalog, COMPRESSION_CODEC, codec.name()).build(); // same catalog member the replaced PAGEFILE test used, instead of a hard-coded "hive"
+        if (codec.isSupportedStorageFormat(format)) { // ask about the format under test; the old ternary judged PAGEFILE by ORC's support rules
+            assertUpdate(session,
+                    format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders",
+                            tableName, format.name()),
+                    "SELECT count(*) FROM orders");
+            assertQuery(format("SELECT count(*) FROM %s", tableName), "SELECT count(*) FROM orders");
+            assertQuery(format("SELECT sum(custkey) FROM %s", tableName), "SELECT sum(custkey) FROM orders");
+            assertQuerySucceeds(format("DROP TABLE %s", tableName));
+        }
+        else { // unsupported pair: the CTAS itself must be rejected
+            assertQueryFails(session, format("CREATE TABLE %s WITH (format = '%s') AS SELECT * FROM orders",
+                            tableName, format.name()),
+                    format("%s compression is not supported with %s", codec, format));
+        }
     }
     private static Consumer assertTableWriterMergeNodeIsPresent()
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java
index eea2d2a876fd3..41a5844cdf0ae 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java
@@ -83,7 +83,6 @@ import
static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS; -import static com.facebook.presto.hive.HiveCompressionCodec.NONE; import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE; import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER; import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION; @@ -246,9 +245,6 @@ public FormatWriter createFileFormatWriter( HiveCompressionCodec compressionCodec) throws IOException { - if (!compressionCodec.isSupportedStorageFormat(PAGEFILE)) { - compressionCodec = NONE; - } return new PrestoPageFormatWriter(targetFile, compressionCodec); } }, @@ -696,7 +692,7 @@ public PrestoParquetFormatWriter(File targetFile, List columnNames, List columnNames, types, ParquetWriterOptions.builder().build(), - compressionCodec.getParquetCompressionCodec().get().getHadoopCompressionCodecClassName()); + compressionCodec.getParquetCompressionCodec().getHadoopCompressionCodecClassName()); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergFileWriterFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergFileWriterFactory.java index b61befce07a26..62c02c881a950 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergFileWriterFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergFileWriterFactory.java @@ -160,7 +160,7 @@ private IcebergFileWriter createParquetWriter( makeTypeMap(fileColumnTypes, fileColumnNames), parquetWriterOptions, IntStream.range(0, fileColumnNames.size()).toArray(), - getCompressionCodec(session).getParquetCompressionCodec().get(), + getCompressionCodec(session).getParquetCompressionCodec(), outputPath, hdfsEnvironment, hdfsContext, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java 
b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index 7c1cca1d73715..38bf69a01c8a6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -48,7 +48,6 @@ public final class IcebergSessionProperties { - private static final String COMPRESSION_CODEC = "compression_codec"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String PARQUET_WRITER_VERSION = "parquet_writer_version"; @@ -61,6 +60,7 @@ public final class IcebergSessionProperties private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; private static final String NESSIE_REFERENCE_NAME = "nessie_reference_name"; private static final String NESSIE_REFERENCE_HASH = "nessie_reference_hash"; + static final String COMPRESSION_CODEC = "compression_codec"; public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled"; public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled"; public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled"; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index fb56644872728..ea55a189f4018 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -1171,7 +1171,7 @@ public static Map populateTableProperties(IcebergAbstractMetadat if (!compressionCodec.isSupportedStorageFormat(HiveStorageFormat.PARQUET)) { throw new PrestoException(NOT_SUPPORTED, format("Compression codec %s is not supported for Parquet format", compressionCodec)); } - 
propertiesBuilder.put(PARQUET_COMPRESSION, compressionCodec.getParquetCompressionCodec().get().toString()); + propertiesBuilder.put(PARQUET_COMPRESSION, compressionCodec.getParquetCompressionCodec().name()); break; case ORC: if (!compressionCodec.isSupportedStorageFormat(HiveStorageFormat.ORC)) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 97ef9bb9ff1f3..39e13724d44e9 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -1111,9 +1111,9 @@ public Object[][] compressionCodecTestData() { return new Object[][] { // codec, format, shouldSucceed, expectedErrorMessage - {"ZSTD", "PARQUET", false, "Compression codec ZSTD is not supported for Parquet format"}, + {"ZSTD", "PARQUET", true, null}, {"LZ4", "PARQUET", false, "Compression codec LZ4 is not supported for Parquet format"}, - {"LZ4", "ORC", false, "Compression codec LZ4 is not supported for ORC format"}, + {"LZ4", "ORC", true, null}, {"ZSTD", "ORC", true, null}, {"SNAPPY", "ORC", true, null}, {"SNAPPY", "PARQUET", true, null}, diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 2ee75839c69e5..0682a476293db 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -29,7 +29,9 @@ import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveCompressionCodec; import 
com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.HiveStorageFormat; import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.authentication.NoHdfsAuthentication; import com.facebook.presto.hive.s3.HiveS3Config; @@ -145,8 +147,11 @@ import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED; import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES; import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES; +import static com.facebook.presto.iceberg.FileFormat.ORC; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; +import static com.facebook.presto.iceberg.IcebergSessionProperties.COMPRESSION_CODEC; import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_ENABLED; import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; @@ -169,6 +174,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.lang.String.format; import static java.nio.file.Files.createTempDirectory; +import static java.util.Locale.ROOT; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static java.util.function.Function.identity; @@ -2857,8 +2863,8 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp) private void testWithAllFileFormats(Session session, BiConsumer test) { - test.accept(session, FileFormat.PARQUET); - test.accept(session, FileFormat.ORC); + test.accept(session, PARQUET); + test.accept(session, ORC); } private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount) @@ -3076,6 +3082,36 @@ 
public void testStatisticsFileCacheInvalidationProcedure() getQueryRunner().execute("DROP TABLE test_statistics_file_cache_procedure"); } + @DataProvider(name = "testFormatAndCompressionCodecs") + public Object[][] compressionCodecs() + { + return Stream.of(PARQUET, ORC) + .flatMap(format -> Arrays.stream(HiveCompressionCodec.values()) + .map(codec -> new Object[] {codec, format})) + .toArray(Object[][]::new); + } + + @Test(dataProvider = "testFormatAndCompressionCodecs") + public void testFormatAndCompressionCodecs(HiveCompressionCodec codec, FileFormat format) + { + String tableName = "test_" + format.name().toLowerCase(ROOT) + "_compression_codec_" + codec.name().toLowerCase(ROOT); + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", COMPRESSION_CODEC, codec.name()).build(); + if (codec.isSupportedStorageFormat(format == PARQUET ? HiveStorageFormat.PARQUET : HiveStorageFormat.ORC)) { + String codecName = format == PARQUET ? codec.getParquetCompressionCodec().name() : codec.getOrcCompressionKind().name(); + assertQuerySucceeds(session, format("CREATE TABLE %s WITH (\"write.format.default\" = '%s') as select * from lineitem with no data", tableName, format.name())); + assertQuery(session, format("SELECT value FROM \"%s$properties\" WHERE key = 'write.%s.compression-codec'", tableName, format.name().toLowerCase(ROOT)), format("VALUES '%s'", codecName)); + assertQuery(session, format("SELECT value FROM \"%s$properties\" WHERE key = 'write.format.default'", tableName), format("VALUES '%s'", format.name())); + assertUpdate(session, format("INSERT INTO %s SELECT * from lineitem", tableName), "select count(*) from lineitem"); + assertQuery(session, format("SELECT * FROM %s", tableName), "select * from lineitem"); + assertQuerySucceeds(format("DROP TABLE %s", tableName)); + } + else { + assertQueryFails(session, format("CREATE TABLE %s WITH (\"write.format.default\" = '%s') as select * from lineitem with no data", tableName, 
format.name()), + format("Compression codec %s is not supported for .*", codec)); + } + } + @DataProvider(name = "sortedTableWithSortTransform") public static Object[][] sortedTableWithSortTransform() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java index f8ed4bc75be9e..d426f9df7dd8d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java @@ -390,12 +390,12 @@ public Object[][] compressionCodecMatrix() {HiveStorageFormat.PARQUET, HiveCompressionCodec.SNAPPY, true}, {HiveStorageFormat.PARQUET, HiveCompressionCodec.GZIP, true}, {HiveStorageFormat.PARQUET, HiveCompressionCodec.LZ4, false}, - {HiveStorageFormat.PARQUET, HiveCompressionCodec.ZSTD, false}, + {HiveStorageFormat.PARQUET, HiveCompressionCodec.ZSTD, true}, {HiveStorageFormat.ORC, HiveCompressionCodec.NONE, true}, {HiveStorageFormat.ORC, HiveCompressionCodec.SNAPPY, true}, {HiveStorageFormat.ORC, HiveCompressionCodec.GZIP, true}, {HiveStorageFormat.ORC, HiveCompressionCodec.ZSTD, true}, - {HiveStorageFormat.ORC, HiveCompressionCodec.LZ4, false}, + {HiveStorageFormat.ORC, HiveCompressionCodec.LZ4, true}, }; } @@ -410,11 +410,11 @@ public void testCompressionCodecSupport(HiveStorageFormat format, HiveCompressio @Test public void testParquetCompressionCodecAvailability() { - assertThat(HiveCompressionCodec.NONE.getParquetCompressionCodec().isPresent()).isTrue(); - assertThat(HiveCompressionCodec.SNAPPY.getParquetCompressionCodec().isPresent()).isTrue(); - assertThat(HiveCompressionCodec.GZIP.getParquetCompressionCodec().isPresent()).isTrue(); + assertThat(HiveCompressionCodec.NONE.getParquetCompressionCodec()).isNotNull(); + assertThat(HiveCompressionCodec.SNAPPY.getParquetCompressionCodec()).isNotNull(); + 
assertThat(HiveCompressionCodec.GZIP.getParquetCompressionCodec()).isNotNull(); - assertThat(HiveCompressionCodec.LZ4.getParquetCompressionCodec().isPresent()).isFalse(); - assertThat(HiveCompressionCodec.ZSTD.getParquetCompressionCodec().isPresent()).isFalse(); + assertThat(HiveCompressionCodec.LZ4.getParquetCompressionCodec()).isNotNull(); + assertThat(HiveCompressionCodec.ZSTD.getParquetCompressionCodec()).isNotNull(); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java index 456fd89bf41e4..9b3ed1b5add55 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriter.java @@ -89,12 +89,12 @@ public class ParquetWriter public static final Slice MAGIC = wrappedBuffer("PAR1".getBytes(US_ASCII)); public ParquetWriter(OutputStream outputStream, - MessageType messageType, - Map, Type> primitiveTypes, - List columnNames, - List types, - ParquetWriterOptions writerOption, - String compressionCodecClass) + MessageType messageType, + Map, Type> primitiveTypes, + List columnNames, + List types, + ParquetWriterOptions writerOption, + String compressionCodecClass) { this.outputStream = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputstream is null")); this.names = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); @@ -335,7 +335,8 @@ else if (compressionCodecClass.equals("org.apache.hadoop.io.compress.BrotliCodec else if (compressionCodecClass.equals("org.apache.hadoop.io.compress.Lz4Codec")) { return LZ4; } - else if (compressionCodecClass.equals("org.apache.hadoop.io.compress.ZStandardCodec")) { + else if (compressionCodecClass.equals("org.apache.hadoop.io.compress.ZStandardCodec") || + compressionCodecClass.equals("org.apache.parquet.hadoop.codec.ZstandardCodec")) { return ZSTD; } throw 
new IllegalArgumentException("Invalid compressionCodec: " + compressionCodecClass);