From e2837c34aa2d981598e48257e8f85186308803cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Mon, 6 Jun 2022 19:07:21 +0200 Subject: [PATCH 1/3] Use ConnectorSession based on DeltaLakeSessionProperties in Delta tests --- .../DeltaTestingConnectorSession.java | 28 +++++++++++++++++++ .../deltalake/TestDeltaLakePageSink.java | 2 +- .../TestDeltaLakeMetastoreStatistics.java | 8 +----- .../TestCheckpointEntryIterator.java | 2 +- .../checkpoint/TestCheckpointWriter.java | 20 ++++--------- 5 files changed, 36 insertions(+), 24 deletions(-) create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java new file mode 100644 index 000000000000..107467a53971 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.plugin.hive.parquet.ParquetWriterConfig; +import io.trino.spi.connector.ConnectorSession; +import io.trino.testing.TestingConnectorSession; + +public final class DeltaTestingConnectorSession +{ + public static final ConnectorSession SESSION = TestingConnectorSession.builder() + .setPropertyMetadata(new DeltaLakeSessionProperties(new DeltaLakeConfig(), new ParquetReaderConfig(), new ParquetWriterConfig()).getSessionProperties()) + .build(); + + private DeltaTestingConnectorSession() {} +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index e7365729bf0d..02177d1ec61c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -49,8 +49,8 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; -import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.DoubleType.DOUBLE; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java index 858f53fa4d11..ab657035a5e5 100644 ---
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java @@ -21,7 +21,6 @@ import io.airlift.json.JsonCodecFactory; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeConfig; -import io.trino.plugin.deltalake.DeltaLakeSessionProperties; import io.trino.plugin.deltalake.DeltaLakeTableHandle; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; import io.trino.plugin.deltalake.statistics.ExtendedStatistics; @@ -49,9 +48,7 @@ import io.trino.plugin.hive.metastore.file.FileHiveMetastore; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; import io.trino.plugin.hive.parquet.ParquetReaderConfig; -import io.trino.plugin.hive.parquet.ParquetWriterConfig; import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.PrincipalType; @@ -62,7 +59,6 @@ import io.trino.spi.type.DoubleType; import io.trino.spi.type.TypeManager; import io.trino.testing.TestingConnectorContext; -import io.trino.testing.TestingConnectorSession; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -74,6 +70,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY; +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY; import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.spi.type.BigintType.BIGINT; @@ -91,9 +88,6 @@ public class TestDeltaLakeMetastoreStatistics { private static final ColumnHandle COLUMN_HANDLE = new DeltaLakeColumnHandle("val", DoubleType.DOUBLE, REGULAR); - private static final ConnectorSession SESSION = TestingConnectorSession.builder() - .setPropertyMetadata(new DeltaLakeSessionProperties(new DeltaLakeConfig(), new ParquetReaderConfig(), new ParquetWriterConfig()).getSessionProperties()) - .build(); private DeltaLakeMetastore deltaLakeMetastore; private HiveMetastore hiveMetastore; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 3c8109f189c1..c596fa132226 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -43,13 +43,13 @@ import java.util.Set; import static com.google.common.io.Resources.getResource; +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.COMMIT; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; import static 
io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION; -import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static org.assertj.core.api.Assertions.assertThat; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index ee86c8ff449d..c0287c19e84f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -30,19 +30,15 @@ import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; import io.trino.plugin.hive.HdfsEnvironment; -import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.HiveHdfsConfiguration; -import io.trino.plugin.hive.HiveSessionProperties; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.block.Block; import io.trino.spi.block.RowBlock; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.BigintType; import io.trino.spi.type.Int128; import io.trino.spi.type.IntegerType; import io.trino.spi.type.TypeManager; -import io.trino.testing.TestingConnectorSession; import io.trino.util.DateTimeUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -61,12 +57,12 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION; -import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties; import static io.trino.spi.predicate.Utils.nativeValueToBlock; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; @@ -79,7 +75,6 @@ public class TestCheckpointWriter { private final TypeManager typeManager = TESTING_TYPE_MANAGER; - private ConnectorSession session; private CheckpointSchemaManager checkpointSchemaManager; private HdfsEnvironment hdfsEnvironment; @@ -90,11 +85,6 @@ public void setUp() HdfsConfig hdfsConfig = new HdfsConfig(); HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), Set.of()); hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); - - 
HiveSessionProperties hiveSessionProperties = getHiveSessionProperties(new HiveConfig()); - session = TestingConnectorSession.builder() - .setPropertyMetadata(hiveSessionProperties.getSessionProperties()) - .build(); } @Test @@ -277,7 +267,7 @@ public void testCheckpointWriteReadRoundtrip() Path targetPath = new Path("file://" + targetFile.getAbsolutePath()); targetFile.delete(); // file must not exist when writer is called - writer.write(session, entries, targetPath); + writer.write(SESSION, entries, targetPath); CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, true); assertEquals(readEntries.getTransactionEntries(), entries.getTransactionEntries()); @@ -351,7 +341,7 @@ public void testDisablingRowStatistics() Path targetPath = new Path("file://" + targetFile.getAbsolutePath()); targetFile.delete(); // file must not exist when writer is called - writer.write(session, entries, targetPath); + writer.write(SESSION, entries, targetPath); CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, false); AddFileEntry addFileEntry = getOnlyElement(readEntries.getAddFileEntries()); @@ -416,12 +406,12 @@ private Optional> makeComparableStatistics(Optional checkpointEntryIterator = new CheckpointEntryIterator( checkpointPath, - session, + SESSION, fileStatus.getLen(), checkpointSchemaManager, typeManager, From 96f8723dd583d37b7caada2490d31cd4effd6a8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 31 May 2022 13:42:21 +0200 Subject: [PATCH 2/3] Extra abstraction layer on top of Hive compression codec config This breaks the 1-1 mapping between the compression codec used when writing data files via the Hive connector and the value passed as the hive.compression-codec configuration property or the compression_codec session property. Now the file format is also taken into account when choosing the final compression codec. The actual codec selection logic is not changed as part of this commit; the new mechanics will be exploited in the following ones. --- .../plugin/hive/HiveCompressionCodecs.java | 69 +++++++++++++++++++ .../plugin/hive/HiveCompressionOption.java | 24 +++++++ .../java/io/trino/plugin/hive/HiveConfig.java | 6 +- .../io/trino/plugin/hive/HiveMetadata.java | 9 ++- .../plugin/hive/HiveSessionProperties.java | 6 +- .../trino/plugin/hive/HiveWriterFactory.java | 20 ++++-- .../hive/util/CompressionConfigUtil.java | 11 +++ .../io/trino/plugin/hive/TestHiveConfig.java | 4 +- .../trino/plugin/hive/TestHivePageSink.java | 4 +- .../hive/util/TestCompressionConfigUtil.java | 46 +++++++++++++ 10 files changed, 178 insertions(+), 21 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java new file mode 100644 index 000000000000..56085db1083c --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import io.trino.plugin.hive.metastore.StorageFormat; +import io.trino.spi.connector.ConnectorSession; + +public final class HiveCompressionCodecs +{ + private HiveCompressionCodecs() {} + + public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, StorageFormat storageFormat) + { + HiveCompressionOption compressionOption = HiveSessionProperties.getCompressionCodec(session); + return HiveStorageFormat.getHiveStorageFormat(storageFormat) + .map(format -> selectCompressionCodec(compressionOption, format)) + .orElse(selectCompressionCodecForUnknownStorageFormat(compressionOption)); + } + + public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, HiveStorageFormat storageFormat) + { + return selectCompressionCodec(HiveSessionProperties.getCompressionCodec(session), storageFormat); + } + + public static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption, HiveStorageFormat storageFormat) + { + switch (compressionOption) { + case NONE: + return HiveCompressionCodec.NONE; + case SNAPPY: + return HiveCompressionCodec.SNAPPY; + case LZ4: + return HiveCompressionCodec.LZ4; + case ZSTD: + return HiveCompressionCodec.ZSTD; + case GZIP: + return HiveCompressionCodec.GZIP; + } + throw new IllegalArgumentException("Unknown compressionOption " + compressionOption); + } + + private static HiveCompressionCodec selectCompressionCodecForUnknownStorageFormat(HiveCompressionOption compressionOption) + { + switch (compressionOption) { + case NONE: + return HiveCompressionCodec.NONE; + case SNAPPY: + return HiveCompressionCodec.SNAPPY; + case LZ4: + return HiveCompressionCodec.LZ4; + case ZSTD: + return HiveCompressionCodec.ZSTD; + case GZIP: + return HiveCompressionCodec.GZIP; + } + throw new IllegalArgumentException("Unknown compressionOption " + compressionOption); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java new file mode 100644 index 000000000000..f315af5016e0 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive; + +public enum HiveCompressionOption +{ + NONE, + SNAPPY, + LZ4, + ZSTD, + GZIP, + /**/; +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 28aac0e3e8ba..1b3798d91fd7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -91,7 +91,7 @@ public class HiveConfig private long perTransactionMetastoreCacheMaximumSize = 1000; private HiveStorageFormat hiveStorageFormat = HiveStorageFormat.ORC; - private HiveCompressionCodec hiveCompressionCodec = HiveCompressionCodec.GZIP; + private HiveCompressionOption hiveCompressionCodec = HiveCompressionOption.GZIP; private boolean respectTableFormat = true; private boolean immutablePartitions; private Optional insertExistingPartitionsBehavior = Optional.empty(); @@ -475,13 +475,13 @@ public HiveConfig setHiveStorageFormat(HiveStorageFormat hiveStorageFormat) return this; } - public HiveCompressionCodec getHiveCompressionCodec() + public HiveCompressionOption getHiveCompressionCodec() { return hiveCompressionCodec; } @Config("hive.compression-codec") - public HiveConfig setHiveCompressionCodec(HiveCompressionCodec hiveCompressionCodec) + public HiveConfig setHiveCompressionCodec(HiveCompressionOption hiveCompressionCodec) { this.hiveCompressionCodec = hiveCompressionCodec; return this; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 02bed7b9622d..a5258fc4b099 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -167,6 +167,7 @@ import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveColumnHandle.updateRowIdColumnHandle; +import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec; import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; @@ -178,7 +179,6 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues; import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED; -import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec; import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName; import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat; import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName; @@ -1574,7 +1574,7 @@ private List computeFileNamesForMissingBuckets( } HdfsContext hdfsContext = new HdfsContext(session); JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath)); - configureCompression(conf, getCompressionCodec(session)); + configureCompression(conf, selectCompressionCodec(session, storageFormat)); String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat)); Set fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames()); Set bucketsWithFiles = fileNames.stream() @@ 
-1602,9 +1602,6 @@ private List computeFileNamesForMissingBuckets( private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional partition, List fileNames) { - JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path)); - configureCompression(conf, getCompressionCodec(session)); - Properties schema; StorageFormat format; if (partition.isPresent()) { @@ -1615,6 +1612,8 @@ private void createEmptyFiles(ConnectorSession session, Path path, Table table, schema = getHiveSchema(table); format = table.getStorage().getStorageFormat(); } + JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path)); + configureCompression(conf, selectCompressionCodec(session, format)); hdfsEnvironment.doAs(session.getIdentity(), () -> { for (String fileName : fileNames) { writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java index 8ad731e93025..22052f828b87 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java @@ -290,7 +290,7 @@ public HiveSessionProperties( enumProperty( COMPRESSION_CODEC, "Compression codec to use when writing files", - HiveCompressionCodec.class, + HiveCompressionOption.class, hiveConfig.getHiveCompressionCodec(), false), booleanProperty( @@ -635,9 +635,9 @@ public static HiveStorageFormat getHiveStorageFormat(ConnectorSession session) return session.getProperty(HIVE_STORAGE_FORMAT, HiveStorageFormat.class); } - public static HiveCompressionCodec getCompressionCodec(ConnectorSession session) + public static HiveCompressionOption getCompressionCodec(ConnectorSession session) { - return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class); + return session.getProperty(COMPRESSION_CODEC, HiveCompressionOption.class); } public static boolean isRespectTableFormat(ConnectorSession session) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index 815e7370139b..561ecbf1259d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -75,6 +75,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Maps.immutableEntry; +import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY; @@ -83,7 +84,6 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_READ_ONLY; import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; -import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec; import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior; import static 
io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath; import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision; @@ -91,6 +91,7 @@ import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY; import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema; import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; +import static io.trino.plugin.hive.util.CompressionConfigUtil.assertCompressionConfigured; import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression; import static io.trino.plugin.hive.util.ConfigurationUtils.toJobConf; import static io.trino.plugin.hive.util.HiveUtil.getColumnNames; @@ -268,7 +269,6 @@ public HiveWriterFactory( .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString())); Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath); - configureCompression(conf, getCompressionCodec(session)); this.conf = toJobConf(conf); // make sure the FileSystem is created with the correct Configuration object @@ -312,6 +312,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt Properties schema; WriteInfo writeInfo; StorageFormat outputStorageFormat; + JobConf outputConf = new JobConf(conf); if (partition.isEmpty()) { if (table == null) { // Write to: a new partition in a new partitioned table, @@ -380,10 +381,12 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt if (partitionName.isPresent()) { // Write to a new partition outputStorageFormat = fromHiveStorageFormat(partitionStorageFormat); + configureCompression(outputConf, selectCompressionCodec(session, partitionStorageFormat)); } else { // Write to a new/existing unpartitioned table outputStorageFormat = fromHiveStorageFormat(tableStorageFormat); + configureCompression(outputConf, selectCompressionCodec(session, tableStorageFormat)); } } else { @@ -417,6 +420,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt HiveWriteUtils.checkPartitionIsWritable(partitionName.get(), partition.get()); outputStorageFormat = partition.get().getStorage().getStorageFormat(); + configureCompression(outputConf, selectCompressionCodec(session, outputStorageFormat)); schema = getHiveSchema(partition.get(), table); writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get()); @@ -430,6 +434,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt updateMode = UpdateMode.OVERWRITE; outputStorageFormat = fromHiveStorageFormat(partitionStorageFormat); + configureCompression(outputConf, selectCompressionCodec(session, partitionStorageFormat)); schema = getHiveSchema(table); writeInfo = locationService.getPartitionWriteInfo(locationHandle, Optional.empty(), partitionName.get()); @@ -441,6 +446,9 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt } } + // verify compression was properly set by each of the code paths above + assertCompressionConfigured(outputConf); + additionalTableParameters.forEach(schema::setProperty); validateSchema(partitionName, schema); @@ -457,7 +465,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt } else { String fileName = computeFileName(bucketNumber); - fileNameWithExtension = fileName + getFileExtension(conf, outputStorageFormat); + fileNameWithExtension = fileName + getFileExtension(outputConf, outputStorageFormat);
path = new Path(writeInfo.getWritePath(), fileNameWithExtension); } @@ -472,7 +480,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt .collect(toList()), outputStorageFormat, schema, - conf, + outputConf, session, bucketNumber, transaction, @@ -494,7 +502,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt outputStorageFormat, schema, partitionStorageFormat.getEstimatedWriterMemoryUsage(), - conf, + outputConf, typeManager, parquetTimeZone, session); @@ -542,7 +550,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName()); } try { - Configuration configuration = new Configuration(conf); + Configuration configuration = new Configuration(outputConf); // Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme configuration.set(FS_DEFAULT_NAME_KEY, "file:///"); fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java index 6c9b5d46afa0..1de0e513526d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java @@ -19,11 +19,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.parquet.hadoop.ParquetOutputFormat; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; import static org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK; public final class CompressionConfigUtil { + private static final String COMPRESSION_CONFIGURED_MARKER = "trino.compression.configured"; + private CompressionConfigUtil() {} public static void configureCompression(Configuration config, HiveCompressionCodec compressionCodec) @@ -51,5 +54,13 @@ public static void configureCompression(Configuration config, HiveCompressionCod // For SequenceFile config.set(FileOutputFormat.COMPRESS_TYPE, BLOCK.toString()); + + config.set(COMPRESSION_CONFIGURED_MARKER, "true"); + } + + public static void assertCompressionConfigured(Configuration config) + { + String markerValue = config.get(COMPRESSION_CONFIGURED_MARKER); + checkArgument("true".equals(markerValue), "Compression should have been configured"); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index f7187f3c622c..4f79c3c1545d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -59,7 +59,7 @@ public void testDefaults() .setRecursiveDirWalkerEnabled(false) .setIgnoreAbsentPartitions(false) .setHiveStorageFormat(HiveStorageFormat.ORC) - .setHiveCompressionCodec(HiveCompressionCodec.GZIP) + .setHiveCompressionCodec(HiveCompressionOption.GZIP) .setRespectTableFormat(true) .setImmutablePartitions(false) .setInsertExistingPartitionsBehavior(APPEND) @@ -224,7 +224,7 @@ public void testExplicitPropertyMappings() .setRecursiveDirWalkerEnabled(true) .setIgnoreAbsentPartitions(true) .setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE) - 
.setHiveCompressionCodec(HiveCompressionCodec.NONE) + .setHiveCompressionCodec(HiveCompressionOption.NONE) .setRespectTableFormat(false) .setImmutablePartitions(true) .setInsertExistingPartitionsBehavior(OVERWRITE) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index 9134bc3f4378..0462fb7ef667 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -63,7 +63,7 @@ import static io.airlift.testing.Assertions.assertGreaterThan; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; -import static io.trino.plugin.hive.HiveCompressionCodec.NONE; +import static io.trino.plugin.hive.HiveCompressionOption.NONE; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; @@ -117,7 +117,7 @@ public void testAllFormats() long uncompressedLength = writeTestFile(config, metastore, makeFileName(tempDir, config)); assertGreaterThan(uncompressedLength, 0L); - for (HiveCompressionCodec codec : HiveCompressionCodec.values()) { + for (HiveCompressionOption codec : HiveCompressionOption.values()) { if (codec == NONE) { continue; } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java new file mode 100644 index 000000000000..914c4e8b9dd8 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.util; + +import io.trino.plugin.hive.HiveCompressionCodec; +import org.apache.hadoop.conf.Configuration; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Arrays; + +import static io.trino.plugin.hive.util.CompressionConfigUtil.assertCompressionConfigured; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestCompressionConfigUtil +{ + @Test(dataProvider = "compressionCodes") + public void testAssertCompressionConfigured(HiveCompressionCodec compressionCodec) + { + Configuration config = new Configuration(false); + assertThatThrownBy(() -> assertCompressionConfigured(config)) + .hasMessage("Compression should have been configured"); + + CompressionConfigUtil.configureCompression(config, compressionCodec); + assertCompressionConfigured(config); // ok now + } + + @DataProvider + public Object[][] compressionCodes() + { + return Arrays.stream(HiveCompressionCodec.values()) + .map(codec -> new Object[] {codec}) + .toArray(Object[][]::new); + } +} From a5d1de0e0c6316ce386c6e9cec3358b09b1a0045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 1 Jun 2022 20:24:43 +0200 Subject: [PATCH 3/3] Propagate selected compression codec to Avro file writer --- .../plugin/hive/HiveCompressionCodec.java | 27 ++++++++++++++----- .../plugin/hive/HiveCompressionCodecs.java | 13 +++++++++ .../hive/util/CompressionConfigUtil.java | 4 +++ .../plugin/hive/BaseHiveConnectorTest.java | 14 ++++++++++ .../trino/plugin/hive/TestHivePageSink.java | 17 ++++++++++++ 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java index 3fc4c626dfa3..7a9aab74fd9a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import io.trino.orc.metadata.CompressionKind; +import org.apache.avro.file.DataFileConstants; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.Lz4Codec; @@ -27,21 +28,30 @@ public enum HiveCompressionCodec { - NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED), - SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY), - LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4), - ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD), - GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP); + NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED, DataFileConstants.NULL_CODEC), + SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY, DataFileConstants.SNAPPY_CODEC), + LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4, null), + ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD, DataFileConstants.ZSTANDARD_CODEC), + // Using DEFLATE for GZIP for Avro for now so Avro files can be written in default configuration + // TODO(https://github.com/trinodb/trino/issues/12580) change GZIP to be unsupported for Avro when we change Trino default compression to be storage format aware + GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP, DataFileConstants.DEFLATE_CODEC); private final 
Optional<Class<? extends CompressionCodec>> codec; private final CompressionKind orcCompressionKind; private final CompressionCodecName parquetCompressionCodec; - HiveCompressionCodec(Class<? extends CompressionCodec> codec, CompressionKind orcCompressionKind, CompressionCodecName parquetCompressionCodec) + private final Optional<String> avroCompressionCodec; + + HiveCompressionCodec( + Class<? extends CompressionCodec> codec, + CompressionKind orcCompressionKind, + CompressionCodecName parquetCompressionCodec, + String avroCompressionCodec) { this.codec = Optional.ofNullable(codec); this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null"); this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null"); + this.avroCompressionCodec = Optional.ofNullable(avroCompressionCodec); } public Optional<Class<? extends CompressionCodec>> getCodec() @@ -58,4 +68,9 @@ public CompressionCodecName getParquetCompressionCodec() { return parquetCompressionCodec; } + + public Optional<String> getAvroCompressionCodec() + { + return avroCompressionCodec; + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java index 56085db1083c..8092db99bf73 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import io.trino.plugin.hive.metastore.StorageFormat; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; public final class HiveCompressionCodecs @@ -34,6 +35,18 @@ public static HiveCompressionCodec selectCompressionCodec(ConnectorSession sessi } public static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption, HiveStorageFormat storageFormat) + { + HiveCompressionCodec selectedCodec = selectCompressionCodec(compressionOption); + + // perform codec vs format validation + if (storageFormat == HiveStorageFormat.AVRO && selectedCodec.getAvroCompressionCodec().isEmpty()) { + throw new TrinoException(HiveErrorCode.HIVE_UNSUPPORTED_FORMAT, "Compression codec " + selectedCodec + " not supported for " + storageFormat); + } + + return selectedCodec; + } + + private static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption) { switch (compressionOption) { case NONE: diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java index 1de0e513526d..b7ced2c11f5f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java @@ -15,6 +15,7 @@ import io.trino.hive.orc.OrcConf; import io.trino.plugin.hive.HiveCompressionCodec; +import org.apache.avro.mapred.AvroJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -52,6 +53,9 @@ public static void configureCompression(Configuration config, HiveCompressionCod // For Parquet config.set(ParquetOutputFormat.COMPRESSION, compressionCodec.getParquetCompressionCodec().name()); + // For Avro + compressionCodec.getAvroCompressionCodec().ifPresent(codec -> config.set(AvroJob.OUTPUT_CODEC, codec)); + // For SequenceFile config.set(FileOutputFormat.COMPRESS_TYPE, BLOCK.toString()); diff --git
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 7f6ffa9640bf..dfb8e8146655 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -7763,10 +7763,24 @@ public void testCreateTableWithCompressionCodec(HiveCompressionCodec compression .hasMessage("Unsupported codec: LZ4"); return; } + + if (!isSupportedCodec(hiveStorageFormat, compressionCodec)) { + assertThatThrownBy(() -> testCreateTableWithCompressionCodec(session, hiveStorageFormat, compressionCodec)) + .hasMessage("Compression codec " + compressionCodec + " not supported for " + hiveStorageFormat); + return; + } testCreateTableWithCompressionCodec(session, hiveStorageFormat, compressionCodec); }); } + private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionCodec codec) + { + if (storageFormat == HiveStorageFormat.AVRO && codec == HiveCompressionCodec.LZ4) { + return false; + } + return true; + } + @DataProvider public Object[][] testCreateTableWithCompressionCodecDataProvider() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index 0462fb7ef667..97c960b328bc 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -63,6 +63,7 @@ import static io.airlift.testing.Assertions.assertGreaterThan; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; +import static io.trino.plugin.hive.HiveCompressionOption.LZ4; import static io.trino.plugin.hive.HiveCompressionOption.NONE; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; @@ -91,6 +92,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertTrue; public class TestHivePageSink @@ -122,6 +124,13 @@ public void testAllFormats() continue; } config.setHiveCompressionCodec(codec); + + if (!isSupportedCodec(format, codec)) { + assertThatThrownBy(() -> writeTestFile(config, metastore, makeFileName(tempDir, config))) + .hasMessage("Compression codec " + codec + " not supported for " + format); + continue; + } + long length = writeTestFile(config, metastore, makeFileName(tempDir, config)); assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength)); } @@ -132,6 +141,14 @@ public void testAllFormats() } } + private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionOption compressionOption) + { + if (storageFormat == HiveStorageFormat.AVRO && compressionOption == LZ4) { + return false; + } + return true; + } + private static String makeFileName(File tempDir, HiveConfig config) { return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getHiveCompressionCodec().name();
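To make the combined effect of patches 2 and 3 concrete, here is a minimal sketch built only on the APIs the patches introduce (the HiveCompressionOption enum, the HiveCompressionCodecs.selectCompressionCodec overloads, and HiveCompressionCodec.getAvroCompressionCodec). The CompressionSelectionSketch class and its main method are hypothetical scaffolding for illustration; they are not part of the patch series.

import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveCompressionOption;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.spi.TrinoException;

import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;

public final class CompressionSelectionSketch
{
    private CompressionSelectionSketch() {}

    public static void main(String[] args)
    {
        // The GZIP option now maps to a format-aware codec: ZLIB for ORC, GZIP for
        // Parquet, and DEFLATE for Avro (see the HiveCompressionCodec constants above).
        HiveCompressionCodec forAvro = selectCompressionCodec(HiveCompressionOption.GZIP, HiveStorageFormat.AVRO);
        System.out.println(forAvro.getAvroCompressionCodec().orElseThrow()); // prints "deflate"

        // LZ4 has no Avro counterpart, so selection fails fast instead of producing
        // a writer that cannot honor the requested codec.
        try {
            selectCompressionCodec(HiveCompressionOption.LZ4, HiveStorageFormat.AVRO);
        }
        catch (TrinoException e) {
            System.out.println(e.getMessage()); // Compression codec LZ4 not supported for AVRO
        }
    }
}

With the GZIP option an Avro write quietly uses DEFLATE, the closest codec Avro supports, while the LZ4 option fails fast for Avro tables because Avro has no LZ4 codec; this is the same behavior the BaseHiveConnectorTest and TestHivePageSink changes above assert.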