Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class FeaturesConfig
private boolean hideInaccessibleColumns;
private boolean forceSpillingJoin;

private boolean faultTolerantExecutionExchangeEncryptionEnabled = true;

public enum DataIntegrityVerification
{
NONE,
Expand Down Expand Up @@ -483,6 +485,18 @@ public FeaturesConfig setForceSpillingJoin(boolean forceSpillingJoin)
return this;
}

public boolean isFaultTolerantExecutionExchangeEncryptionEnabled()
{
return faultTolerantExecutionExchangeEncryptionEnabled;
}

@Config("fault-tolerant-execution.exchange-encryption-enabled")
public FeaturesConfig setFaultTolerantExecutionExchangeEncryptionEnabled(boolean faultTolerantExecutionExchangeEncryptionEnabled)
{
this.faultTolerantExecutionExchangeEncryptionEnabled = faultTolerantExecutionExchangeEncryptionEnabled;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
exchangeCompressionEnabled = true;
Expand Down
50 changes: 46 additions & 4 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.client.ProtocolHeaders;
Expand Down Expand Up @@ -82,6 +83,7 @@ public final class Session
private final SessionPropertyManager sessionPropertyManager;
private final Map<String, String> preparedStatements;
private final ProtocolHeaders protocolHeaders;
private final Optional<Slice> exchangeEncryptionKey;

public Session(
QueryId queryId,
Expand All @@ -106,7 +108,8 @@ public Session(
Map<String, Map<String, String>> catalogProperties,
SessionPropertyManager sessionPropertyManager,
Map<String, String> preparedStatements,
ProtocolHeaders protocolHeaders)
ProtocolHeaders protocolHeaders,
Optional<Slice> exchangeEncryptionKey)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
Expand All @@ -130,6 +133,7 @@ public Session(
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.preparedStatements = requireNonNull(preparedStatements, "preparedStatements is null");
this.protocolHeaders = requireNonNull(protocolHeaders, "protocolHeaders is null");
this.exchangeEncryptionKey = requireNonNull(exchangeEncryptionKey, "exchangeEncryptionKey is null");

requireNonNull(catalogProperties, "catalogProperties is null");
ImmutableMap.Builder<String, Map<String, String>> catalogPropertiesBuilder = ImmutableMap.builder();
Expand Down Expand Up @@ -284,6 +288,11 @@ public ProtocolHeaders getProtocolHeaders()
return protocolHeaders;
}

public Optional<Slice> getExchangeEncryptionKey()
{
return exchangeEncryptionKey;
}

public Session beginTransactionId(TransactionId transactionId, TransactionManager transactionManager, AccessControl accessControl)
{
requireNonNull(transactionId, "transactionId is null");
Expand Down Expand Up @@ -346,7 +355,8 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage
connectorProperties.buildOrThrow(),
sessionPropertyManager,
preparedStatements,
protocolHeaders);
protocolHeaders,
exchangeEncryptionKey);
}

public Session withDefaultProperties(Map<String, String> systemPropertyDefaults, Map<String, Map<String, String>> catalogPropertyDefaults, AccessControl accessControl)
Expand Down Expand Up @@ -392,7 +402,38 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
catalogProperties,
sessionPropertyManager,
preparedStatements,
protocolHeaders);
protocolHeaders,
exchangeEncryptionKey);
}

public Session withExchangeEncryption(Slice encryptionKey)
{
checkState(exchangeEncryptionKey.isEmpty(), "exchangeEncryptionKey is already present");
return new Session(
queryId,
transactionId,
clientTransactionSupport,
identity,
source,
catalog,
schema,
path,
traceToken,
timeZoneKey,
locale,
remoteUserAddress,
userAgent,
clientInfo,
clientTags,
clientCapabilities,
resourceEstimates,
start,
systemProperties,
catalogProperties,
sessionPropertyManager,
preparedStatements,
protocolHeaders,
Optional.of(encryptionKey));
}

public ConnectorSession toConnectorSession()
Expand Down Expand Up @@ -832,7 +873,8 @@ public Session build()
catalogSessionProperties,
sessionPropertyManager,
preparedStatements,
protocolHeaders);
protocolHeaders,
Optional.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.trino.metadata.SessionPropertyManager;
import io.trino.spi.QueryId;
import io.trino.spi.security.BasicPrincipal;
Expand Down Expand Up @@ -309,10 +310,10 @@ public Identity toIdentity(Map<String, String> extraCredentials)

public Session toSession(SessionPropertyManager sessionPropertyManager)
{
return toSession(sessionPropertyManager, emptyMap());
return toSession(sessionPropertyManager, emptyMap(), Optional.empty());
}

public Session toSession(SessionPropertyManager sessionPropertyManager, Map<String, String> extraCredentials)
public Session toSession(SessionPropertyManager sessionPropertyManager, Map<String, String> extraCredentials, Optional<Slice> exchangeEncryptionKey)
{
return new Session(
new QueryId(queryId),
Expand All @@ -337,6 +338,7 @@ public Session toSession(SessionPropertyManager sessionPropertyManager, Map<Stri
catalogProperties,
sessionPropertyManager,
preparedStatements,
createProtocolHeaders(protocolName));
createProtocolHeaders(protocolName),
exchangeEncryptionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.log.Logger;
import io.trino.FeaturesConfig;
import io.trino.Session;
import io.trino.event.QueryMonitor;
import io.trino.execution.ClusterSizeMonitor;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class LocalDispatchQueryFactory
private final Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories;
private final WarningCollectorFactory warningCollectorFactory;
private final ListeningExecutorService executor;
private final boolean faultTolerantExecutionExchangeEncryptionEnabled;

@Inject
public LocalDispatchQueryFactory(
Expand All @@ -75,7 +77,8 @@ public LocalDispatchQueryFactory(
Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories,
WarningCollectorFactory warningCollectorFactory,
ClusterSizeMonitor clusterSizeMonitor,
DispatchExecutor dispatchExecutor)
DispatchExecutor dispatchExecutor,
FeaturesConfig featuresConfig)
{
this.queryManager = requireNonNull(queryManager, "queryManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand All @@ -87,6 +90,7 @@ public LocalDispatchQueryFactory(
this.warningCollectorFactory = requireNonNull(warningCollectorFactory, "warningCollectorFactory is null");
this.clusterSizeMonitor = requireNonNull(clusterSizeMonitor, "clusterSizeMonitor is null");
this.executor = dispatchExecutor.getExecutor();
this.faultTolerantExecutionExchangeEncryptionEnabled = requireNonNull(featuresConfig, "featuresConfig is null").isFaultTolerantExecutionExchangeEncryptionEnabled();
}

@Override
Expand All @@ -112,7 +116,8 @@ public DispatchQuery createDispatchQuery(
executor,
metadata,
warningCollector,
getQueryType(preparedQuery.getStatement()));
getQueryType(preparedQuery.getStatement()),
faultTolerantExecutionExchangeEncryptionEnabled);

// It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
// can result in delivering query-created event after query analysis has already started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@
import static io.trino.execution.QueryState.TERMINAL_QUERY_STATES;
import static io.trino.execution.QueryState.WAITING_FOR_RESOURCES;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.server.DynamicFilterService.DynamicFiltersStats;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.USER_CANCELED;
import static io.trino.util.Ciphers.createRandomAesEncryptionKey;
import static io.trino.util.Ciphers.serializeAesEncryptionKey;
import static io.trino.util.Failures.toFailure;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -216,7 +219,8 @@ public static QueryStateMachine begin(
Executor executor,
Metadata metadata,
WarningCollector warningCollector,
Optional<QueryType> queryType)
Optional<QueryType> queryType,
boolean faultTolerantExecutionExchangeEncryptionEnabled)
{
return beginWithTicker(
existingTransactionId,
Expand All @@ -232,7 +236,8 @@ public static QueryStateMachine begin(
Ticker.systemTicker(),
metadata,
warningCollector,
queryType);
queryType,
faultTolerantExecutionExchangeEncryptionEnabled);
}

static QueryStateMachine beginWithTicker(
Expand All @@ -249,7 +254,8 @@ static QueryStateMachine beginWithTicker(
Ticker ticker,
Metadata metadata,
WarningCollector warningCollector,
Optional<QueryType> queryType)
Optional<QueryType> queryType,
boolean faultTolerantExecutionExchangeEncryptionEnabled)
{
// if there is an existing transaction, activate it
existingTransactionId.ifPresent(transactionId -> {
Expand All @@ -270,6 +276,11 @@ static QueryStateMachine beginWithTicker(
session = session.beginTransactionId(transactionId, transactionManager, accessControl);
}

if (getRetryPolicy(session) == TASK && faultTolerantExecutionExchangeEncryptionEnabled) {
// encryption is mandatory for fault tolerant execution as it relies on an external storage to store intermediate data generated during an exchange
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we should enforce that. It should be configurable, but enabled by default. I can easily think of a deployment scenario when you do not care about encryption at all if storage is not really "external"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, let me add an FTE specific property. I would prefer not to make this property generic though due to non obvious interactions between enabling HTTP's in the cluster and the exchange encryption.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the comment is not 100% valid right now (it is not mandatory any more)

session = session.withExchangeEncryption(serializeAesEncryptionKey(createRandomAesEncryptionKey()));
}

QueryStateMachine queryStateMachine = new QueryStateMachine(
query,
preparedQuery,
Expand Down
Loading