Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,23 +254,23 @@ public void testSession()
try (Connection connection = createConnection()) {
assertThat(listSession(connection))
.contains("join_distribution_type|AUTOMATIC|AUTOMATIC")
.contains("exchange_compression|false|false");
.contains("exchange_compression_codec|NONE|NONE");

try (Statement statement = connection.createStatement()) {
statement.execute("SET SESSION join_distribution_type = 'BROADCAST'");
}

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|false|false");
.contains("exchange_compression_codec|NONE|NONE");

try (Statement statement = connection.createStatement()) {
statement.execute("SET SESSION exchange_compression = true");
statement.execute("SET SESSION exchange_compression_codec = 'LZ4'");
}

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|true|false");
.contains("exchange_compression_codec|LZ4|NONE");

try (Statement statement = connection.createStatement()) {
// setting Hive session properties requires the admin role
Expand All @@ -286,7 +286,7 @@ public void testSession()

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|true|false")
.contains("exchange_compression_codec|LZ4|NONE")
.contains(format("spatial_partitioning_table_name|%s|", value));
}
catch (Exception e) {
Expand Down
26 changes: 19 additions & 7 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.sql.analyzer.RegexLibrary;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
Expand All @@ -35,6 +36,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.sql.analyzer.RegexLibrary.JONI;

@DefunctConfig({
Expand Down Expand Up @@ -80,7 +83,7 @@ public class FeaturesConfig
/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
*/
private boolean exchangeCompressionEnabled;
private CompressionCodec exchangeCompressionCodec = NONE;
private boolean pagesIndexEagerCompactionEnabled;
private boolean omitDateTimeTypePrecision;
private int maxRecursionDepth = 10;
Expand Down Expand Up @@ -328,15 +331,24 @@ public FeaturesConfig setSpillMaxUsedSpaceThreshold(double spillMaxUsedSpaceThre
return this;
}

public boolean isExchangeCompressionEnabled()
@Deprecated
@LegacyConfig(value = "exchange.compression-enabled", replacedBy = "exchange.compression-codec")
public FeaturesConfig setExchangeCompressionEnabled(boolean exchangeCompressionEnabled)
{
return exchangeCompressionEnabled;
this.exchangeCompressionCodec = exchangeCompressionEnabled ? LZ4 : NONE;
return this;
}

@Config("exchange.compression-enabled")
public FeaturesConfig setExchangeCompressionEnabled(boolean exchangeCompressionEnabled)
public CompressionCodec getExchangeCompressionCodec()
{
return exchangeCompressionCodec;
}

@Config("exchange.compression-codec")
@ConfigDescription("Compression codec used for data in exchanges")
public FeaturesConfig setExchangeCompressionCodec(CompressionCodec exchangeCompressionCodec)
{
this.exchangeCompressionEnabled = exchangeCompressionEnabled;
this.exchangeCompressionCodec = exchangeCompressionCodec;
return this;
}

Expand Down Expand Up @@ -500,6 +512,6 @@ public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean

public void applyFaultTolerantExecutionDefaults()
{
exchangeCompressionEnabled = true;
exchangeCompressionCodec = LZ4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.memory.MemoryManagerConfig;
import io.trino.memory.NodeMemoryConfig;
Expand Down Expand Up @@ -107,7 +108,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZE_DISTINCT_AGGREGATIONS = "optimize_mixed_distinct_aggregations";
public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout";
public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id";
public static final String EXCHANGE_COMPRESSION = "exchange_compression";
public static final String EXCHANGE_COMPRESSION_CODEC = "exchange_compression_codec";
public static final String ENABLE_INTERMEDIATE_AGGREGATIONS = "enable_intermediate_aggregations";
public static final String PUSH_AGGREGATION_THROUGH_OUTER_JOIN = "push_aggregation_through_outer_join";
public static final String PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN = "push_partial_aggregation_through_join";
Expand Down Expand Up @@ -505,10 +506,11 @@ public SystemSessionProperties(
"Enable a stats-based rule adding exchanges below GroupId",
optimizerConfig.isEnableForcedExchangeBelowGroupId(),
true),
booleanProperty(
EXCHANGE_COMPRESSION,
"Enable compression in exchanges",
featuresConfig.isExchangeCompressionEnabled(),
enumProperty(
EXCHANGE_COMPRESSION_CODEC,
"Compression codec used for data in exchanges, supports NONE, LZ4, ZSTD",
CompressionCodec.class,
featuresConfig.getExchangeCompressionCodec(),
false),
booleanProperty(
ENABLE_INTERMEDIATE_AGGREGATIONS,
Expand Down Expand Up @@ -1328,9 +1330,9 @@ public static boolean isEnableForcedExchangeBelowGroupId(Session session)
return session.getSystemProperty(ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID, Boolean.class);
}

public static boolean isExchangeCompressionEnabled(Session session)
public static CompressionCodec getExchangeCompressionCodec(Session session)
{
return session.getSystemProperty(EXCHANGE_COMPRESSION, Boolean.class);
return session.getSystemProperty(EXCHANGE_COMPRESSION_CODEC, CompressionCodec.class);
}

public static boolean isEnableIntermediateAggregations(Session session)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.buffer;

public enum CompressionCodec
{
NONE, LZ4, ZSTD
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public class PageDeserializer

public PageDeserializer(
BlockEncodingSerde blockEncodingSerde,
boolean compressionEnabled,
Optional<Decompressor> decompressor,
Optional<SecretKey> encryptionKey,
int blockSizeInBytes)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
requireNonNull(encryptionKey, "encryptionKey is null");
encryptionKey.ifPresent(secretKey -> checkArgument(is256BitSecretKeySpec(secretKey), "encryptionKey is expected to be an instance of SecretKeySpec containing a 256bit key"));
input = new SerializedPageInput(
compressionEnabled ? Optional.of(new Lz4Decompressor()) : Optional.empty(),
requireNonNull(decompressor, "decompressor is null"),
encryptionKey,
blockSizeInBytes);
}
Expand All @@ -95,13 +95,13 @@ private static class SerializedPageInput
private static final int DECOMPRESSOR_RETAINED_SIZE = instanceSize(Lz4Decompressor.class);
private static final int ENCRYPTION_KEY_RETAINED_SIZE = toIntExact(instanceSize(SecretKeySpec.class) + sizeOfByteArray(256 / 8));

private final Optional<Lz4Decompressor> decompressor;
private final Optional<Decompressor> decompressor;
private final Optional<SecretKey> encryptionKey;
private final Optional<Cipher> cipher;

private final ReadBuffer[] buffers;

private SerializedPageInput(Optional<Lz4Decompressor> decompressor, Optional<SecretKey> encryptionKey, int blockSizeInBytes)
private SerializedPageInput(Optional<Decompressor> decompressor, Optional<SecretKey> encryptionKey, int blockSizeInBytes)
{
this.decompressor = requireNonNull(decompressor, "decompressor is null");
this.encryptionKey = requireNonNull(encryptionKey, "encryptionKey is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ public class PageSerializer

public PageSerializer(
BlockEncodingSerde blockEncodingSerde,
boolean compressionEnabled,
Optional<Compressor> compressor,
Optional<SecretKey> encryptionKey,
int blockSizeInBytes)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
requireNonNull(encryptionKey, "encryptionKey is null");
encryptionKey.ifPresent(secretKey -> checkArgument(is256BitSecretKeySpec(secretKey), "encryptionKey is expected to be an instance of SecretKeySpec containing a 256bit key"));
output = new SerializedPageOutput(
compressionEnabled ? Optional.of(new Lz4Compressor()) : Optional.empty(),
requireNonNull(compressor, "compressor is null"),
encryptionKey,
blockSizeInBytes);
}
Expand All @@ -100,7 +100,7 @@ private static class SerializedPageOutput

private static final double MINIMUM_COMPRESSION_RATIO = 0.8;

private final Optional<Lz4Compressor> compressor;
private final Optional<Compressor> compressor;
private final Optional<SecretKey> encryptionKey;
private final int markers;
private final Optional<Cipher> cipher;
Expand All @@ -109,7 +109,7 @@ private static class SerializedPageOutput
private int uncompressedSize;

private SerializedPageOutput(
Optional<Lz4Compressor> compressor,
Optional<Compressor> compressor,
Optional<SecretKey> encryptionKey,
int blockSizeInBytes)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
*/
package io.trino.execution.buffer;

import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import io.trino.spi.block.BlockEncodingSerde;

import javax.crypto.SecretKey;
Expand All @@ -26,21 +32,39 @@ public class PagesSerdeFactory
private static final int SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES = 64 * 1024;

private final BlockEncodingSerde blockEncodingSerde;
private final boolean compressionEnabled;
private final CompressionCodec compressionCodec;

public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, boolean compressionEnabled)
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.compressionEnabled = compressionEnabled;
this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
}

public PageSerializer createSerializer(Optional<SecretKey> encryptionKey)
{
return new PageSerializer(blockEncodingSerde, compressionEnabled, encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES);
return new PageSerializer(blockEncodingSerde, createCompressor(compressionCodec), encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES);
}

public PageDeserializer createDeserializer(Optional<SecretKey> encryptionKey)
{
return new PageDeserializer(blockEncodingSerde, compressionEnabled, encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES);
return new PageDeserializer(blockEncodingSerde, createDecompressor(compressionCodec), encryptionKey, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES);
}

public static Optional<Compressor> createCompressor(CompressionCodec compressionCodec)
{
return switch (compressionCodec) {
case NONE -> Optional.empty();
case LZ4 -> Optional.of(new Lz4Compressor());
case ZSTD -> Optional.of(new ZstdCompressor());
};
}

public static Optional<Decompressor> createDecompressor(CompressionCodec compressionCodec)
{
return switch (compressionCodec) {
case NONE -> Optional.empty();
case LZ4 -> Optional.of(new Lz4Decompressor());
case ZSTD -> Optional.of(new ZstdDecompressor());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addTimeout;
import static io.trino.SystemSessionProperties.getExchangeCompressionCodec;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.isExchangeCompressionEnabled;
import static io.trino.execution.QueryState.FAILED;
import static io.trino.execution.QueryState.FINISHING;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
Expand Down Expand Up @@ -236,7 +236,7 @@ private Query(
this.resultsProcessorExecutor = resultsProcessorExecutor;
this.timeoutExecutor = timeoutExecutor;
this.supportsParametricDateTime = session.getClientCapabilities().contains(ClientCapabilities.PARAMETRIC_DATETIME.toString());
deserializer = new PagesSerdeFactory(blockEncodingSerde, isExchangeCompressionEnabled(session))
deserializer = new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(session))
.createDeserializer(session.getExchangeEncryptionKey().map(Ciphers::deserializeAesEncryptionKey));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.log.Logger;
import io.trino.FeaturesConfig;
import io.trino.cache.NonKeyEvictableLoadingCache;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.SpillContext;
Expand Down Expand Up @@ -92,7 +93,7 @@ public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, Spi
spillerStats,
featuresConfig.getSpillerSpillPaths(),
featuresConfig.getSpillMaxUsedSpaceThreshold(),
nodeSpillConfig.isSpillCompressionEnabled(),
nodeSpillConfig.getSpillCompressionCodec(),
nodeSpillConfig.isSpillEncryptionEnabled());
}

Expand All @@ -103,10 +104,10 @@ public FileSingleStreamSpillerFactory(
SpillerStats spillerStats,
List<Path> spillPaths,
double maxUsedSpaceThreshold,
boolean spillCompressionEnabled,
CompressionCodec compressionCodec,
boolean spillEncryptionEnabled)
{
this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, spillCompressionEnabled);
this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, compressionCodec);
this.executor = requireNonNull(executor, "executor is null");
this.spillerStats = requireNonNull(spillerStats, "spillerStats cannot be null");
requireNonNull(spillPaths, "spillPaths is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@
package io.trino.spiller;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.trino.execution.buffer.CompressionCodec;
import jakarta.validation.constraints.NotNull;

import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;

@DefunctConfig("experimental.spill-compression-enabled")
public class NodeSpillConfig
{
private DataSize maxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE);
private DataSize queryMaxSpillPerNode = DataSize.of(100, DataSize.Unit.GIGABYTE);

private boolean spillCompressionEnabled;
private CompressionCodec spillCompressionCodec = NONE;
private boolean spillEncryptionEnabled;

@NotNull
Expand Down Expand Up @@ -54,16 +61,24 @@ public NodeSpillConfig setQueryMaxSpillPerNode(DataSize queryMaxSpillPerNode)
return this;
}

public boolean isSpillCompressionEnabled()
@Deprecated
@LegacyConfig(value = "spill-compression-enabled", replacedBy = "spill-compression-codec")
public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled)
{
return spillCompressionEnabled;
this.spillCompressionCodec = spillCompressionEnabled ? LZ4 : NONE;
return this;
}

@Config("spill-compression-enabled")
@LegacyConfig("experimental.spill-compression-enabled")
public NodeSpillConfig setSpillCompressionEnabled(boolean spillCompressionEnabled)
public CompressionCodec getSpillCompressionCodec()
{
return spillCompressionCodec;
}

@Config("spill-compression-codec")
@ConfigDescription("Compression codec used for data in spills")
public NodeSpillConfig setSpillCompressionCodec(CompressionCodec spillCompressionCodec)
{
this.spillCompressionEnabled = spillCompressionEnabled;
this.spillCompressionCodec = spillCompressionCodec;
return this;
}

Expand Down
Loading