Skip to content

Commit fc20950

Browse files
committed
Convert TaskId to record
1 parent 75df10a commit fc20950

31 files changed

+95
-127
lines changed

core/trino-main/src/main/java/io/trino/connector/system/TaskSystemTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
113113
nodeId,
114114

115115
taskStatus.getTaskId().toString(),
116-
taskStatus.getTaskId().getStageId().toString(),
117-
taskStatus.getTaskId().getQueryId().toString(),
116+
taskStatus.getTaskId().stageId().toString(),
117+
taskStatus.getTaskId().queryId().toString(),
118118
taskStatus.getState().toString(),
119119

120120
(long) stats.getTotalDrivers(),

core/trino-main/src/main/java/io/trino/execution/SqlQueryManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void setOutputInfoListener(QueryId queryId, Consumer<QueryOutputInfo> lis
176176
@Override
177177
public void outputTaskFailed(TaskId taskId, Throwable failure)
178178
{
179-
queryTracker.getQuery(taskId.getQueryId()).outputTaskFailed(taskId, failure);
179+
queryTracker.getQuery(taskId.queryId()).outputTaskFailed(taskId, failure);
180180
}
181181

182182
@Override

core/trino-main/src/main/java/io/trino/execution/SqlTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,8 @@ private SqlTaskExecution tryCreateSqlTaskExecution(Session session, Span stageSp
555555

556556
taskSpan.set(tracer.spanBuilder("task")
557557
.setParent(Context.current().with(stageSpan))
558-
.setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
559-
.setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
558+
.setAttribute(TrinoAttributes.QUERY_ID, taskId.queryId().toString())
559+
.setAttribute(TrinoAttributes.STAGE_ID, taskId.stageId().toString())
560560
.setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
561561
.startSpan());
562562

core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,10 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, Tracer tracer, boo
595595
this.pipelineContext = taskContext.addPipelineContext(driverFactory.getPipelineId(), driverFactory.isInputDriver(), driverFactory.isOutputDriver(), partitioned);
596596
this.pipelineSpan = tracer.spanBuilder("pipeline")
597597
.setParent(Context.current().with(taskSpan))
598-
.setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
599-
.setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
598+
.setAttribute(TrinoAttributes.QUERY_ID, taskId.queryId().toString())
599+
.setAttribute(TrinoAttributes.STAGE_ID, taskId.stageId().toString())
600600
.setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
601-
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.getStageId() + "-" + pipelineContext.getPipelineId())
601+
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.stageId() + "-" + pipelineContext.getPipelineId())
602602
.startSpan();
603603
}
604604

core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public SqlTaskManager(
233233
taskId,
234234
locationFactory.createLocalTaskLocation(taskId),
235235
nodeInfo.getNodeId(),
236-
queryContexts.getUnchecked(taskId.getQueryId()),
236+
queryContexts.getUnchecked(taskId.queryId()),
237237
tracer,
238238
sqlTaskExecutionFactory,
239239
taskNotificationExecutor,

core/trino-main/src/main/java/io/trino/execution/TaskId.java

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,69 +25,37 @@
2525
import static io.airlift.slice.SizeOf.instanceSize;
2626
import static io.trino.spi.QueryId.parseDottedId;
2727
import static java.lang.Integer.parseInt;
28-
import static java.lang.String.join;
2928
import static java.util.Objects.requireNonNull;
3029

31-
public class TaskId
30+
public record TaskId(String fullId, StageId stageId, int partitionId, int attemptId)
3231
{
3332
private static final int INSTANCE_SIZE = instanceSize(TaskId.class);
3433

3534
@JsonCreator
36-
public static TaskId valueOf(String taskId)
35+
public static TaskId valueOf(String fullId)
3736
{
38-
return new TaskId(taskId);
37+
List<String> parts = parseDottedId(fullId, 4, "taskId");
38+
return new TaskId(fullId, new StageId(new QueryId(parts.get(0)), parseInt(parts.get(1))), parseInt(parts.get(2)), parseInt(parts.get(3)));
3939
}
4040

41-
private final String fullId;
42-
private final StageId stageId;
43-
private final int partitionId;
44-
private final int attemptId;
45-
46-
public TaskId(StageId stageId, int partitionId, int attemptId)
41+
public TaskId
4742
{
48-
this.stageId = requireNonNull(stageId, "stageId is null");
43+
requireNonNull(fullId, "fullId is null");
44+
requireNonNull(stageId, "stageId is null");
4945
checkArgument(partitionId >= 0, "partitionId is negative: %s", partitionId);
5046
checkArgument(attemptId >= 0, "attemptId is negative: %s", attemptId);
51-
this.partitionId = partitionId;
52-
this.attemptId = attemptId;
53-
54-
// There is a strange JDK bug related to the CompactStrings implementation in JDK20+ which causes some fullId values
55-
// to get corrupted when this particular line is JIT-optimized. Changing implicit concatenation to a String.join call
56-
// seems to mitigate this issue. See: https://github.com/trinodb/trino/issues/18272 for more details.
57-
this.fullId = join(".", stageId.toString(), String.valueOf(partitionId), String.valueOf(attemptId));
5847
}
5948

60-
private TaskId(String fullId)
49+
public TaskId(StageId stageId, int partitionId, int attemptId)
6150
{
62-
this.fullId = requireNonNull(fullId, "fullId is null");
63-
List<String> parts = parseDottedId(fullId, 4, "taskId");
64-
this.stageId = new StageId(new QueryId(parts.get(0)), parseInt(parts.get(1)));
65-
this.partitionId = parseInt(parts.get(2));
66-
this.attemptId = parseInt(parts.get(3));
67-
checkArgument(partitionId >= 0, "partitionId is negative: %s", partitionId);
68-
checkArgument(attemptId >= 0, "attemptId is negative: %s", attemptId);
51+
this("%s.%s.%s".formatted(stageId.toString(), String.valueOf(partitionId), String.valueOf(attemptId)), stageId, partitionId, attemptId);
6952
}
7053

71-
public QueryId getQueryId()
54+
public QueryId queryId()
7255
{
7356
return stageId.queryId();
7457
}
7558

76-
public StageId getStageId()
77-
{
78-
return stageId;
79-
}
80-
81-
public int getPartitionId()
82-
{
83-
return partitionId;
84-
}
85-
86-
public int getAttemptId()
87-
{
88-
return attemptId;
89-
}
90-
9159
@Override
9260
@JsonValue
9361
public String toString()

core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ public void run(SchedulerContext context)
5959
{
6060
Span splitSpan = tracer.spanBuilder("split")
6161
.setParent(Context.current().with(split.getPipelineSpan()))
62-
.setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
63-
.setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
62+
.setAttribute(TrinoAttributes.QUERY_ID, taskId.queryId().toString())
63+
.setAttribute(TrinoAttributes.STAGE_ID, taskId.stageId().toString())
6464
.setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
65-
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.getStageId() + "-" + split.getPipelineId())
65+
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.stageId() + "-" + split.getPipelineId())
6666
.setAttribute(TrinoAttributes.SPLIT_ID, taskId + "-" + splitId)
6767
.startSpan();
6868

core/trino-main/src/main/java/io/trino/execution/executor/timesharing/TimeSharingTaskExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,10 +358,10 @@ public List<ListenableFuture<Void>> enqueueSplits(TaskHandle taskHandle, boolean
358358

359359
Span splitSpan = tracer.spanBuilder(intermediate ? "split (intermediate)" : "split (leaf)")
360360
.setParent(Context.current().with(taskSplit.getPipelineSpan()))
361-
.setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
362-
.setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
361+
.setAttribute(TrinoAttributes.QUERY_ID, taskId.queryId().toString())
362+
.setAttribute(TrinoAttributes.STAGE_ID, taskId.stageId().toString())
363363
.setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
364-
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.getStageId() + "-" + taskSplit.getPipelineId())
364+
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.stageId() + "-" + taskSplit.getPipelineId())
365365
.setAttribute(TrinoAttributes.SPLIT_ID, taskId + "-" + splitId)
366366
.startSpan();
367367

core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,7 +1400,7 @@ public void fail(Throwable failureCause, Optional<StageId> failedStageId)
14001400

14011401
public void reportTaskFailure(TaskId taskId, Throwable failureCause)
14021402
{
1403-
StageExecution stageExecution = stageExecutions.get(taskId.getStageId());
1403+
StageExecution stageExecution = stageExecutions.get(taskId.stageId());
14041404
if (stageExecution == null) {
14051405
return;
14061406
}
@@ -1411,7 +1411,7 @@ public void reportTaskFailure(TaskId taskId, Throwable failureCause)
14111411
}
14121412

14131413
stageExecution.failTask(taskId, failureCause);
1414-
stateMachine.transitionToFailed(failureCause, Optional.of(taskId.getStageId()));
1414+
stateMachine.transitionToFailed(failureCause, Optional.of(taskId.stageId()));
14151415
stageExecutions.values().forEach(StageExecution::abort);
14161416
}
14171417

core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public synchronized void fail(Throwable failureCause)
277277
@Override
278278
public synchronized void failTask(TaskId taskId, Throwable failureCause)
279279
{
280-
RemoteTask task = requireNonNull(tasks.get(taskId.getPartitionId()), () -> "task not found: " + taskId);
280+
RemoteTask task = requireNonNull(tasks.get(taskId.partitionId()), () -> "task not found: " + taskId);
281281
task.failLocallyImmediately(failureCause);
282282
fail(failureCause);
283283
}
@@ -337,7 +337,7 @@ public synchronized Optional<RemoteTask> scheduleTask(
337337
taskLifecycleListener.taskCreated(stage.getFragment().getId(), task);
338338

339339
// update output buffers
340-
OutputBufferId outputBufferId = new OutputBufferId(task.getTaskId().getPartitionId());
340+
OutputBufferId outputBufferId = new OutputBufferId(task.getTaskId().partitionId());
341341
updateSourceTasksOutputBuffers(outputBufferManager -> outputBufferManager.addOutputBuffer(outputBufferId));
342342

343343
return Optional.of(task);
@@ -589,7 +589,7 @@ private static Split createExchangeSplit(RemoteTask sourceTask, RemoteTask desti
589589
{
590590
// Fetch the results from the buffer assigned to the task based on id
591591
URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
592-
URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().getPartitionId())).build();
592+
URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().partitionId())).build();
593593
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(sourceTask.getTaskId(), splitLocation.toString())));
594594
}
595595

0 commit comments

Comments
 (0)