Decrease lock contention in PipelinedStageExecution #14138
Conversation
losipiuk
left a comment
Looks good. Some questions and editorials.
Force-pushed from b6067e5 to 35a509f
Force-pushed from 35a509f to 84b53f7
Taking the monitor of io.trino.execution.scheduler.PipelinedStageExecution in the updateTaskStatus method causes high lock contention. Make this method lock-free.
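The lock-free direction can be illustrated with `ConcurrentHashMap.newKeySet()`: `Set.add` already reports atomically whether an element was newly added, so duplicate status updates can be filtered without taking a monitor. A minimal self-contained sketch (class and field names are hypothetical, not the actual PR code):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class LockFreeTaskTracker
{
    // Thread-safe sets; membership updates need no synchronized blocks
    private final Set<String> flushingTasks = ConcurrentHashMap.newKeySet();
    private final Set<String> finishedTasks = ConcurrentHashMap.newKeySet();

    // Returns true only for the first FLUSHING update of a given task
    public boolean addFlushingTask(String taskId)
    {
        return flushingTasks.add(taskId);
    }

    // Returns true only for the first FINISHED update of a given task
    public boolean addFinishedTask(String taskId)
    {
        boolean added = finishedTasks.add(taskId);
        if (added) {
            // a finished task is no longer flushing
            flushingTasks.remove(taskId);
        }
        return added;
    }

    public static void main(String[] args)
    {
        LockFreeTaskTracker tracker = new LockFreeTaskTracker();
        System.out.println(tracker.addFlushingTask("task-1")); // true: first update
        System.out.println(tracker.addFlushingTask("task-1")); // false: duplicate ignored
        System.out.println(tracker.addFinishedTask("task-1")); // true
        System.out.println(tracker.flushingTasks.contains("task-1")); // false: moved to finished
    }
}
```

The boolean result of `Set.add` is what later lets the caller skip state-machine transitions for duplicate updates.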
Force-pushed from 84b53f7 to b09f551
lukasz-stec
left a comment
Some comments added.
Additionally, I think using both allTasksLock and a concurrent map plus AtomicIntegers does not increase thread safety.
What do you mean?
private void updateTaskStatus(TaskStatus taskStatus)
{
    State stageState = stateMachine.getState();
I think this can be simplified without introducing overly complex multi-threaded code. A lot of the code in updateTaskStatus does not need to be synchronized:
private void updateTaskStatus(TaskStatus taskStatus)
{
    State stageState = stateMachine.getState();
    if (stageState.isDone()) {
        return;
    }
    TaskState taskState = taskStatus.getState();
    switch (taskState) {
        case FAILED:
            RuntimeException failure = taskStatus.getFailures().stream()
                    .findFirst()
                    .map(this::rewriteTransportFailure)
                    .map(ExecutionFailureInfo::toException)
                    .orElse(new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"));
            fail(failure);
            break;
        case CANCELED:
            // A task should only be in the CANCELED state if the stage is cancelled
            fail(new TrinoException(GENERIC_INTERNAL_ERROR, "A task is in the CANCELED state but stage is " + stageState));
            break;
        case ABORTED:
            // A task should only be in the ABORTED state if the stage is done (ABORTED or FAILED)
            fail(new TrinoException(GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + stageState));
            break;
        case FLUSHING:
            addFlushingTask(taskStatus.getTaskId());
            break;
        case FINISHED:
            addFinishedTask(taskStatus.getTaskId());
            break;
        default:
            break;
    }
    if (stageState == SCHEDULED || stageState == RUNNING || stageState == FLUSHING) {
        if (taskState == TaskState.RUNNING) {
            stateMachine.transitionToRunning();
        }
        if (isFlushing()) {
            stateMachine.transitionToFlushing();
        }
        if (isAllTaskFinished()) {
            stateMachine.transitionToFinished();
        }
    }
}

private synchronized void addFlushingTask(TaskId taskId)
{
    flushingTasks.add(taskId);
}

private synchronized void addFinishedTask(TaskId taskId)
{
    finishedTasks.add(taskId);
    flushingTasks.remove(taskId);
}

private synchronized boolean isAllTaskFinished()
{
    return finishedTasks.containsAll(allTasks);
}
Then in a subsequent commit I would probably make flushingTasks and finishedTasks lock-free, e.g. use ConcurrentHashMap.newKeySet() (still @GuardedBy("this") if a method touches both finishedTasks and flushingTasks at the same time) and:
private void addFlushingTask(TaskId taskId)
{
    flushingTasks.add(taskId);
}

private void addFinishedTask(TaskId taskId)
{
    if (!finishedTasks.contains(taskId)) {
        synchronized (this) {
            // atomically move the task to the finished set
            // nit: MAYBE the synchronization is not needed
            finishedTasks.add(taskId);
            flushingTasks.remove(taskId);
        }
    }
}
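The check-then-lock pattern suggested above keeps the two sets consistent under concurrent, duplicated updates while letting the common duplicate case skip the lock. A runnable sketch (names hypothetical) that races 4 duplicate FINISHED updates per task across 8 threads:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FinishMoveDemo
{
    private final Set<Integer> flushingTasks = ConcurrentHashMap.newKeySet();
    private final Set<Integer> finishedTasks = ConcurrentHashMap.newKeySet();

    void addFinishedTask(int taskId)
    {
        // fast path: duplicate FINISHED updates usually skip the lock entirely
        if (!finishedTasks.contains(taskId)) {
            synchronized (this) {
                // atomically move the task from flushing to finished
                finishedTasks.add(taskId);
                flushingTasks.remove(taskId);
            }
        }
    }

    // Runs 100 tasks x 4 duplicate updates on 8 threads; returns {finished, flushing} sizes
    static int[] run() throws InterruptedException
    {
        FinishMoveDemo demo = new FinishMoveDemo();
        for (int i = 0; i < 100; i++) {
            demo.flushingTasks.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int taskId = 0; taskId < 100; taskId++) {
            int id = taskId;
            for (int dup = 0; dup < 4; dup++) {
                pool.execute(() -> demo.addFinishedTask(id));
            }
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] {demo.finishedTasks.size(), demo.flushingTasks.size()};
    }

    public static void main(String[] args) throws InterruptedException
    {
        int[] sizes = run();
        System.out.println(sizes[0] + " finished, " + sizes[1] + " flushing"); // 100 finished, 0 flushing
    }
}
```

The final state is deterministic: every task ends up in finishedTasks exactly once and flushingTasks drains to empty, regardless of how the duplicate updates interleave.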
Yes, it is easier, but it does not resolve the root problem. You are still locking this on every call of updateTaskStatus. I tried that approach and there was still high contention on this's monitor.
You can make addFlushingTask and addFinishedTask return boolean (true if the element was added), e.g.:
boolean taskStateChanged = addFlushingTask(taskStatus.getTaskId());
...
if (!taskStateChanged) {
    return;
}
if (stageState == SCHEDULED || stageState == RUNNING || stageState == FLUSHING) {
    if (taskState == TaskState.RUNNING) {
        stateMachine.transitionToRunning();
    }
    if (isFlushing()) {
        stateMachine.transitionToFlushing();
    }
    if (isAllTaskFinished()) {
        stateMachine.transitionToFinished();
    }
}
Alternatively we could probably make isFlushing() and isAllTaskFinished() lock-free somehow
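One lock-free variant of such a completion check (names hypothetical, not the PR's actual code): count first-time FINISHED updates with an AtomicInteger and compare against the expected task count, relying on `Set.add`'s atomic "newly added" result so each task is counted at most once:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class LockFreeCompletion
{
    private final int expectedTasks;
    private final Set<Integer> finishedTasks = ConcurrentHashMap.newKeySet();
    private final AtomicInteger finishedCount = new AtomicInteger();

    public LockFreeCompletion(int expectedTasks)
    {
        this.expectedTasks = expectedTasks;
    }

    public void addFinishedTask(int taskId)
    {
        // Set.add returns true exactly once per distinct taskId,
        // so the counter is incremented at most once per task
        if (finishedTasks.add(taskId)) {
            finishedCount.incrementAndGet();
        }
    }

    // Lock-free: a volatile read of the counter, no monitor acquired
    public boolean isAllTasksFinished()
    {
        return finishedCount.get() == expectedTasks;
    }

    public static void main(String[] args)
    {
        LockFreeCompletion completion = new LockFreeCompletion(3);
        completion.addFinishedTask(1);
        completion.addFinishedTask(1); // duplicate update, counted once
        completion.addFinishedTask(2);
        System.out.println(completion.isAllTasksFinished()); // false
        completion.addFinishedTask(3);
        System.out.println(completion.isAllTasksFinished()); // true
    }
}
```

This replaces the synchronized `finishedTasks.containsAll(allTasks)` check with a single atomic counter comparison, which is the kind of change that removes the monitor from the hot status-update path.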
I've implemented the version where updateTaskStatus was not called when the task state had not changed.
It helped a bit, but the lock contention was still too high to be accepted (in total, about 1 day).
Alternatively we could probably make isFlushing() and isAllTaskFinished() lock-free somehow
This is what I did in this PR.
I've implemented the version where updateTaskStatus was not called when the task state had not changed.
It helped a bit, but the lock contention was still too high to be accepted (in total, about 1 day).
It could be because the whole updateTaskStatus was synchronized. That seems like a waste. There is no reason why transitionToXX should be synchronized, and they do some non-trivial work like firing executor tasks.
Well, it might or it might not be; it is hard to say. I did not check the version combining (1) decreasing the number of updateTaskStatus calls and (2) changing the scope of the synchronized section.
When I checked that (1) does not help and (2) does not help (separately), I decided to make this method completely lock-free.
Closed in favor of #14395
Description
We observed that there is high lock contention on io.trino.execution.scheduler.PipelinedStageExecution's monitor.

Lock contention before: (profiler screenshot)

Lock contention after: (profiler screenshot)
We measured the throughput (80 Trino workers on 40 r5.4xlarge nodes, 64 concurrent queries) before/after.

Throughput before:
6000 queries / h

Throughput after:
6600 queries / h (a 10% improvement)
Non-technical explanation
Trino is able to process more queries within one hour.
Release notes
( ) This is not user-visible and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:
Increase throughput when running highly concurrent workloads on big Trino clusters