Skip to content

Commit a0e649e

Browse files
committed
Add flag if task is speculative to TaskStatus
1 parent ed978b5 commit a0e649e

23 files changed

+115
-28
lines changed

core/trino-main/src/main/java/io/trino/execution/MemoryTrackingRemoteTaskFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public RemoteTask createRemoteTask(
4949
Span stageSpan,
5050
TaskId taskId,
5151
InternalNode node,
52+
boolean speculative,
5253
PlanFragment fragment,
5354
Multimap<PlanNodeId, Split> initialSplits,
5455
OutputBuffers outputBuffers,
@@ -62,6 +63,7 @@ public RemoteTask createRemoteTask(
6263
stageSpan,
6364
taskId,
6465
node,
66+
speculative,
6567
fragment,
6668
initialSplits,
6769
outputBuffers,

core/trino-main/src/main/java/io/trino/execution/RemoteTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public interface RemoteTask
3939

4040
void setOutputBuffers(OutputBuffers outputBuffers);
4141

42+
void setSpeculative(boolean speculative);
43+
4244
/**
4345
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
4446
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is

core/trino-main/src/main/java/io/trino/execution/RemoteTaskFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ RemoteTask createRemoteTask(
3535
Span stageSpan,
3636
TaskId taskId,
3737
InternalNode node,
38+
boolean speculative,
3839
PlanFragment fragment,
3940
Multimap<PlanNodeId, Split> initialSplits,
4041
OutputBuffers outputBuffers,

core/trino-main/src/main/java/io/trino/execution/SqlStage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ public synchronized Optional<RemoteTask> createTask(
246246
OutputBuffers outputBuffers,
247247
Multimap<PlanNodeId, Split> splits,
248248
Set<PlanNodeId> noMoreSplits,
249-
Optional<DataSize> estimatedMemory)
249+
Optional<DataSize> estimatedMemory,
250+
boolean speculative)
250251
{
251252
if (stateMachine.getState().isDone()) {
252253
return Optional.empty();
@@ -261,6 +262,7 @@ public synchronized Optional<RemoteTask> createTask(
261262
stateMachine.getStageSpan(),
262263
taskId,
263264
node,
265+
speculative,
264266
stateMachine.getFragment().withBucketToPartition(bucketToPartition),
265267
splits,
266268
outputBuffers,

core/trino-main/src/main/java/io/trino/execution/SqlTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class SqlTask
8888
private final String taskInstanceId;
8989
private final URI location;
9090
private final String nodeId;
91+
private final AtomicBoolean speculative = new AtomicBoolean(false);
9192
private final TaskStateMachine taskStateMachine;
9293
private final OutputBuffer outputBuffer;
9394
private final QueryContext queryContext;
@@ -376,6 +377,7 @@ else if (taskHolder.getTaskExecution() != null) {
376377
state,
377378
location,
378379
nodeId,
380+
speculative.get(),
379381
failures,
380382
queuedPartitionedDrivers,
381383
runningPartitionedDrivers,
@@ -468,7 +470,8 @@ public TaskInfo updateTask(
468470
Optional<PlanFragment> fragment,
469471
List<SplitAssignment> splitAssignments,
470472
OutputBuffers outputBuffers,
471-
Map<DynamicFilterId, Domain> dynamicFilterDomains)
473+
Map<DynamicFilterId, Domain> dynamicFilterDomains,
474+
boolean speculative)
472475
{
473476
try {
474477
// trace token must be set first to make sure failure injection for getTaskResults requests works as expected
@@ -495,6 +498,9 @@ public TaskInfo updateTask(
495498
taskExecution.addSplitAssignments(splitAssignments);
496499
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
497500
}
501+
502+
// update speculative flag
503+
this.speculative.set(speculative);
498504
}
499505
catch (Error e) {
500506
failed(e);

core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -470,10 +470,11 @@ public TaskInfo updateTask(
470470
Optional<PlanFragment> fragment,
471471
List<SplitAssignment> splitAssignments,
472472
OutputBuffers outputBuffers,
473-
Map<DynamicFilterId, Domain> dynamicFilterDomains)
473+
Map<DynamicFilterId, Domain> dynamicFilterDomains,
474+
boolean speculative)
474475
{
475476
try {
476-
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains)).call();
477+
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains, speculative)).call();
477478
}
478479
catch (Exception e) {
479480
throwIfUnchecked(e);
@@ -489,7 +490,8 @@ private TaskInfo doUpdateTask(
489490
Optional<PlanFragment> fragment,
490491
List<SplitAssignment> splitAssignments,
491492
OutputBuffers outputBuffers,
492-
Map<DynamicFilterId, Domain> dynamicFilterDomains)
493+
Map<DynamicFilterId, Domain> dynamicFilterDomains,
494+
boolean speculative)
493495
{
494496
requireNonNull(session, "session is null");
495497
requireNonNull(taskId, "taskId is null");
@@ -528,7 +530,7 @@ private TaskInfo doUpdateTask(
528530
});
529531

530532
sqlTask.recordHeartbeat();
531-
return sqlTask.updateTask(session, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains);
533+
return sqlTask.updateTask(session, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains, speculative);
532534
}
533535

534536
/**

core/trino-main/src/main/java/io/trino/execution/TaskInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ public String toString()
130130
.toString();
131131
}
132132

133-
public static TaskInfo createInitialTask(TaskId taskId, URI location, String nodeId, Optional<List<PipelinedBufferInfo>> pipelinedBufferStates, TaskStats taskStats)
133+
public static TaskInfo createInitialTask(TaskId taskId, URI location, String nodeId, boolean speculative, Optional<List<PipelinedBufferInfo>> pipelinedBufferStates, TaskStats taskStats)
134134
{
135135
return new TaskInfo(
136-
initialTaskStatus(taskId, location, nodeId),
136+
initialTaskStatus(taskId, location, nodeId, speculative),
137137
DateTime.now(),
138138
new OutputBufferInfo(
139139
"UNINITIALIZED",

core/trino-main/src/main/java/io/trino/execution/TaskStatus.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class TaskStatus
5252
private final TaskState state;
5353
private final URI self;
5454
private final String nodeId;
55+
private final boolean speculative;
5556

5657
private final int queuedPartitionedDrivers;
5758
private final long queuedPartitionedSplitsWeight;
@@ -80,6 +81,7 @@ public TaskStatus(
8081
@JsonProperty("state") TaskState state,
8182
@JsonProperty("self") URI self,
8283
@JsonProperty("nodeId") String nodeId,
84+
@JsonProperty("speculative") boolean speculative,
8385
@JsonProperty("failures") List<ExecutionFailureInfo> failures,
8486
@JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers,
8587
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
@@ -104,6 +106,7 @@ public TaskStatus(
104106
this.state = requireNonNull(state, "state is null");
105107
this.self = requireNonNull(self, "self is null");
106108
this.nodeId = requireNonNull(nodeId, "nodeId is null");
109+
this.speculative = speculative;
107110

108111
checkArgument(queuedPartitionedDrivers >= 0, "queuedPartitionedDrivers must be positive");
109112
this.queuedPartitionedDrivers = queuedPartitionedDrivers;
@@ -169,6 +172,12 @@ public String getNodeId()
169172
return nodeId;
170173
}
171174

175+
@JsonProperty
176+
public boolean isSpeculative()
177+
{
178+
return speculative;
179+
}
180+
172181
@JsonProperty
173182
public List<ExecutionFailureInfo> getFailures()
174183
{
@@ -268,7 +277,7 @@ public String toString()
268277
.toString();
269278
}
270279

271-
public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String nodeId)
280+
public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String nodeId, boolean speculative)
272281
{
273282
return new TaskStatus(
274283
taskId,
@@ -277,6 +286,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n
277286
PLANNED,
278287
location,
279288
nodeId,
289+
speculative,
280290
ImmutableList.of(),
281291
0,
282292
0,
@@ -303,6 +313,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List<E
303313
state,
304314
taskStatus.getSelf(),
305315
taskStatus.getNodeId(),
316+
false,
306317
exceptions,
307318
taskStatus.getQueuedPartitionedDrivers(),
308319
taskStatus.getRunningPartitionedDrivers(),

core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1403,7 +1403,8 @@ public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle
14031403
outputBuffers,
14041404
splits,
14051405
noMoreSplits,
1406-
Optional.of(partition.getMemoryRequirements().getRequiredMemory()));
1406+
Optional.of(partition.getMemoryRequirements().getRequiredMemory()),
1407+
false); // TODO pass correct value here
14071408
task.ifPresent(remoteTask -> {
14081409
partition.addTask(remoteTask, outputBuffers);
14091410
runningPartitions.add(partitionId);

core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,8 @@ public synchronized Optional<RemoteTask> scheduleTask(
299299
outputBuffers,
300300
initialSplits,
301301
ImmutableSet.of(),
302-
Optional.empty());
302+
Optional.empty(),
303+
false);
303304

304305
if (optionalTask.isEmpty()) {
305306
return Optional.empty();

0 commit comments

Comments
 (0)