Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class FeaturesConfig
private boolean scaleWriters = true;
private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE);
private DataIntegrityVerification exchangeDataIntegrityVerification = DataIntegrityVerification.ABORT;
/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
*/
private boolean exchangeCompressionEnabled;
private boolean pagesIndexEagerCompactionEnabled;
private boolean omitDateTimeTypePrecision;
Expand Down Expand Up @@ -479,4 +482,9 @@ public FeaturesConfig setForceSpillingJoin(boolean forceSpillingJoin)
this.forceSpillingJoin = forceSpillingJoin;
return this;
}

/**
 * Overrides config defaults for fault-tolerant (task-retry) execution:
 * enables exchange compression.
 * <p>
 * NOTE(review): intended to be invoked only via {@code bindConfigDefaults}
 * during module setup — Airlift config objects are meant to be immutable
 * once bound, so calling this anywhere else will not have the intended effect.
 */
public void applyFaultTolerantExecutionDefaults()
{
exchangeCompressionEnabled = true;
}
Comment on lines +486 to +489
Copy link
Copy Markdown
Member

@hashhar hashhar Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while it's good to keep FTE defaults in same place as the other configs the method being public can lead to misuse. Config objects from Airlift are meant to be immutable so using the method anywhere except bindConfigDefaults would not have intended effects.

Would declaring a constant here and using it in bindConfigDefaults be better in that regards?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Airlift do anything special with set* methods?

Copy link
Copy Markdown
Member

@hashhar hashhar Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Airlift do anything special with set* methods?

Not sure, didn't check.

You can obviously call set* after the fact but since the config is Guice managed and also affects other modules via conditional modules the point in time at which the set* is called can have different impact and makes reasoning about whole thing problematic.

An example is #14921 (not related to modifying the config object itself, but it illustrates the effect that modifying a config can have).

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class QueryManagerConfig
private int queryManagerExecutorPoolSize = 5;
private int queryExecutorPoolSize = 1000;

/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
*/
private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
private int remoteTaskMaxCallbackThreads = 1000;

Expand Down Expand Up @@ -641,4 +644,9 @@ public QueryManagerConfig setFaultTolerantExecutionEventDrivenSchedulerEnabled(b
this.faultTolerantExecutionEventDrivenSchedulerEnabled = faultTolerantExecutionEventDrivenSchedulerEnabled;
return this;
}

/**
 * Overrides config defaults for fault-tolerant (task-retry) execution:
 * shortens {@code remoteTaskMaxErrorDuration} from 5 minutes to 1 minute.
 * <p>
 * NOTE(review): intended to be invoked only via {@code bindConfigDefaults}
 * during module setup — Airlift config objects are meant to be immutable
 * once bound, so calling this anywhere else will not have the intended effect.
 */
public void applyFaultTolerantExecutionDefaults()
{
remoteTaskMaxErrorDuration = new Duration(1, MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class TaskManagerConfig
// more available processors, the default value could be above 1. Therefore, it can cause error due to config
// mismatch during execution. Additionally, cap it to 32 in order to avoid small pages produced by local
// partitioning exchanges.
/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
*/
private int taskConcurrency = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
private int httpResponseThreads = 100;
private int httpTimeoutThreads = 3;
Expand Down Expand Up @@ -579,4 +582,9 @@ public TaskManagerConfig setInterruptStuckSplitTasksDetectionInterval(Duration i
this.interruptStuckSplitTasksDetectionInterval = interruptStuckSplitTasksDetectionInterval;
return this;
}

/**
 * Overrides config defaults for fault-tolerant (task-retry) execution:
 * pins {@code taskConcurrency} to 8 instead of the processor-count-derived default,
 * presumably to keep the value consistent across heterogeneous nodes — TODO confirm.
 * <p>
 * NOTE(review): intended to be invoked only via {@code bindConfigDefaults}
 * during module setup — Airlift config objects are meant to be immutable
 * once bound, so calling this anywhere else will not have the intended effect.
 */
public void applyFaultTolerantExecutionDefaults()
{
taskConcurrency = 8;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class MemoryManagerConfig
private DataSize faultTolerantExecutionTaskRuntimeMemoryEstimationOverhead = DataSize.of(1, GIGABYTE);
private LowMemoryQueryKillerPolicy lowMemoryQueryKillerPolicy = LowMemoryQueryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private LowMemoryTaskKillerPolicy lowMemoryTaskKillerPolicy = LowMemoryTaskKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
*/
private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES);

public LowMemoryQueryKillerPolicy getLowMemoryQueryKillerPolicy()
Expand Down Expand Up @@ -189,6 +192,11 @@ public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryEstimationQuantile
return this;
}

/**
 * Overrides config defaults for fault-tolerant (task-retry) execution:
 * sets {@code killOnOutOfMemoryDelay} to zero so out-of-memory killing
 * happens without the default 5-minute delay.
 * <p>
 * NOTE(review): intended to be invoked only via {@code bindConfigDefaults}
 * during module setup — Airlift config objects are meant to be immutable
 * once bound, so calling this anywhere else will not have the intended effect.
 */
public void applyFaultTolerantExecutionDefaults()
{
killOnOutOfMemoryDelay = new Duration(0, MINUTES);
}

public enum LowMemoryQueryKillerPolicy
{
NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import io.trino.operator.OperatorFactories;
import io.trino.operator.PagesIndex;
import io.trino.operator.PagesIndexPageSorter;
import io.trino.operator.RetryPolicy;
import io.trino.operator.TrinoOperatorFactories;
import io.trino.operator.index.IndexJoinLookupStats;
import io.trino.operator.scalar.json.JsonExistsFunction;
Expand Down Expand Up @@ -173,6 +174,7 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.TOPOLOGY;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.operator.RetryPolicy.TASK;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
Expand Down Expand Up @@ -211,16 +213,24 @@ protected void setup(Binder binder)
binder.bind(HttpRequestSessionContextFactory.class).in(Scopes.SINGLETON);
install(new InternalCommunicationModule());

QueryManagerConfig queryManagerConfig = buildConfigObject(QueryManagerConfig.class);
RetryPolicy retryPolicy = queryManagerConfig.getRetryPolicy();
if (retryPolicy == TASK) {
configBinder(binder).bindConfigDefaults(QueryManagerConfig.class, QueryManagerConfig::applyFaultTolerantExecutionDefaults);
}

configBinder(binder).bindConfig(FeaturesConfig.class);
if (retryPolicy == TASK) {
configBinder(binder).bindConfigDefaults(FeaturesConfig.class, FeaturesConfig::applyFaultTolerantExecutionDefaults);
}

configBinder(binder).bindConfig(OptimizerConfig.class);
configBinder(binder).bindConfig(ProtocolConfig.class);

binder.bind(SqlParser.class).in(Scopes.SINGLETON);

jaxrsBinder(binder).bind(ThrowableMapper.class);

configBinder(binder).bindConfig(QueryManagerConfig.class);

configBinder(binder).bindConfig(SqlEnvironmentConfig.class);

newOptionalBinder(binder, ExplainAnalyzeContext.class);
Expand Down Expand Up @@ -284,6 +294,10 @@ protected void setup(Binder binder)
newExporter(binder).export(PauseMeter.class).withGeneratedName();

configBinder(binder).bindConfig(MemoryManagerConfig.class);
if (retryPolicy == TASK) {
configBinder(binder).bindConfigDefaults(MemoryManagerConfig.class, MemoryManagerConfig::applyFaultTolerantExecutionDefaults);
}

configBinder(binder).bindConfig(NodeMemoryConfig.class);
binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON);
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
Expand All @@ -300,6 +314,9 @@ protected void setup(Binder binder)
binder.bind(PageFunctionCompiler.class).in(Scopes.SINGLETON);
newExporter(binder).export(PageFunctionCompiler.class).withGeneratedName();
configBinder(binder).bindConfig(TaskManagerConfig.class);
if (retryPolicy == TASK) {
configBinder(binder).bindConfigDefaults(TaskManagerConfig.class, TaskManagerConfig::applyFaultTolerantExecutionDefaults);
}
binder.bind(IndexJoinLookupStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(IndexJoinLookupStats.class).withGeneratedName();
binder.bind(AsyncHttpExecutionMBean.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1864,11 +1864,6 @@ public void testCreateTableWithUnsupportedType()

@Test
public void testTargetMaxFileSize()
{
testTargetMaxFileSize(3);
}

protected void testTargetMaxFileSize(int expectedTableWriters)
{
// We use TEXTFILE in this test because it has a very consistent and predictable size
@Language("SQL") String createTableSql = "CREATE TABLE test_max_file_size WITH (format = 'TEXTFILE') AS SELECT * FROM tpch.sf1.lineitem LIMIT 1000000";
Expand All @@ -1878,11 +1873,12 @@ protected void testTargetMaxFileSize(int expectedTableWriters)
Session session = Session.builder(getSession())
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("scale_writers", "false")
.setSystemProperty("redistribute_writes", "false")
// task scale writers should be disabled since we want to write with a single task writer
.setSystemProperty("task_scale_writers_enabled", "false")
.build();
assertUpdate(session, createTableSql, 1000000);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(1);
assertUpdate("DROP TABLE test_max_file_size");

// Write table with small limit and verify we get multiple files per node near the expected size
Expand All @@ -1897,7 +1893,7 @@ protected void testTargetMaxFileSize(int expectedTableWriters)

assertUpdate(session, createTableSql, 1000000);
MaterializedResult result = computeActual(selectFileInfo);
assertThat(result.getRowCount()).isGreaterThan(expectedTableWriters);
assertThat(result.getRowCount()).isGreaterThan(1);
for (MaterializedRow row : result) {
// allow up to a larger delta due to the very small max size and the relatively large writer chunk size
assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 3);
Expand All @@ -1908,11 +1904,6 @@ protected void testTargetMaxFileSize(int expectedTableWriters)

@Test
public void testTargetMaxFileSizePartitioned()
{
testTargetMaxFileSizePartitioned(3);
}

protected void testTargetMaxFileSizePartitioned(int expectedTableWriters)
{
// We use TEXTFILE in this test because it has a very consistent and predictable size
@Language("SQL") String createTableSql = "" +
Expand All @@ -1928,9 +1919,10 @@ protected void testTargetMaxFileSizePartitioned(int expectedTableWriters)
// task scale writers should be disabled since we want to write a single file
.setSystemProperty("task_scale_writers_enabled", "false")
.setSystemProperty("scale_writers", "false")
.setSystemProperty("redistribute_writes", "false")
.build();
assertUpdate(session, createTableSql, 1000000);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters * 3);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(3);
assertUpdate("DROP TABLE test_max_file_size_partitioned");

// Write table with small limit and verify we get multiple files per node near the expected size
Expand All @@ -1946,7 +1938,7 @@ protected void testTargetMaxFileSizePartitioned(int expectedTableWriters)

assertUpdate(session, createTableSql, 1000000);
MaterializedResult result = computeActual(selectFileInfo);
assertThat(result.getRowCount()).isGreaterThan(expectedTableWriters * 3);
assertThat(result.getRowCount()).isGreaterThan(3);
for (MaterializedRow row : result) {
// allow up to a larger delta due to the very small max size and the relatively large writer chunk size
assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,6 @@ public void testWritersAcrossMultipleWorkersWhenScaleWritersIsEnabled()
// Not applicable for fault-tolerant mode.
}

@Override
public void testTargetMaxFileSize()
{
testTargetMaxFileSize(9);
}

@Override
public void testTargetMaxFileSizePartitioned()
{
testTargetMaxFileSizePartitioned(9);
}

@Override
public void testOptimize()
{
Expand Down