Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ jobs:
- ":trino-clickhouse"
- ":trino-hive,:trino-orc"
- ":trino-hive,:trino-parquet -P test-parquet"
- ":trino-hive -P test-failure-recovery"
- ":trino-mongodb,:trino-kafka,:trino-elasticsearch"
- ":trino-elasticsearch -P test-stats"
- ":trino-redis"
Expand Down
36 changes: 36 additions & 0 deletions client/trino-client/src/main/java/io/trino/client/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class StageStats
private final long processedRows;
private final long processedBytes;
private final long physicalInputBytes;
private final int failedTasks;
private final boolean coordinatorOnly;
private final List<StageStats> subStages;

@JsonCreator
Expand All @@ -57,6 +59,8 @@ public StageStats(
@JsonProperty("processedRows") long processedRows,
@JsonProperty("processedBytes") long processedBytes,
@JsonProperty("physicalInputBytes") long physicalInputBytes,
@JsonProperty("failedTasks") int failedTasks,
@JsonProperty("coordinatorOnly") boolean coordinatorOnly,
Comment on lines 62 to 63
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes do not seem test-specific.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not, but I need them for tests. I am not sure it makes sense to extract them into a separate commit, as it would then be harder to understand why we need them.

@JsonProperty("subStages") List<StageStats> subStages)
{
this.stageId = stageId;
Expand All @@ -72,6 +76,8 @@ public StageStats(
this.processedRows = processedRows;
this.processedBytes = processedBytes;
this.physicalInputBytes = physicalInputBytes;
this.failedTasks = failedTasks;
this.coordinatorOnly = coordinatorOnly;
this.subStages = ImmutableList.copyOf(requireNonNull(subStages, "subStages is null"));
}

Expand Down Expand Up @@ -153,6 +159,18 @@ public long getPhysicalInputBytes()
return physicalInputBytes;
}

/**
 * Returns the {@code failedTasks} stat carried by this stage.
 */
@JsonProperty
public int getFailedTasks()
{
    return this.failedTasks;
}

/**
 * Returns whether this stage is flagged as coordinator-only.
 */
@JsonProperty
public boolean isCoordinatorOnly()
{
    return this.coordinatorOnly;
}

@JsonProperty
public List<StageStats> getSubStages()
{
Expand All @@ -175,6 +193,8 @@ public String toString()
.add("processedRows", processedRows)
.add("processedBytes", processedBytes)
.add("physicalInputBytes", physicalInputBytes)
.add("failedTasks", failedTasks)
.add("coordinatorOnly", coordinatorOnly)
.add("subStages", subStages)
.toString();
}
Expand All @@ -199,6 +219,8 @@ public static class Builder
private long processedRows;
private long processedBytes;
private long physicalInputBytes;
private int failedTasks;
private boolean coordinatorOnly;
private List<StageStats> subStages;

private Builder() {}
Expand Down Expand Up @@ -281,6 +303,18 @@ public Builder setPhysicalInputBytes(long physicalInputBytes)
return this;
}

/**
 * Sets the failed task count.
 *
 * @param value number of failed tasks to record
 * @return this builder, for chaining
 */
public Builder setFailedTasks(int value)
{
    this.failedTasks = value;
    return this;
}

/**
 * Sets the coordinator-only flag.
 *
 * @param value whether the stage runs only on the coordinator
 * @return this builder, for chaining
 */
public Builder setCoordinatorOnly(boolean value)
{
    this.coordinatorOnly = value;
    return this;
}

public Builder setSubStages(List<StageStats> subStages)
{
this.subStages = ImmutableList.copyOf(requireNonNull(subStages, "subStages is null"));
Expand All @@ -303,6 +337,8 @@ public StageStats build()
processedRows,
processedBytes,
physicalInputBytes,
failedTasks,
coordinatorOnly,
subStages);
}
}
Expand Down
61 changes: 61 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MaxDataSize;
import io.trino.operator.RetryPolicy;
import io.trino.sql.analyzer.RegexLibrary;

import javax.validation.constraints.DecimalMax;
Expand All @@ -40,6 +41,7 @@
import static io.trino.sql.analyzer.RegexLibrary.JONI;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig({
"analyzer.experimental-syntax-enabled",
Expand Down Expand Up @@ -145,6 +147,11 @@ public class FeaturesConfig
private boolean disableSetPropertiesSecurityCheckForCreateDdl;
private boolean incrementalHashArrayLoadFactorEnabled = true;

private RetryPolicy retryPolicy = RetryPolicy.NONE;
private int retryAttempts = 4;
private Duration retryInitialDelay = new Duration(10, SECONDS);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be too long for the default initial delay, especially for very short queries

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting it to something lower (<6s) might not give enough time for failure detector to detect a faulty node: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/failuredetector/FailureDetectorConfig.java#L30

private Duration retryMaxDelay = new Duration(1, MINUTES);

public enum JoinReorderingStrategy
{
NONE,
Expand Down Expand Up @@ -1108,4 +1115,58 @@ public FeaturesConfig setIncrementalHashArrayLoadFactorEnabled(boolean increment
this.incrementalHashArrayLoadFactorEnabled = incrementalHashArrayLoadFactorEnabled;
return this;
}

/**
 * @return the configured query retry policy (defaults to {@code RetryPolicy.NONE})
 */
@NotNull
public RetryPolicy getRetryPolicy()
{
    return this.retryPolicy;
}

/**
 * Sets the retry policy used for automatic query retries.
 */
@Config("retry-policy")
public FeaturesConfig setRetryPolicy(RetryPolicy policy)
{
    this.retryPolicy = policy;
    return this;
}

/**
 * @return the maximum number of retry attempts; validated to be non-negative
 */
@Min(0)
public int getRetryAttempts()
{
    return this.retryAttempts;
}

/**
 * Sets the maximum number of retry attempts.
 */
@Config("retry-attempts")
public FeaturesConfig setRetryAttempts(int attempts)
{
    this.retryAttempts = attempts;
    return this;
}

/**
 * @return the delay applied before the first retry attempt
 */
@NotNull
public Duration getRetryInitialDelay()
{
    return this.retryInitialDelay;
}

/**
 * Sets the initial delay before the first retry attempt.
 *
 * <p>The delay grows exponentially on each subsequent attempt, capped by the
 * {@code retry-max-delay} config property.
 */
@Config("retry-initial-delay")
// Fix: the description previously referenced the session-property-style name
// 'retry_max_delay'; config property names are hyphenated ('retry-max-delay').
@ConfigDescription("Initial delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt up to 'retry-max-delay'")
public FeaturesConfig setRetryInitialDelay(Duration retryInitialDelay)
{
    this.retryInitialDelay = retryInitialDelay;
    return this;
}

/**
 * @return the upper bound on the exponentially increasing retry delay
 */
@NotNull
public Duration getRetryMaxDelay()
{
    return this.retryMaxDelay;
}

/**
 * Sets the maximum delay before a retry attempt.
 *
 * <p>The delay grows exponentially on each subsequent attempt starting from the
 * {@code retry-initial-delay} config property, up to this maximum.
 */
@Config("retry-max-delay")
// Fix: the description previously referenced the session-property-style name
// 'retry_initial_delay'; config property names are hyphenated ('retry-initial-delay').
@ConfigDescription("Maximum delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt starting from 'retry-initial-delay'")
public FeaturesConfig setRetryMaxDelay(Duration retryMaxDelay)
{
    this.retryMaxDelay = retryMaxDelay;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.memory.MemoryManagerConfig;
import io.trino.memory.NodeMemoryConfig;
import io.trino.operator.RetryPolicy;
import io.trino.spi.TrinoException;
import io.trino.spi.session.PropertyMetadata;

Expand All @@ -37,6 +38,7 @@
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
Expand Down Expand Up @@ -143,6 +145,10 @@ public final class SystemSessionProperties
public static final String LEGACY_CATALOG_ROLES = "legacy_catalog_roles";
public static final String INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED = "incremental_hash_array_load_factor_enabled";
public static final String MAX_PARTIAL_TOP_N_MEMORY = "max_partial_top_n_memory";
public static final String RETRY_POLICY = "retry_policy";
public static final String RETRY_ATTEMPTS = "retry_attempts";
public static final String RETRY_INITIAL_DELAY = "retry_initial_delay";
public static final String RETRY_MAX_DELAY = "retry_max_delay";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -665,6 +671,27 @@ public SystemSessionProperties(
MAX_PARTIAL_TOP_N_MEMORY,
"Max memory size for partial Top N aggregations. This can be turned off by setting it with '0'.",
taskManagerConfig.getMaxPartialTopNMemory(),
false),
enumProperty(
RETRY_POLICY,
"Retry policy",
RetryPolicy.class,
featuresConfig.getRetryPolicy(),
false),
integerProperty(
RETRY_ATTEMPTS,
"Maximum number of retry attempts",
featuresConfig.getRetryAttempts(),
false),
durationProperty(
RETRY_INITIAL_DELAY,
"Initial delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt up to 'retry_max_delay'",
featuresConfig.getRetryInitialDelay(),
false),
durationProperty(
RETRY_MAX_DELAY,
"Maximum delay before initiating a retry attempt. Delay increases exponentially for each subsequent attempt starting from 'retry_initial_delay'",
featuresConfig.getRetryMaxDelay(),
false));
}

Expand Down Expand Up @@ -1183,4 +1210,33 @@ public static DataSize getMaxPartialTopNMemory(Session session)
{
return session.getSystemProperty(MAX_PARTIAL_TOP_N_MEMORY, DataSize.class);
}

/**
 * Resolves the retry policy for this session, rejecting session feature
 * combinations that cannot be used together with automatic retries.
 *
 * @throws TrinoException if retries are enabled together with dynamic
 *         filtering or distributed sort
 */
public static RetryPolicy getRetryPolicy(Session session)
{
    RetryPolicy policy = session.getSystemProperty(RETRY_POLICY, RetryPolicy.class);
    // No retries requested: no compatibility checks needed.
    if (policy == RetryPolicy.NONE) {
        return policy;
    }
    if (isEnableDynamicFiltering(session)) {
        throw new TrinoException(NOT_SUPPORTED, "Dynamic filtering is not supported with automatic retries enabled");
    }
    if (isDistributedSortEnabled(session)) {
        throw new TrinoException(NOT_SUPPORTED, "Distributed sort is not supported with automatic retries enabled");
    }
    return policy;
}

/**
 * Reads the maximum retry attempt count from the session.
 */
public static int getRetryAttempts(Session session)
{
    int attempts = session.getSystemProperty(RETRY_ATTEMPTS, Integer.class);
    return attempts;
}

/**
 * Reads the initial retry delay from the session.
 */
public static Duration getRetryInitialDelay(Session session)
{
    Duration delay = session.getSystemProperty(RETRY_INITIAL_DELAY, Duration.class);
    return delay;
}

/**
 * Reads the maximum retry delay from the session.
 */
public static Duration getRetryMaxDelay(Session session)
{
    Duration delay = session.getSystemProperty(RETRY_MAX_DELAY, Duration.class);
    return delay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void splitCompletedEvent(TaskId taskId, DriverStats driverStats, @Nullab
new SplitCompletedEvent(
taskId.getQueryId().toString(),
taskId.getStageId().toString(),
Integer.toString(taskId.getId()),
taskId.toString(),
splitCatalog,
driverStats.getCreateTime().toDate().toInstant(),
Optional.ofNullable(driverStats.getStartTime()).map(startTime -> startTime.toDate().toInstant()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
// DDL does not have an output
}

@Override
public void outputTaskFailed(TaskId taskId, Throwable failure)
{
    // Intentional no-op: DDL does not have an output, so there is no output
    // task whose failure needs to be propagated here.
}

@Override
public ListenableFuture<QueryState> getStateChange(QueryState currentState)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import javax.validation.constraints.NotNull;

import static java.util.concurrent.TimeUnit.MINUTES;

public class FailureInjectionConfig
{
private Duration expirationPeriod = new Duration(10, MINUTES);
private Duration requestTimeout = new Duration(2, MINUTES);

/**
 * @return how long an injected failure remains active before it is considered expired
 */
@NotNull
public Duration getExpirationPeriod()
{
    return this.expirationPeriod;
}

@Config("failure-injection.expiration-period")
@ConfigDescription("Period after which an injected failure is considered expired and will no longer be triggering a failure")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we need expiration of an injected failure?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to avoid memory leaks

public FailureInjectionConfig setExpirationPeriod(Duration period)
{
    // Bounds how long injected failures are retained (avoids unbounded growth).
    this.expirationPeriod = period;
    return this;
}

/**
 * @return how long requests blocked to emulate a timeout are held before release
 */
@NotNull
public Duration getRequestTimeout()
{
    return this.requestTimeout;
}

/**
 * Sets how long emulated-timeout requests stay blocked before being released.
 */
@Config("failure-injection.request-timeout")
@ConfigDescription("Period after which requests blocked to emulate a timeout are released")
public FailureInjectionConfig setRequestTimeout(Duration timeout)
{
    this.requestTimeout = timeout;
    return this;
}
}
Loading