diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java index 453010f8ff23..c6c3fcb098ed 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java @@ -254,7 +254,7 @@ public void testSession() try (Connection connection = createConnection()) { assertThat(listSession(connection)) .contains("join_distribution_type|AUTOMATIC|AUTOMATIC") - .contains("exchange_compression|false|false"); + .contains("exchange_compression_codec|NONE|NONE"); try (Statement statement = connection.createStatement()) { statement.execute("SET SESSION join_distribution_type = 'BROADCAST'"); @@ -262,15 +262,15 @@ public void testSession() assertThat(listSession(connection)) .contains("join_distribution_type|BROADCAST|AUTOMATIC") - .contains("exchange_compression|false|false"); + .contains("exchange_compression_codec|NONE|NONE"); try (Statement statement = connection.createStatement()) { - statement.execute("SET SESSION exchange_compression = true"); + statement.execute("SET SESSION exchange_compression_codec = 'LZ4'"); } assertThat(listSession(connection)) .contains("join_distribution_type|BROADCAST|AUTOMATIC") - .contains("exchange_compression|true|false"); + .contains("exchange_compression_codec|LZ4|NONE"); try (Statement statement = connection.createStatement()) { // setting Hive session properties requires the admin role @@ -286,7 +286,7 @@ public void testSession() assertThat(listSession(connection)) .contains("join_distribution_type|BROADCAST|AUTOMATIC") - .contains("exchange_compression|true|false") + .contains("exchange_compression_codec|LZ4|NONE") .contains(format("spatial_partitioning_table_name|%s|", value)); } catch (Exception e) { diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java index 84ace3eea33a..34c6d699b470 100644 --- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java +++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java @@ -22,6 +22,7 @@ import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import io.airlift.units.MaxDataSize; +import io.trino.execution.buffer.CompressionCodec; import io.trino.sql.analyzer.RegexLibrary; import jakarta.validation.constraints.DecimalMax; import jakarta.validation.constraints.DecimalMin; @@ -35,6 +36,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static io.airlift.units.DataSize.succinctBytes; +import static io.trino.execution.buffer.CompressionCodec.LZ4; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.sql.analyzer.RegexLibrary.JONI; @DefunctConfig({ @@ -80,7 +83,7 @@ public class FeaturesConfig /** * default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}} */ - private boolean exchangeCompressionEnabled; + private CompressionCodec exchangeCompressionCodec = NONE; private boolean pagesIndexEagerCompactionEnabled; private boolean omitDateTimeTypePrecision; private int maxRecursionDepth = 10; @@ -328,15 +331,24 @@ public FeaturesConfig setSpillMaxUsedSpaceThreshold(double spillMaxUsedSpaceThre return this; } - public boolean isExchangeCompressionEnabled() + @Deprecated + @LegacyConfig(value = "exchange.compression-enabled", replacedBy = "exchange.compression-codec") + public FeaturesConfig setExchangeCompressionEnabled(boolean exchangeCompressionEnabled) { - return exchangeCompressionEnabled; + this.exchangeCompressionCodec = exchangeCompressionEnabled ? LZ4 : NONE; + return this; } - @Config("exchange.compression-enabled") - public FeaturesConfig setExchangeCompressionEnabled(boolean exchangeCompressionEnabled) + public CompressionCodec getExchangeCompressionCodec() + { + return exchangeCompressionCodec; + } + + @Config("exchange.compression-codec") + @ConfigDescription("Compression codec used for data in exchanges") + public FeaturesConfig setExchangeCompressionCodec(CompressionCodec exchangeCompressionCodec) { - this.exchangeCompressionEnabled = exchangeCompressionEnabled; + this.exchangeCompressionCodec = exchangeCompressionCodec; return this; } @@ -500,6 +512,6 @@ public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean public void applyFaultTolerantExecutionDefaults() { - exchangeCompressionEnabled = true; + exchangeCompressionCodec = LZ4; } } diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 44d2805976aa..0e6cbb17d701 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -20,6 +20,7 @@ import io.trino.execution.DynamicFilterConfig; import io.trino.execution.QueryManagerConfig; import io.trino.execution.TaskManagerConfig; +import io.trino.execution.buffer.CompressionCodec; import io.trino.execution.scheduler.NodeSchedulerConfig; import io.trino.memory.MemoryManagerConfig; import io.trino.memory.NodeMemoryConfig; @@ -107,7 +108,7 @@ public final class SystemSessionProperties public static final String OPTIMIZE_DISTINCT_AGGREGATIONS = "optimize_mixed_distinct_aggregations"; public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout"; public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id"; - public static final String EXCHANGE_COMPRESSION = "exchange_compression"; + public static final String EXCHANGE_COMPRESSION_CODEC = "exchange_compression_codec"; public static final String ENABLE_INTERMEDIATE_AGGREGATIONS = "enable_intermediate_aggregations"; public static final String PUSH_AGGREGATION_THROUGH_OUTER_JOIN = "push_aggregation_through_outer_join"; public static final String PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN = "push_partial_aggregation_through_join"; @@ -505,10 +506,11 @@ public SystemSessionProperties( "Enable a stats-based rule adding exchanges below GroupId", optimizerConfig.isEnableForcedExchangeBelowGroupId(), true), - booleanProperty( - EXCHANGE_COMPRESSION, - "Enable compression in exchanges", - featuresConfig.isExchangeCompressionEnabled(), + enumProperty( + EXCHANGE_COMPRESSION_CODEC, + "Compression codec used for data in exchanges, supports NONE, LZ4, ZSTD", + CompressionCodec.class, + featuresConfig.getExchangeCompressionCodec(), false), booleanProperty( ENABLE_INTERMEDIATE_AGGREGATIONS, @@ -1328,9 +1330,9 @@ public static boolean isEnableForcedExchangeBelowGroupId(Session session) return session.getSystemProperty(ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID, Boolean.class); } - public static boolean isExchangeCompressionEnabled(Session session) + public static CompressionCodec getExchangeCompressionCodec(Session session) { - return session.getSystemProperty(EXCHANGE_COMPRESSION, Boolean.class); + return session.getSystemProperty(EXCHANGE_COMPRESSION_CODEC, CompressionCodec.class); } public static boolean isEnableIntermediateAggregations(Session session) diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/CompressionCodec.java b/core/trino-main/src/main/java/io/trino/execution/buffer/CompressionCodec.java new file mode 100644 index 000000000000..4e8d88e57c83 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/CompressionCodec.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.buffer; + +public enum CompressionCodec +{ + NONE, LZ4, ZSTD +} diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PageDeserializer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PageDeserializer.java index 9fb386d00383..8a2bc1dbb7fc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PageDeserializer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PageDeserializer.java @@ -61,7 +61,7 @@ public class PageDeserializer public PageDeserializer( BlockEncodingSerde blockEncodingSerde, - boolean compressionEnabled, + Optional decompressor, Optional encryptionKey, int blockSizeInBytes) { @@ -69,7 +69,7 @@ public PageDeserializer( requireNonNull(encryptionKey, "encryptionKey is null"); encryptionKey.ifPresent(secretKey -> checkArgument(is256BitSecretKeySpec(secretKey), "encryptionKey is expected to be an instance of SecretKeySpec containing a 256bit key")); input = new SerializedPageInput( - compressionEnabled ? Optional.of(new Lz4Decompressor()) : Optional.empty(), + requireNonNull(decompressor, "decompressor is null"), encryptionKey, blockSizeInBytes); } @@ -95,13 +95,13 @@ private static class SerializedPageInput private static final int DECOMPRESSOR_RETAINED_SIZE = instanceSize(Lz4Decompressor.class); private static final int ENCRYPTION_KEY_RETAINED_SIZE = toIntExact(instanceSize(SecretKeySpec.class) + sizeOfByteArray(256 / 8)); - private final Optional decompressor; + private final Optional decompressor; private final Optional encryptionKey; private final Optional cipher; private final ReadBuffer[] buffers; - private SerializedPageInput(Optional decompressor, Optional encryptionKey, int blockSizeInBytes) + private SerializedPageInput(Optional decompressor, Optional encryptionKey, int blockSizeInBytes) { this.decompressor = requireNonNull(decompressor, "decompressor is null"); this.encryptionKey = requireNonNull(encryptionKey, "encryptionKey is null"); diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PageSerializer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PageSerializer.java index 31a3a62c35b1..cf77087d28ce 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PageSerializer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PageSerializer.java @@ -65,7 +65,7 @@ public class PageSerializer public PageSerializer( BlockEncodingSerde blockEncodingSerde, - boolean compressionEnabled, + Optional compressor, Optional encryptionKey, int blockSizeInBytes) { @@ -73,7 +73,7 @@ public PageSerializer( requireNonNull(encryptionKey, "encryptionKey is null"); encryptionKey.ifPresent(secretKey -> checkArgument(is256BitSecretKeySpec(secretKey), "encryptionKey is expected to be an instance of SecretKeySpec containing a 256bit key")); output = new SerializedPageOutput( - compressionEnabled ? Optional.of(new Lz4Compressor()) : Optional.empty(), + requireNonNull(compressor, "compressor is null"), encryptionKey, blockSizeInBytes); } @@ -100,7 +100,7 @@ private static class SerializedPageOutput private static final double MINIMUM_COMPRESSION_RATIO = 0.8; - private final Optional compressor; + private final Optional compressor; private final Optional encryptionKey; private final int markers; private final Optional cipher; @@ -109,7 +109,7 @@ private static class SerializedPageOutput private int uncompressedSize; private SerializedPageOutput( - Optional compressor, + Optional compressor, Optional encryptionKey, int blockSizeInBytes) { diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java index 103f5394b731..ed897df6cd25 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java @@ -13,6 +13,12 @@ */ package io.trino.execution.buffer; +import io.airlift.compress.Compressor; +import io.airlift.compress.Decompressor; +import io.airlift.compress.lz4.Lz4Compressor; +import io.airlift.compress.lz4.Lz4Decompressor; +import io.airlift.compress.zstd.ZstdCompressor; +import io.airlift.compress.zstd.ZstdDecompressor; import io.trino.spi.block.BlockEncodingSerde; import javax.crypto.SecretKey; @@ -26,21 +32,39 @@ public class PagesSerdeFactory private static final int SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES = 64 * 1024; private final BlockEncodingSerde blockEncodingSerde; - private final boolean compressionEnabled; + private final CompressionCodec compressionCodec; - public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, boolean compressionEnabled) + public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec) { this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.compressionEnabled = compressionEnabled; + this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null"); } public PageSerializer createSerializer(Optional encryptionKey) { - return new PageSerializer(blockEncodingSerde, compressionEnabled, encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES); + return new PageSerializer(blockEncodingSerde, createCompressor(compressionCodec), encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES); } public PageDeserializer createDeserializer(Optional encryptionKey) { - return new PageDeserializer(blockEncodingSerde, compressionEnabled, encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES); + return new PageDeserializer(blockEncodingSerde, createDecompressor(compressionCodec), encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES); + } + + public static Optional createCompressor(CompressionCodec compressionCodec) + { + return switch (compressionCodec) { + case NONE -> Optional.empty(); + case LZ4 -> Optional.of(new Lz4Compressor()); + case ZSTD -> Optional.of(new ZstdCompressor()); + }; + } + + public static Optional createDecompressor(CompressionCodec compressionCodec) + { + return switch (compressionCodec) { + case NONE -> Optional.empty(); + case LZ4 -> Optional.of(new Lz4Decompressor()); + case ZSTD -> Optional.of(new ZstdDecompressor()); + }; } } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java index 385cd0dc05c0..34b18c018e51 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java @@ -75,8 +75,8 @@ import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.addTimeout; +import static io.trino.SystemSessionProperties.getExchangeCompressionCodec; import static io.trino.SystemSessionProperties.getRetryPolicy; -import static io.trino.SystemSessionProperties.isExchangeCompressionEnabled; import static io.trino.execution.QueryState.FAILED; import static io.trino.execution.QueryState.FINISHING; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -236,7 +236,7 @@ private Query( this.resultsProcessorExecutor = resultsProcessorExecutor; this.timeoutExecutor = timeoutExecutor; this.supportsParametricDateTime = session.getClientCapabilities().contains(ClientCapabilities.PARAMETRIC_DATETIME.toString()); - deserializer = new PagesSerdeFactory(blockEncodingSerde, isExchangeCompressionEnabled(session)) + deserializer = new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(session)) .createDeserializer(session.getExchangeEncryptionKey().map(Ciphers::deserializeAesEncryptionKey)); } diff --git a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpillerFactory.java b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpillerFactory.java index 8a2c49057204..eed4a9426318 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpillerFactory.java +++ b/core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpillerFactory.java @@ -22,6 +22,7 @@ import io.airlift.log.Logger; import io.trino.FeaturesConfig; import io.trino.cache.NonKeyEvictableLoadingCache; +import io.trino.execution.buffer.CompressionCodec; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.SpillContext; @@ -92,7 +93,7 @@ public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, Spi spillerStats, featuresConfig.getSpillerSpillPaths(), featuresConfig.getSpillMaxUsedSpaceThreshold(), - nodeSpillConfig.isSpillCompressionEnabled(), + nodeSpillConfig.getSpillCompressionCodec(), nodeSpillConfig.isSpillEncryptionEnabled()); } @@ -103,10 +104,10 @@ public FileSingleStreamSpillerFactory( SpillerStats spillerStats, List spillPaths, double maxUsedSpaceThreshold, - boolean spillCompressionEnabled, + CompressionCodec compressionCodec, boolean spillEncryptionEnabled) { - this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, spillCompressionEnabled); + this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, compressionCodec); this.executor = requireNonNull(executor, "executor is null"); this.spillerStats = requireNonNull(spillerStats, "spillerStats cannot be null"); requireNonNull(spillPaths, "spillPaths is null"); diff --git a/core/trino-main/src/main/java/io/trino/spiller/NodeSpillConfig.java b/core/trino-main/src/main/java/io/trino/spiller/NodeSpillConfig.java index c46f8c97f23f..5cb34e25ed6a 100644 --- a/core/trino-main/src/main/java/io/trino/spiller/NodeSpillConfig.java +++ b/core/trino-main/src/main/java/io/trino/spiller/NodeSpillConfig.java @@ -14,16 +14,23 @@ package io.trino.spiller; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.DefunctConfig; import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; +import io.trino.execution.buffer.CompressionCodec; import jakarta.validation.constraints.NotNull; +import static io.trino.execution.buffer.CompressionCodec.LZ4; +import static io.trino.execution.buffer.CompressionCodec.NONE; + +@DefunctConfig("experimental.spill-compression-enabled") public class NodeSpillConfig { private DataSize maxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE); private DataSize queryMaxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE); - private boolean spillCompressionEnabled; + private CompressionCodec spillCompressionCodec = NONE; private boolean spillEncryptionEnabled; @NotNull @@ -54,16 +61,24 @@ public NodeSpillConfig setQueryMaxSpillPerNode(DataSize queryMaxSpillPerNode) return this; } - public boolean isSpillCompressionEnabled() + @Deprecated + @LegacyConfig(value = "spill-compression-enabled", replacedBy = "spill-compression-codec") + public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled) { - return spillCompressionEnabled; + this.spillCompressionCodec = spillCompressionEnabled ? LZ4 : NONE; + return this; } - @Config("spill-compression-enabled") - @LegacyConfig("experimental.spill-compression-enabled") - public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled) + public CompressionCodec getSpillCompressionCodec() + { + return spillCompressionCodec; + } + + @Config("spill-compression-codec") + @ConfigDescription("Compression codec used for data in spills") + public NodeSpillConfig setSpillCompressionCodec(CompressionCodec spillCompressionCodec) { - this.spillCompressionEnabled = spillCompressionEnabled; + this.spillCompressionCodec = spillCompressionCodec; return this; } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 55be3ab21cc6..446a3eec81d5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -298,6 +298,7 @@ import static com.google.common.collect.Sets.difference; import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatioThreshold; import static io.trino.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit; +import static io.trino.SystemSessionProperties.getExchangeCompressionCodec; import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount; import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageSize; import static io.trino.SystemSessionProperties.getPagePartitioningBufferPoolSize; @@ -309,7 +310,6 @@ import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled; import static io.trino.SystemSessionProperties.isEnableCoordinatorDynamicFiltersDistribution; import static io.trino.SystemSessionProperties.isEnableLargeDynamicFilters; -import static io.trino.SystemSessionProperties.isExchangeCompressionEnabled; import static io.trino.SystemSessionProperties.isForceSpillingOperator; import static io.trino.SystemSessionProperties.isSpillEnabled; import static io.trino.cache.CacheUtils.uncheckedCacheGet; @@ -653,7 +653,7 @@ public LocalExecutionPlan plan( plan.getId(), outputTypes, pagePreprocessor, - new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))), + new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session))), physicalOperation), context); @@ -920,7 +920,7 @@ private PhysicalOperation createMergeSource(RemoteSourceNode node, LocalExecutio context.getNextOperatorId(), node.getId(), directExchangeClientSupplier, - new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session)), + new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session)), orderingCompiler, types, outputChannels, @@ -940,7 +940,7 @@ private PhysicalOperation createRemoteSource(RemoteSourceNode node, LocalExecuti context.getNextOperatorId(), node.getId(), directExchangeClientSupplier, - new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session)), + new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session)), node.getRetryPolicy(), exchangeManagerRegistry); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index aae9ad8661ac..0a6603af679f 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -76,6 +76,7 @@ import static io.trino.execution.TaskState.RUNNING; import static io.trino.execution.TaskTestUtils.TABLE_SCAN_NODE_ID; import static io.trino.execution.TaskTestUtils.createTestSplitMonitor; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.execution.buffer.PagesSerdeUtil.getSerializedPagePositionCount; import static io.trino.execution.buffer.PipelinedOutputBuffers.BufferType.PARTITIONED; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -123,7 +124,7 @@ public void testSimple() TABLE_SCAN_NODE_ID, outputBuffer, Function.identity(), - new PagesSerdeFactory(new TestingBlockEncodingSerde(), false)); + new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE)); LocalExecutionPlan localExecutionPlan = new LocalExecutionPlan( ImmutableList.of(new DriverFactory( 0, diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java index a74249f2d615..00eb240d9e4f 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java @@ -55,6 +55,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.execution.buffer.BenchmarkDataGenerator.createValues; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.execution.buffer.PagesSerdeUtil.readPages; import static io.trino.execution.buffer.PagesSerdeUtil.writePages; import static io.trino.jmh.Benchmarks.benchmark; @@ -206,7 +207,7 @@ public abstract static class TypeBenchmarkData public void setup(Type type, Function valueGenerator) { - PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false); + PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE); PageSerializer serializer = serdeFactory.createSerializer(Optional.empty()); PageDeserializer deserializer = serdeFactory.createDeserializer(Optional.empty()); PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(type)); @@ -403,7 +404,7 @@ public static class LineitemBenchmarkData @Setup public void setup() { - PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false); + PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE); PageSerializer serializer = serdeFactory.createSerializer(Optional.empty()); PageDeserializer deserializer = serdeFactory.createDeserializer(Optional.empty()); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java index ea4fc3b395d0..864202c12837 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java @@ -44,6 +44,8 @@ import java.util.Random; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.execution.buffer.CompressionCodec.LZ4; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.jmh.Benchmarks.benchmark; import static io.trino.operator.PageAssertions.assertPageEquals; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -83,7 +85,7 @@ public void deserialize(BenchmarkData data, Blackhole blackhole) public void testBenchmarkData() { BenchmarkData data = new BenchmarkData(); - data.compressed = true; + data.compressionCodec = LZ4; data.initialize(); Slice[] serializedPages = data.serializedPages; PageDeserializer deserializer = data.deserializer; @@ -100,8 +102,8 @@ public static class BenchmarkData private static final List TYPES = ImmutableList.of(VARCHAR); @Param({"true", "false"}) private boolean encrypted; - @Param({"true", "false"}) - private boolean compressed; + @Param({"LZ4", "NONE"}) + private CompressionCodec compressionCodec = NONE; @Param("1000") private int randomSeed = 1000; @@ -113,7 +115,7 @@ public static class BenchmarkData @Setup public void initialize() { - PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), compressed); + PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), compressionCodec); Optional encryptionKey = encrypted ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty(); serializer = serdeFactory.createSerializer(encryptionKey); deserializer = serdeFactory.createDeserializer(encryptionKey); @@ -206,7 +208,7 @@ public static void main(String[] args) throws RunnerException { BenchmarkData data = new BenchmarkData(); - data.compressed = true; // Get usable stats on compressibility + data.compressionCodec = LZ4; // Get usable stats on compressibility data.initialize(); System.out.println("Page Size Avg: " + Arrays.stream(data.dataPages).mapToLong(Page::getSizeInBytes).average().getAsDouble()); System.out.println("Page Size Min: " + Arrays.stream(data.dataPages).mapToLong(Page::getSizeInBytes).min().getAsLong()); diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java index 27186d5ddd73..d81fcb25cc80 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java @@ -44,6 +44,9 @@ import java.util.stream.IntStream; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.execution.buffer.CompressionCodec.NONE; +import static io.trino.execution.buffer.PagesSerdeFactory.createCompressor; +import static io.trino.execution.buffer.PagesSerdeFactory.createDecompressor; import static io.trino.execution.buffer.PagesSerdeUtil.readPages; import static io.trino.execution.buffer.PagesSerdeUtil.writePages; import static io.trino.operator.PageAssertions.assertPageEquals; @@ -133,25 +136,23 @@ private void testRoundTrip(List types, List pages) private void testRoundTrip(List types, List pages, int blockSizeInBytes) { - // without compression, encryption - testRoundTrip(types, pages, false, false, blockSizeInBytes); - // with compression, without encryption - testRoundTrip(types, pages, true, false, blockSizeInBytes); - // without compression, with encryption - testRoundTrip(types, pages, false, true, blockSizeInBytes); - // with compression, encryption - testRoundTrip(types, pages, true, true, blockSizeInBytes); + // without encryption + testRoundTrip(types, pages, false, blockSizeInBytes); + // with encryption + testRoundTrip(types, pages, true, blockSizeInBytes); } - private void testRoundTrip(List types, List pages, boolean compressionEnabled, boolean encryptionEnabled, int blockSizeInBytes) + private void testRoundTrip(List types, List pages, boolean encryptionEnabled, int blockSizeInBytes) { Optional encryptionKey = encryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty(); - PageSerializer serializer = new PageSerializer(blockEncodingSerde, compressionEnabled, encryptionKey, blockSizeInBytes); - PageDeserializer deserializer = new PageDeserializer(blockEncodingSerde, compressionEnabled, encryptionKey, blockSizeInBytes); - for (Page page : pages) { - Slice serialized = serializer.serialize(page); - Page deserialized = deserializer.deserialize(serialized); - assertPageEquals(types, deserialized, page); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + PageSerializer serializer = new PageSerializer(blockEncodingSerde, createCompressor(compressionCodec), encryptionKey, blockSizeInBytes); + PageDeserializer deserializer = new PageDeserializer(blockEncodingSerde, createDecompressor(compressionCodec), encryptionKey, blockSizeInBytes); + for (Page page : pages) { + Slice serialized = serializer.serialize(page); + Page deserialized = deserializer.deserialize(serialized); + assertPageEquals(types, deserialized, page); + } } } @@ -228,7 +229,7 @@ public void testVarcharSerializedSize() private int serializedSize(List types, Page expectedPage) { - PagesSerdeFactory serdeFactory = new PagesSerdeFactory(blockEncodingSerde, false); + PagesSerdeFactory serdeFactory = new PagesSerdeFactory(blockEncodingSerde, NONE); PageSerializer serializer = serdeFactory.createSerializer(Optional.empty()); PageDeserializer deserializer = serdeFactory.createDeserializer(Optional.empty()); DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024); @@ -260,28 +261,28 @@ public void testDeserializationWithRollover() private void testDeserializationWithRollover(int blockSize, int numberOfEntries) { - testDeserializationWithRollover(false, false, numberOfEntries, blockSize); - testDeserializationWithRollover(false, true, numberOfEntries, blockSize); - testDeserializationWithRollover(true, false, numberOfEntries, blockSize); - testDeserializationWithRollover(true, true, numberOfEntries, blockSize); + testDeserializationWithRollover(false, numberOfEntries, blockSize); + testDeserializationWithRollover(true, numberOfEntries, blockSize); } - private void testDeserializationWithRollover(boolean encryptionEnabled, boolean compressionEnabled, int numberOfEntries, int blockSize) + private void testDeserializationWithRollover(boolean encryptionEnabled, int numberOfEntries, int blockSize) { RolloverBlockSerde blockSerde = new RolloverBlockSerde(); Optional encryptionKey = encryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty(); - PageSerializer serializer = new PageSerializer(blockSerde, compressionEnabled, encryptionKey, blockSize); - PageDeserializer deserializer = new PageDeserializer(blockSerde, compressionEnabled, encryptionKey, blockSize); + for (CompressionCodec compressionCodec : CompressionCodec.values()) { + PageSerializer serializer = new PageSerializer(blockSerde, createCompressor(compressionCodec), encryptionKey, blockSize); + PageDeserializer deserializer = new PageDeserializer(blockSerde, createDecompressor(compressionCodec), encryptionKey, blockSize); - Page page = createTestPage(numberOfEntries); - Slice serialized = serializer.serialize(page); - Page deserialized = deserializer.deserialize(serialized); - assertThat(deserialized.getChannelCount()).isEqualTo(1); + Page page = createTestPage(numberOfEntries); + Slice serialized = serializer.serialize(page); + Page deserialized = deserializer.deserialize(serialized); + assertThat(deserialized.getChannelCount()).isEqualTo(1); - VariableWidthBlock expected = (VariableWidthBlock) page.getBlock(0); - VariableWidthBlock actual = (VariableWidthBlock) deserialized.getBlock(0); + VariableWidthBlock expected = (VariableWidthBlock) page.getBlock(0); + VariableWidthBlock actual = (VariableWidthBlock) deserialized.getBlock(0); - assertThat(actual.getRawSlice().getBytes()).isEqualTo(expected.getRawSlice().getBytes()); + assertThat(actual.getRawSlice().getBytes()).isEqualTo(expected.getRawSlice().getBytes()); + } } private static Page createTestPage(int numberOfEntries) diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java index 98d591f54def..f047c2dc17fb 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java @@ -42,6 +42,7 @@ import static io.trino.execution.buffer.BufferState.FINISHED; import static io.trino.execution.buffer.BufferState.FLUSHING; import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -310,7 +311,7 @@ private static Slice createPage(String value) VariableWidthBlockBuilder blockBuilder = (VariableWidthBlockBuilder) pageBuilder.getBlockBuilder(0); blockBuilder.writeEntry(valueSlice); Page page = pageBuilder.build(); - PageSerializer serializer = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false).createSerializer(Optional.empty()); + PageSerializer serializer = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE).createSerializer(Optional.empty()); return serializer.serialize(page); } diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java index 19227447867f..3f3cea79b7f1 100644 --- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdeFactory.java @@ -16,6 +16,7 @@ import io.trino.metadata.BlockEncodingManager; import io.trino.metadata.InternalBlockEncodingSerde; +import static io.trino.execution.buffer.CompressionCodec.LZ4; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; public class TestingPagesSerdeFactory @@ -26,6 +27,6 @@ public class TestingPagesSerdeFactory public TestingPagesSerdeFactory() { // compression should be enabled in as many tests as possible - super(BLOCK_ENCODING_SERDE, true); + super(BLOCK_ENCODING_SERDE, LZ4); } } diff --git a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java index f8d714857117..9983f98ca750 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java @@ -18,6 +18,7 @@ import io.airlift.units.DataSize; import io.trino.execution.StageId; import io.trino.execution.TaskId; +import io.trino.execution.buffer.CompressionCodec; import io.trino.execution.buffer.OutputBufferStateMachine; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.execution.buffer.PartitionedOutputBuffer; @@ -90,6 +91,7 @@ import static io.trino.block.BlockAssertions.createRandomBlockForType; import static io.trino.block.BlockAssertions.createRandomLongsBlock; import static io.trino.block.BlockAssertions.createRepeatedValuesBlock; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.execution.buffer.PipelinedOutputBuffers.BufferType.PARTITIONED; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.operator.output.BenchmarkPartitionedOutputOperator.BenchmarkData.TestType; @@ -143,8 +145,8 @@ public static class BenchmarkData @Param({"2", "16", "256"}) private int partitionCount = 256; - @Param({"true", "false"}) - private boolean enableCompression; + @Param({"LZ4", "NONE"}) + private CompressionCodec compressionCodec = NONE; @Param({"1", "2"}) private int channelCount = 1; @@ -449,7 +451,7 @@ private PartitionedOutputOperator createPartitionedOutputOperator() PartitionFunction partitionFunction = new BucketPartitionFunction( new HashBucketFunction(new PrecomputedHashGenerator(0), partitionCount), IntStream.range(0, partitionCount).toArray()); - PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), enableCompression); + PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), compressionCodec); PartitionedOutputBuffer buffer = createPartitionedOutputBuffer(); diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java index 898f5802e0f6..79b1fa250a40 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java @@ -74,6 +74,7 @@ import static io.trino.block.BlockAssertions.createLongsBlock; import static io.trino.block.BlockAssertions.createRandomBlockForType; import static io.trino.block.BlockAssertions.createRepeatedValuesBlock; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -109,7 +110,7 @@ public class TestPagePartitioner private static final int POSITIONS_PER_PAGE = 8; private static final int PARTITION_COUNT = 2; - private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false); + private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE); private static final PageDeserializer PAGE_DESERIALIZER = PAGES_SERDE_FACTORY.createDeserializer(Optional.empty()); private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-executor-%s")); diff --git a/core/trino-main/src/test/java/io/trino/operator/spiller/BenchmarkBinaryFileSpiller.java b/core/trino-main/src/test/java/io/trino/operator/spiller/BenchmarkBinaryFileSpiller.java index 800169f04d04..758647453f31 100644 --- a/core/trino-main/src/test/java/io/trino/operator/spiller/BenchmarkBinaryFileSpiller.java +++ b/core/trino-main/src/test/java/io/trino/operator/spiller/BenchmarkBinaryFileSpiller.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; +import io.trino.execution.buffer.CompressionCodec; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.block.BlockEncodingSerde; @@ -95,8 +96,8 @@ public static class BenchmarkData @Param("10") private int pagesCount = 10; - @Param("false") - private boolean compressionEnabled; + @Param("NONE") + private CompressionCodec compressionCodec; @Param("true") private boolean encryptionEnabled; @@ -117,7 +118,7 @@ public void setup() spillerStats, ImmutableList.of(SPILL_PATH), 1.0, - compressionEnabled, + compressionCodec, encryptionEnabled); spillerFactory = new GenericSpillerFactory(singleStreamSpillerFactory); pages = createInputPages(); diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java b/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java index ca723183c382..924d821836bd 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestBinaryFileSpiller.java @@ -83,7 +83,7 @@ public void setUp() BlockEncodingSerde blockEncodingSerde = new TestingBlockEncodingSerde(); singleStreamSpillerFactory = new FileSingleStreamSpillerFactory(blockEncodingSerde, spillerStats, featuresConfig, nodeSpillConfig); factory = new GenericSpillerFactory(singleStreamSpillerFactory); - PagesSerdeFactory pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, nodeSpillConfig.isSpillCompressionEnabled()); + PagesSerdeFactory pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, nodeSpillConfig.getSpillCompressionCodec()); serializer = pagesSerdeFactory.createSerializer(Optional.empty()); memoryContext = newSimpleAggregatedMemoryContext(); } diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java index 97379246db10..4517965e09a1 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java @@ -18,6 +18,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.execution.buffer.CompressionCodec; import io.trino.execution.buffer.PagesSerdeUtil; import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.PageAssertions; @@ -42,6 +43,8 @@ import static com.google.common.io.MoreFiles.listFiles; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static io.trino.execution.buffer.CompressionCodec.LZ4; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.execution.buffer.PagesSerdeUtil.isSerializedPageCompressed; import static io.trino.execution.buffer.PagesSerdeUtil.isSerializedPageEncrypted; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -83,31 +86,31 @@ public void tearDown() public void testSpill() throws Exception { - assertSpill(false, false); + assertSpill(NONE, false); } @Test public void testSpillCompression() throws Exception { - assertSpill(true, false); + assertSpill(LZ4, false); } @Test public void testSpillEncryption() throws Exception { - assertSpill(false, true); + assertSpill(NONE, true); } @Test public void testSpillEncryptionWithCompression() throws Exception { - assertSpill(true, true); + assertSpill(LZ4, true); } - private void assertSpill(boolean compression, boolean encryption) + private void assertSpill(CompressionCodec compressionCodec, boolean encryption) throws Exception { FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory( @@ -116,7 +119,7 @@ private void assertSpill(boolean compression, boolean encryption) new SpillerStats(), ImmutableList.of(spillPath.toPath()), 1.0, - compression, + compressionCodec, encryption); LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test"); SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext); @@ -138,7 +141,7 @@ private void assertSpill(boolean compression, boolean encryption) .describedAs("at least one page should be successfully read back") .isTrue(); Slice serializedPage = serializedPages.next(); - assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compression); + assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4); assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption); } diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpillerFactory.java b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpillerFactory.java index 1d4d283bf0e2..1cc2d98c6fdb 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpillerFactory.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpillerFactory.java @@ -41,6 +41,7 @@ import static com.google.common.io.MoreFiles.listFiles; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static com.google.common.util.concurrent.Futures.getUnchecked; +import static io.trino.execution.buffer.CompressionCodec.NONE; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX; @@ -279,7 +280,7 @@ private FileSingleStreamSpillerFactory spillerFactoryFactory(List paths, D new SpillerStats(), paths, maxUsedSpaceThreshold, - false, + NONE, false); } } diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestNodeSpillConfig.java b/core/trino-main/src/test/java/io/trino/spiller/TestNodeSpillConfig.java index c467042be916..cc078ae16a96 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestNodeSpillConfig.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestNodeSpillConfig.java @@ -24,6 +24,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.execution.buffer.CompressionCodec.NONE; +import static io.trino.execution.buffer.CompressionCodec.ZSTD; public class TestNodeSpillConfig { @@ -33,7 +35,7 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(NodeSpillConfig.class) .setMaxSpillPerNode(DataSize.of(100, GIGABYTE)) .setQueryMaxSpillPerNode(DataSize.of(100, GIGABYTE)) - .setSpillCompressionEnabled(false) + .setSpillCompressionCodec(NONE) .setSpillEncryptionEnabled(false)); } @@ -43,14 +45,14 @@ public void testExplicitPropertyMappings() Map properties = ImmutableMap.builder() .put("max-spill-per-node", "10MB") .put("query-max-spill-per-node", "15 MB") - .put("spill-compression-enabled", "true") + .put("spill-compression-codec", "ZSTD") .put("spill-encryption-enabled", "true") .buildOrThrow(); NodeSpillConfig expected = new NodeSpillConfig() .setMaxSpillPerNode(DataSize.of(10, MEGABYTE)) .setQueryMaxSpillPerNode(DataSize.of(15, MEGABYTE)) - .setSpillCompressionEnabled(true) + .setSpillCompressionCodec(ZSTD) .setSpillEncryptionEnabled(true); assertFullMapping(properties, expected); diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java index c1abf12975d9..3de5873787c0 100644 --- a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java +++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java @@ -27,6 +27,8 @@ import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.execution.buffer.CompressionCodec.NONE; +import static io.trino.execution.buffer.CompressionCodec.ZSTD; import static io.trino.sql.analyzer.RegexLibrary.JONI; import static io.trino.sql.analyzer.RegexLibrary.RE2J; @@ -50,7 +52,7 @@ public void testDefaults() .setSpillMaxUsedSpaceThreshold(0.9) .setMemoryRevokingThreshold(0.9) .setMemoryRevokingTarget(0.5) - .setExchangeCompressionEnabled(false) + .setExchangeCompressionCodec(NONE) .setExchangeDataIntegrityVerification(DataIntegrityVerification.ABORT) .setPagesIndexEagerCompactionEnabled(false) .setFilterAndProjectMinOutputPageSize(DataSize.of(500, KILOBYTE)) @@ -84,7 +86,7 @@ public void testExplicitPropertyMappings() .put("spiller-max-used-space-threshold", "0.8") .put("memory-revoking-threshold", "0.2") .put("memory-revoking-target", "0.8") - .put("exchange.compression-enabled", "true") + .put("exchange.compression-codec", "ZSTD") .put("exchange.data-integrity-verification", "RETRY") .put("pages-index.eager-compaction-enabled", "true") .put("filter-and-project-min-output-page-size", "1MB") @@ -115,7 +117,7 @@ public void testExplicitPropertyMappings() .setSpillMaxUsedSpaceThreshold(0.8) .setMemoryRevokingThreshold(0.2) .setMemoryRevokingTarget(0.8) - .setExchangeCompressionEnabled(true) + .setExchangeCompressionCodec(ZSTD) .setExchangeDataIntegrityVerification(DataIntegrityVerification.RETRY) .setPagesIndexEagerCompactionEnabled(true) .setFilterAndProjectMinOutputPageSize(DataSize.of(1, MEGABYTE)) diff --git a/docs/src/main/sphinx/admin/properties-exchange.md b/docs/src/main/sphinx/admin/properties-exchange.md index 3abaf957f667..ebaa3f9c0a9f 100644 --- a/docs/src/main/sphinx/admin/properties-exchange.md +++ b/docs/src/main/sphinx/admin/properties-exchange.md @@ -32,6 +32,15 @@ the maximum number of clients is value adjusts the heuristic, which may increase concurrency and improve network utilization. +## `exchange.compression-codec` + +- **Type:** {ref}`prop-type-string` +- **Allowed values:** `NONE`, `LZ4`, `ZSTD` +- **Default value:** `NONE` + +The compression codec to use when exchanging data between nodes. +Use `LZ4` in `Fault Tolerant Execution` mode by default. + ## `exchange.data-integrity-verification` - **Type:** {ref}`prop-type-string` diff --git a/docs/src/main/sphinx/admin/properties-spilling.md b/docs/src/main/sphinx/admin/properties-spilling.md index 62b4ad71d08b..cef400de958a 100644 --- a/docs/src/main/sphinx/admin/properties-spilling.md +++ b/docs/src/main/sphinx/admin/properties-spilling.md @@ -70,7 +70,15 @@ Limit for memory used for unspilling a single aggregation operator instance. - **Type:** {ref}`prop-type-boolean` - **Default value:** `false` -Enables data compression for pages spilled to disk. +Enables data compression for pages spilled to disk. It is replaced by `spill-compression-codec`. + +## `spill-compression-codec` + +- **Type:** {ref}`prop-type-string` +- **Allowed values:** `NONE`, `LZ4`, `ZSTD` +- **Default value:** `NONE` + +The compression codec to use when spilling pages to disk. ## `spill-encryption-enabled`