diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java
new file mode 100644
index 000000000000..107467a53971
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaTestingConnectorSession.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import io.trino.plugin.hive.parquet.ParquetReaderConfig;
+import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.testing.TestingConnectorSession;
+
+public final class DeltaTestingConnectorSession
+{
+    public static final ConnectorSession SESSION = TestingConnectorSession.builder()
+            .setPropertyMetadata(new DeltaLakeSessionProperties(new DeltaLakeConfig(), new ParquetReaderConfig(), new ParquetWriterConfig()).getSessionProperties())
+            .build();
+
+    private DeltaTestingConnectorSession() {}
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java
index e7365729bf0d..02177d1ec61c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java
@@ -49,8 +49,8 @@
 import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
 import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
+import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
-import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.DateType.DATE;
 import static io.trino.spi.type.DoubleType.DOUBLE;
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
index 858f53fa4d11..ab657035a5e5 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
@@ -21,7 +21,6 @@
 import io.airlift.json.JsonCodecFactory;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.DeltaLakeConfig;
-import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
 import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
 import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
@@ -49,9 +48,7 @@
 import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
 import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
-import io.trino.plugin.hive.parquet.ParquetWriterConfig;
 import io.trino.spi.connector.ColumnHandle;
-import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.security.PrincipalType;
@@ -62,7 +59,6 @@
 import io.trino.spi.type.DoubleType;
 import io.trino.spi.type.TypeManager;
 import io.trino.testing.TestingConnectorContext;
-import io.trino.testing.TestingConnectorSession;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -74,6 +70,7 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.PATH_PROPERTY;
+import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
 import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY;
 import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
 import static io.trino.spi.type.BigintType.BIGINT;
@@ -91,9 +88,6 @@ public class TestDeltaLakeMetastoreStatistics
 {
     private static final ColumnHandle COLUMN_HANDLE = new DeltaLakeColumnHandle("val", DoubleType.DOUBLE, REGULAR);
 
-    private static final ConnectorSession SESSION = TestingConnectorSession.builder()
-            .setPropertyMetadata(new DeltaLakeSessionProperties(new DeltaLakeConfig(), new ParquetReaderConfig(), new ParquetWriterConfig()).getSessionProperties())
-            .build();
-
     private DeltaLakeMetastore deltaLakeMetastore;
     private HiveMetastore hiveMetastore;
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
index 3c8109f189c1..c596fa132226 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
@@ -43,13 +43,13 @@
 import java.util.Set;
 
 import static com.google.common.io.Resources.getResource;
+import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.COMMIT;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
-import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
 import static org.assertj.core.api.Assertions.assertThat;
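
Note on the shared-session refactoring above: session property lookups resolve against whatever PropertyMetadata the TestingConnectorSession was built with, so Delta tests that read Delta-specific properties cannot keep sharing HiveTestUtils.SESSION, which only registers HiveSessionProperties. A minimal sketch of the distinction (the property name is an assumption for illustration, not part of this change):

    // Hypothetical lookup against the new shared session; the same call on
    // HiveTestUtils.SESSION would fail with an unknown-property error.
    ConnectorSession session = DeltaTestingConnectorSession.SESSION;
    boolean rowStatistics = session.getProperty("checkpoint_row_statistics_writing_enabled", Boolean.class);
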
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
index ee86c8ff449d..c0287c19e84f 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
@@ -30,19 +30,15 @@
 import io.trino.plugin.hive.HdfsConfiguration;
 import io.trino.plugin.hive.HdfsConfigurationInitializer;
 import io.trino.plugin.hive.HdfsEnvironment;
-import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.HiveHdfsConfiguration;
-import io.trino.plugin.hive.HiveSessionProperties;
 import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.spi.block.Block;
 import io.trino.spi.block.RowBlock;
-import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.type.BigintType;
 import io.trino.spi.type.Int128;
 import io.trino.spi.type.IntegerType;
 import io.trino.spi.type.TypeManager;
-import io.trino.testing.TestingConnectorSession;
 import io.trino.util.DateTimeUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -61,12 +57,12 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
-import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties;
 import static io.trino.spi.predicate.Utils.nativeValueToBlock;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
 import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
@@ -79,7 +75,6 @@ public class TestCheckpointWriter
 {
     private final TypeManager typeManager = TESTING_TYPE_MANAGER;
-    private ConnectorSession session;
     private CheckpointSchemaManager checkpointSchemaManager;
     private HdfsEnvironment hdfsEnvironment;
 
@@ -90,11 +85,6 @@ public void setUp()
         HdfsConfig hdfsConfig = new HdfsConfig();
         HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), Set.of());
         hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
-
-        HiveSessionProperties hiveSessionProperties = getHiveSessionProperties(new HiveConfig());
-        session = TestingConnectorSession.builder()
-                .setPropertyMetadata(hiveSessionProperties.getSessionProperties())
-                .build();
     }
 
     @Test
@@ -277,7 +267,7 @@ public void testCheckpointWriteReadRoundtrip()
         Path targetPath = new Path("file://" + targetFile.getAbsolutePath());
         targetFile.delete(); // file must not exist when writer is called
-        writer.write(session, entries, targetPath);
+        writer.write(SESSION, entries, targetPath);
 
         CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, true);
         assertEquals(readEntries.getTransactionEntries(), entries.getTransactionEntries());
@@ -351,7 +341,7 @@ public void testDisablingRowStatistics()
         Path targetPath = new Path("file://" + targetFile.getAbsolutePath());
         targetFile.delete(); // file must not exist when writer is called
-        writer.write(session, entries, targetPath);
+        writer.write(SESSION, entries, targetPath);
 
         CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, false);
         AddFileEntry addFileEntry = getOnlyElement(readEntries.getAddFileEntries());
@@ -416,12 +406,12 @@ private Optional<Map<String, Object>> makeComparableStatistics(Optional<Map<String, Object>> statistics)
         CheckpointEntryIterator checkpointEntryIterator = new CheckpointEntryIterator(
                 checkpointPath,
-                session,
+                SESSION,
                 fileStatus.getLen(),
                 checkpointSchemaManager,
                 typeManager,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java
index 3fc4c626dfa3..7a9aab74fd9a 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodec.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.hive;
 
 import io.trino.orc.metadata.CompressionKind;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.Lz4Codec;
@@ -27,21 +28,30 @@
 public enum HiveCompressionCodec
 {
-    NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED),
-    SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY),
-    LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4),
-    ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD),
-    GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP);
+    NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED, DataFileConstants.NULL_CODEC),
+    SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY, DataFileConstants.SNAPPY_CODEC),
+    LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4, null),
+    ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD, DataFileConstants.ZSTANDARD_CODEC),
+    // Using DEFLATE for GZIP for Avro for now so Avro files can be written in default configuration
+    // TODO(https://github.com/trinodb/trino/issues/12580) change GZIP to be unsupported for Avro when we change Trino default compression to be storage format aware
+    GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP, DataFileConstants.DEFLATE_CODEC);
 
     private final Optional<Class<? extends CompressionCodec>> codec;
     private final CompressionKind orcCompressionKind;
     private final CompressionCodecName parquetCompressionCodec;
+    private final Optional<String> avroCompressionCodec;
 
-    HiveCompressionCodec(Class<? extends CompressionCodec> codec, CompressionKind orcCompressionKind, CompressionCodecName parquetCompressionCodec)
+    HiveCompressionCodec(
+            Class<? extends CompressionCodec> codec,
+            CompressionKind orcCompressionKind,
+            CompressionCodecName parquetCompressionCodec,
+            String avroCompressionCodec)
     {
         this.codec = Optional.ofNullable(codec);
         this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null");
         this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null");
+        this.avroCompressionCodec = Optional.ofNullable(avroCompressionCodec);
     }
 
     public Optional<Class<? extends CompressionCodec>> getCodec()
@@ -58,4 +68,9 @@ public CompressionCodecName getParquetCompressionCodec()
     {
         return parquetCompressionCodec;
     }
+
+    public Optional<String> getAvroCompressionCodec()
+    {
+        return avroCompressionCodec;
+    }
 }
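
The enum above now carries a per-format codec name for Avro alongside the existing ORC and Parquet mappings; an empty Optional (the LZ4 entry) marks a codec that cannot be written for Avro. A short sketch of how the new accessor is meant to be consumed (the real call sites are HiveCompressionCodecs and CompressionConfigUtil, later in this change):

    // GZIP maps to Avro's "deflate" codec per the TODO above; LZ4 would
    // yield Optional.empty() and must be rejected before writing.
    String avroCodec = HiveCompressionCodec.GZIP.getAvroCompressionCodec()
            .orElseThrow(() -> new IllegalArgumentException("codec not writable as Avro"));
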
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java
new file mode 100644
index 000000000000..8092db99bf73
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionCodecs.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive;
+
+import io.trino.plugin.hive.metastore.StorageFormat;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+
+public final class HiveCompressionCodecs
+{
+    private HiveCompressionCodecs() {}
+
+    public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, StorageFormat storageFormat)
+    {
+        HiveCompressionOption compressionOption = HiveSessionProperties.getCompressionCodec(session);
+        return HiveStorageFormat.getHiveStorageFormat(storageFormat)
+                .map(format -> selectCompressionCodec(compressionOption, format))
+                .orElse(selectCompressionCodecForUnknownStorageFormat(compressionOption));
+    }
+
+    public static HiveCompressionCodec selectCompressionCodec(ConnectorSession session, HiveStorageFormat storageFormat)
+    {
+        return selectCompressionCodec(HiveSessionProperties.getCompressionCodec(session), storageFormat);
+    }
+
+    public static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption, HiveStorageFormat storageFormat)
+    {
+        HiveCompressionCodec selectedCodec = selectCompressionCodec(compressionOption);
+
+        // perform codec vs format validation
+        if (storageFormat == HiveStorageFormat.AVRO && selectedCodec.getAvroCompressionCodec().isEmpty()) {
+            throw new TrinoException(HiveErrorCode.HIVE_UNSUPPORTED_FORMAT, "Compression codec " + selectedCodec + " not supported for " + storageFormat);
+        }
+
+        return selectedCodec;
+    }
+
+    private static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption compressionOption)
+    {
+        switch (compressionOption) {
+            case NONE:
+                return HiveCompressionCodec.NONE;
+            case SNAPPY:
+                return HiveCompressionCodec.SNAPPY;
+            case LZ4:
+                return HiveCompressionCodec.LZ4;
+            case ZSTD:
+                return HiveCompressionCodec.ZSTD;
+            case GZIP:
+                return HiveCompressionCodec.GZIP;
+        }
+        throw new IllegalArgumentException("Unknown compressionOption " + compressionOption);
+    }
+
+    private static HiveCompressionCodec selectCompressionCodecForUnknownStorageFormat(HiveCompressionOption compressionOption)
+    {
+        switch (compressionOption) {
+            case NONE:
+                return HiveCompressionCodec.NONE;
+            case SNAPPY:
+                return HiveCompressionCodec.SNAPPY;
+            case LZ4:
+                return HiveCompressionCodec.LZ4;
+            case ZSTD:
+                return HiveCompressionCodec.ZSTD;
+            case GZIP:
+                return HiveCompressionCodec.GZIP;
+        }
+        throw new IllegalArgumentException("Unknown compressionOption " + compressionOption);
+    }
+}
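
HiveCompressionCodecs is the single place where the session-level option is resolved against the concrete storage format, so unsupported combinations fail fast with HIVE_UNSUPPORTED_FORMAT. The two private switches are identical today; keeping the unknown-format path separate presumably leaves room for it to diverge once defaults become storage-format aware (issue 12580). Behavior sketch, using only methods defined above:

    HiveCompressionCodecs.selectCompressionCodec(HiveCompressionOption.ZSTD, HiveStorageFormat.AVRO);
    // returns HiveCompressionCodec.ZSTD (Avro codec "zstandard")
    HiveCompressionCodecs.selectCompressionCodec(HiveCompressionOption.GZIP, HiveStorageFormat.AVRO);
    // returns HiveCompressionCodec.GZIP, written as DEFLATE for Avro
    HiveCompressionCodecs.selectCompressionCodec(HiveCompressionOption.LZ4, HiveStorageFormat.AVRO);
    // throws TrinoException: "Compression codec LZ4 not supported for AVRO"
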
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java
new file mode 100644
index 000000000000..f315af5016e0
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveCompressionOption.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive;
+
+public enum HiveCompressionOption
+{
+    NONE,
+    SNAPPY,
+    LZ4,
+    ZSTD,
+    GZIP,
+    /**/;
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
index 28aac0e3e8ba..1b3798d91fd7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
@@ -91,7 +91,7 @@ public class HiveConfig
     private long perTransactionMetastoreCacheMaximumSize = 1000;
 
     private HiveStorageFormat hiveStorageFormat = HiveStorageFormat.ORC;
-    private HiveCompressionCodec hiveCompressionCodec = HiveCompressionCodec.GZIP;
+    private HiveCompressionOption hiveCompressionCodec = HiveCompressionOption.GZIP;
     private boolean respectTableFormat = true;
     private boolean immutablePartitions;
     private Optional<InsertExistingPartitionsBehavior> insertExistingPartitionsBehavior = Optional.empty();
@@ -475,13 +475,13 @@ public HiveConfig setHiveStorageFormat(HiveStorageFormat hiveStorageFormat)
         return this;
     }
 
-    public HiveCompressionCodec getHiveCompressionCodec()
+    public HiveCompressionOption getHiveCompressionCodec()
     {
         return hiveCompressionCodec;
     }
 
     @Config("hive.compression-codec")
-    public HiveConfig setHiveCompressionCodec(HiveCompressionCodec hiveCompressionCodec)
+    public HiveConfig setHiveCompressionCodec(HiveCompressionOption hiveCompressionCodec)
     {
         this.hiveCompressionCodec = hiveCompressionCodec;
         return this;
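
HiveConfig keeps the same hive.compression-codec property name and value set (NONE, SNAPPY, LZ4, ZSTD, GZIP); only the backing Java type changes from HiveCompressionCodec to the new HiveCompressionOption, deferring the choice of a concrete codec until the storage format is known. Sketch:

    HiveConfig config = new HiveConfig().setHiveCompressionCodec(HiveCompressionOption.ZSTD);
    HiveCompressionOption option = config.getHiveCompressionCodec(); // ZSTD, not yet a concrete codec
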
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 02bed7b9622d..a5258fc4b099 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -167,6 +167,7 @@
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
 import static io.trino.plugin.hive.HiveColumnHandle.updateRowIdColumnHandle;
+import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -178,7 +179,6 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
-import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
 import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
 import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
 import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName;
@@ -1574,7 +1574,7 @@ private List<String> computeFileNamesForMissingBuckets(
         }
         HdfsContext hdfsContext = new HdfsContext(session);
         JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
-        configureCompression(conf, getCompressionCodec(session));
+        configureCompression(conf, selectCompressionCodec(session, storageFormat));
         String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));
         Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
         Set<Integer> bucketsWithFiles = fileNames.stream()
@@ -1602,9 +1602,6 @@ private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
     {
-        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
-        configureCompression(conf, getCompressionCodec(session));
-
         Properties schema;
         StorageFormat format;
         if (partition.isPresent()) {
@@ -1615,6 +1612,8 @@
             schema = getHiveSchema(table);
             format = table.getStorage().getStorageFormat();
         }
+        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
+        configureCompression(conf, selectCompressionCodec(session, format));
         hdfsEnvironment.doAs(session.getIdentity(), () -> {
             for (String fileName : fileNames) {
                 writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
index 8ad731e93025..22052f828b87 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
@@ -290,7 +290,7 @@
                 enumProperty(
                         COMPRESSION_CODEC,
                         "Compression codec to use when writing files",
-                        HiveCompressionCodec.class,
+                        HiveCompressionOption.class,
                         hiveConfig.getHiveCompressionCodec(),
                         false),
                 booleanProperty(
@@ -635,9 +635,9 @@ public static HiveStorageFormat getHiveStorageFormat(ConnectorSession session)
     {
         return session.getProperty(HIVE_STORAGE_FORMAT, HiveStorageFormat.class);
     }
 
-    public static HiveCompressionCodec getCompressionCodec(ConnectorSession session)
+    public static HiveCompressionOption getCompressionCodec(ConnectorSession session)
     {
-        return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class);
+        return session.getProperty(COMPRESSION_CODEC, HiveCompressionOption.class);
     }
 
     public static boolean isRespectTableFormat(ConnectorSession session)
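
With the session property retyped the same way, write paths now follow a two-step flow: read the raw option from the session, then resolve it per format. Sketch, assuming a ConnectorSession named session in scope:

    HiveCompressionOption option = HiveSessionProperties.getCompressionCodec(session);
    HiveCompressionCodec codec = HiveCompressionCodecs.selectCompressionCodec(option, HiveStorageFormat.PARQUET);
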
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
index 815e7370139b..561ecbf1259d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
@@ -75,6 +75,7 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Maps.immutableEntry;
+import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY;
@@ -83,7 +84,6 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_READ_ONLY;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
-import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
 import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
 import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
 import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
@@ -91,6 +91,7 @@
 import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema;
 import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
+import static io.trino.plugin.hive.util.CompressionConfigUtil.assertCompressionConfigured;
 import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
 import static io.trino.plugin.hive.util.ConfigurationUtils.toJobConf;
 import static io.trino.plugin.hive.util.HiveUtil.getColumnNames;
@@ -268,7 +269,6 @@ public HiveWriterFactory(
                 .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString()));
 
         Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath);
-        configureCompression(conf, getCompressionCodec(session));
         this.conf = toJobConf(conf);
 
         // make sure the FileSystem is created with the correct Configuration object
@@ -312,6 +312,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
         Properties schema;
         WriteInfo writeInfo;
         StorageFormat outputStorageFormat;
+        JobConf outputConf = new JobConf(conf);
         if (partition.isEmpty()) {
             if (table == null) {
                 // Write to: a new partition in a new partitioned table,
@@ -380,10 +381,12 @@
             if (partitionName.isPresent()) {
                 // Write to a new partition
                 outputStorageFormat = fromHiveStorageFormat(partitionStorageFormat);
+                configureCompression(outputConf, selectCompressionCodec(session, partitionStorageFormat));
             }
             else {
                 // Write to a new/existing unpartitioned table
                 outputStorageFormat = fromHiveStorageFormat(tableStorageFormat);
+                configureCompression(outputConf, selectCompressionCodec(session, tableStorageFormat));
             }
         }
         else {
@@ -417,6 +420,7 @@
                 HiveWriteUtils.checkPartitionIsWritable(partitionName.get(), partition.get());
 
                 outputStorageFormat = partition.get().getStorage().getStorageFormat();
+                configureCompression(outputConf, selectCompressionCodec(session, outputStorageFormat));
                 schema = getHiveSchema(partition.get(), table);
 
                 writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());
@@ -430,6 +434,7 @@
                     updateMode = UpdateMode.OVERWRITE;
 
                     outputStorageFormat = fromHiveStorageFormat(partitionStorageFormat);
+                    configureCompression(outputConf, selectCompressionCodec(session, partitionStorageFormat));
                     schema = getHiveSchema(table);
 
                     writeInfo = locationService.getPartitionWriteInfo(locationHandle, Optional.empty(), partitionName.get());
@@ -441,6 +446,9 @@
             }
         }
 
+        // verify compression was properly set by each of code paths above
+        assertCompressionConfigured(outputConf);
+
         additionalTableParameters.forEach(schema::setProperty);
 
         validateSchema(partitionName, schema);
@@ -457,7 +465,7 @@
         }
         else {
             String fileName = computeFileName(bucketNumber);
-            fileNameWithExtension = fileName + getFileExtension(conf, outputStorageFormat);
+            fileNameWithExtension = fileName + getFileExtension(outputConf, outputStorageFormat);
             path = new Path(writeInfo.getWritePath(), fileNameWithExtension);
         }
 
@@ -472,7 +480,7 @@
                         .collect(toList()),
                 outputStorageFormat,
                 schema,
-                conf,
+                outputConf,
                 session,
                 bucketNumber,
                 transaction,
@@ -494,7 +502,7 @@
                 outputStorageFormat,
                 schema,
                 partitionStorageFormat.getEstimatedWriterMemoryUsage(),
-                conf,
+                outputConf,
                 typeManager,
                 parquetTimeZone,
                 session);
@@ -542,7 +550,7 @@
             tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName());
         }
         try {
-            Configuration configuration = new Configuration(conf);
+            Configuration configuration = new Configuration(outputConf);
             // Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme
             configuration.set(FS_DEFAULT_NAME_KEY, "file:///");
             fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java
index 6c9b5d46afa0..b7ced2c11f5f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CompressionConfigUtil.java
@@ -15,15 +15,19 @@
 import io.trino.hive.orc.OrcConf;
 import io.trino.plugin.hive.HiveCompressionCodec;
+import org.apache.avro.mapred.AvroJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
 import static org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK;
 
 public final class CompressionConfigUtil
 {
+    private static final String COMPRESSION_CONFIGURED_MARKER = "trino.compression.configured";
+
     private CompressionConfigUtil() {}
 
     public static void configureCompression(Configuration config, HiveCompressionCodec compressionCodec)
@@ -49,7 +53,18 @@ public static void configureCompression(Configuration config, HiveCompressionCodec compressionCodec)
         // For Parquet
         config.set(ParquetOutputFormat.COMPRESSION, compressionCodec.getParquetCompressionCodec().name());
 
+        // For Avro
+        compressionCodec.getAvroCompressionCodec().ifPresent(codec -> config.set(AvroJob.OUTPUT_CODEC, codec));
+
         // For SequenceFile
         config.set(FileOutputFormat.COMPRESS_TYPE, BLOCK.toString());
+
+        config.set(COMPRESSION_CONFIGURED_MARKER, "true");
+    }
+
+    public static void assertCompressionConfigured(Configuration config)
+    {
+        String markerValue = config.get(COMPRESSION_CONFIGURED_MARKER);
+        checkArgument("true".equals(markerValue), "Compression should have been configured");
     }
 }
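
configureCompression now also stamps the Configuration with a marker key, and assertCompressionConfigured turns a code path that forgot to configure compression (easier to miss now that HiveWriterFactory configures it per branch) into an immediate failure. A sketch mirroring TestCompressionConfigUtil below; the Avro key written is AvroJob.OUTPUT_CODEC ("avro.output.codec"):

    Configuration conf = new Configuration(false);
    CompressionConfigUtil.configureCompression(conf, HiveCompressionCodec.ZSTD);
    CompressionConfigUtil.assertCompressionConfigured(conf); // passes
    // conf.get("avro.output.codec") is now "zstandard"
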
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index 7f6ffa9640bf..dfb8e8146655 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -7763,10 +7763,24 @@ public void testCreateTableWithCompressionCodec(HiveCompressionCodec compressionCodec)
                         .hasMessage("Unsupported codec: LZ4");
                 return;
             }
+
+            if (!isSupportedCodec(hiveStorageFormat, compressionCodec)) {
+                assertThatThrownBy(() -> testCreateTableWithCompressionCodec(session, hiveStorageFormat, compressionCodec))
+                        .hasMessage("Compression codec " + compressionCodec + " not supported for " + hiveStorageFormat);
+                return;
+            }
+
             testCreateTableWithCompressionCodec(session, hiveStorageFormat, compressionCodec);
         });
     }
 
+    private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionCodec codec)
+    {
+        if (storageFormat == HiveStorageFormat.AVRO && codec == HiveCompressionCodec.LZ4) {
+            return false;
+        }
+        return true;
+    }
+
     @DataProvider
     public Object[][] testCreateTableWithCompressionCodecDataProvider()
     {
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
index f7187f3c622c..4f79c3c1545d 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
@@ -59,7 +59,7 @@ public void testDefaults()
                 .setRecursiveDirWalkerEnabled(false)
                 .setIgnoreAbsentPartitions(false)
                 .setHiveStorageFormat(HiveStorageFormat.ORC)
-                .setHiveCompressionCodec(HiveCompressionCodec.GZIP)
+                .setHiveCompressionCodec(HiveCompressionOption.GZIP)
                 .setRespectTableFormat(true)
                 .setImmutablePartitions(false)
                 .setInsertExistingPartitionsBehavior(APPEND)
@@ -224,7 +224,7 @@ public void testExplicitPropertyMappings()
                 .setRecursiveDirWalkerEnabled(true)
                 .setIgnoreAbsentPartitions(true)
                 .setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE)
-                .setHiveCompressionCodec(HiveCompressionCodec.NONE)
+                .setHiveCompressionCodec(HiveCompressionOption.NONE)
                 .setRespectTableFormat(false)
                 .setImmutablePartitions(true)
                 .setInsertExistingPartitionsBehavior(OVERWRITE)
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java
index 9134bc3f4378..97c960b328bc 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java
@@ -63,7 +63,8 @@
 import static io.airlift.testing.Assertions.assertGreaterThan;
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
-import static io.trino.plugin.hive.HiveCompressionCodec.NONE;
+import static io.trino.plugin.hive.HiveCompressionOption.LZ4;
+import static io.trino.plugin.hive.HiveCompressionOption.NONE;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
@@ -91,6 +92,7 @@
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.assertTrue;
 
 public class TestHivePageSink
@@ -117,11 +119,18 @@ public void testAllFormats()
             long uncompressedLength = writeTestFile(config, metastore, makeFileName(tempDir, config));
             assertGreaterThan(uncompressedLength, 0L);
 
-            for (HiveCompressionCodec codec : HiveCompressionCodec.values()) {
+            for (HiveCompressionOption codec : HiveCompressionOption.values()) {
                 if (codec == NONE) {
                     continue;
                 }
                 config.setHiveCompressionCodec(codec);
+
+                if (!isSupportedCodec(format, codec)) {
+                    assertThatThrownBy(() -> writeTestFile(config, metastore, makeFileName(tempDir, config)))
+                            .hasMessage("Compression codec " + codec + " not supported for " + format);
+                    continue;
+                }
+
                 long length = writeTestFile(config, metastore, makeFileName(tempDir, config));
                 assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength));
             }
@@ -132,6 +141,14 @@
         }
     }
 
+    private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionOption compressionOption)
+    {
+        if (storageFormat == HiveStorageFormat.AVRO && compressionOption == LZ4) {
+            return false;
+        }
+        return true;
+    }
+
     private static String makeFileName(File tempDir, HiveConfig config)
     {
         return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getHiveCompressionCodec().name();
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java
new file mode 100644
index 000000000000..914c4e8b9dd8
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestCompressionConfigUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.util;
+
+import io.trino.plugin.hive.HiveCompressionCodec;
+import org.apache.hadoop.conf.Configuration;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+import static io.trino.plugin.hive.util.CompressionConfigUtil.assertCompressionConfigured;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestCompressionConfigUtil
+{
+    @Test(dataProvider = "compressionCodes")
+    public void testAssertCompressionConfigured(HiveCompressionCodec compressionCodec)
+    {
+        Configuration config = new Configuration(false);
+        assertThatThrownBy(() -> assertCompressionConfigured(config))
+                .hasMessage("Compression should have been configured");
+
+        CompressionConfigUtil.configureCompression(config, compressionCodec);
+        assertCompressionConfigured(config); // ok now
+    }
+
+    @DataProvider
+    public Object[][] compressionCodes()
+    {
+        return Arrays.stream(HiveCompressionCodec.values())
+                .map(codec -> new Object[] {codec})
+                .toArray(Object[][]::new);
+    }
+}