createStuckSplitTasksInterrupter(
* The detection is invoked periodically with the frequency of {@link StuckSplitTasksInterrupter#stuckSplitsDetectionInterval}.
* A thread gets interrupted once the split processing continues beyond {@link StuckSplitTasksInterrupter#interruptStuckSplitTasksTimeout} and
* the split threaddump matches with {@link StuckSplitTasksInterrupter#stuckSplitStackTracePredicate}.
- *
+ *
* There is a potential race condition for this {@link StuckSplitTasksInterrupter} class. The problematic flow is that we may
* kill a task that is long-running, but not really stuck on the code that matches {@link StuckSplitTasksInterrupter#stuckSplitStackTracePredicate} (e.g. JONI code).
* Consider the following example:
diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java
index 9cd84291110b..d4d479d092ce 100644
--- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java
+++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java
@@ -18,6 +18,10 @@
import io.airlift.log.Logger;
import io.airlift.stats.Distribution;
import io.airlift.units.Duration;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.operator.BlockedReason;
@@ -29,6 +33,7 @@
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableScanNode;
+import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import io.trino.util.Optionals;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
@@ -82,6 +87,7 @@ public class StageStateMachine
private final StateMachine stageState;
private final StateMachine> finalStageInfo;
+ private final Span stageSpan;
private final AtomicReference failureCause = new AtomicReference<>();
private final AtomicReference schedulingComplete = new AtomicReference<>();
@@ -98,6 +104,8 @@ public StageStateMachine(
PlanFragment fragment,
Map tables,
Executor executor,
+ Tracer tracer,
+ Span querySpan,
SplitSchedulerStats schedulerStats)
{
this.stageId = requireNonNull(stageId, "stageId is null");
@@ -109,6 +117,20 @@ public StageStateMachine(
stageState.addStateChangeListener(state -> log.debug("Stage %s is %s", stageId, state));
finalStageInfo = new StateMachine<>("final stage " + stageId, executor, Optional.empty());
+
+ stageSpan = tracer.spanBuilder("stage")
+ .setParent(Context.current().with(querySpan))
+ .setAttribute(TrinoAttributes.QUERY_ID, stageId.getQueryId().toString())
+ .setAttribute(TrinoAttributes.STAGE_ID, stageId.toString())
+ .startSpan();
+
+ stageState.addStateChangeListener(state -> {
+ stageSpan.addEvent("stage_state", Attributes.of(
+ TrinoAttributes.EVENT_STATE, state.toString()));
+ if (state.isDone()) {
+ stageSpan.end();
+ }
+ });
}
public StageId getStageId()
@@ -126,6 +148,11 @@ public PlanFragment getFragment()
return fragment;
}
+ public Span getStageSpan()
+ {
+ return stageSpan;
+ }
+
/**
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/PrioritizedSplitRunner.java b/core/trino-main/src/main/java/io/trino/execution/executor/PrioritizedSplitRunner.java
index 5b98af62e0a3..0d4f3a68f169 100644
--- a/core/trino-main/src/main/java/io/trino/execution/executor/PrioritizedSplitRunner.java
+++ b/core/trino-main/src/main/java/io/trino/execution/executor/PrioritizedSplitRunner.java
@@ -21,7 +21,11 @@
import io.airlift.stats.CpuTimer;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.execution.SplitRunner;
+import io.trino.tracing.TrinoAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference;
import static io.trino.operator.Operator.NOT_BLOCKED;
+import static io.trino.tracing.ScopedSpan.scopedSpan;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -50,6 +55,9 @@ public class PrioritizedSplitRunner
private final long workerId;
private final SplitRunner split;
+ private final Span splitSpan;
+
+ private final Tracer tracer;
private final Ticker ticker;
private final SettableFuture finishedFuture = SettableFuture.create();
@@ -75,7 +83,10 @@ public class PrioritizedSplitRunner
PrioritizedSplitRunner(
TaskHandle taskHandle,
+ int splitId,
SplitRunner split,
+ Span splitSpan,
+ Tracer tracer,
Ticker ticker,
CounterStat globalCpuTimeMicros,
CounterStat globalScheduledTimeMicros,
@@ -83,8 +94,10 @@ public class PrioritizedSplitRunner
TimeStat unblockedQuantaWallTime)
{
this.taskHandle = requireNonNull(taskHandle, "taskHandle is null");
- this.splitId = taskHandle.getNextSplitId();
+ this.splitId = splitId;
this.split = requireNonNull(split, "split is null");
+ this.splitSpan = requireNonNull(splitSpan, "splitSpan is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.ticker = requireNonNull(ticker, "ticker is null");
this.workerId = NEXT_WORKER_ID.getAndIncrement();
this.globalCpuTimeMicros = requireNonNull(globalCpuTimeMicros, "globalCpuTimeMicros is null");
@@ -119,6 +132,12 @@ public void destroy()
catch (RuntimeException e) {
log.error(e, "Error closing split for task %s", taskHandle.getTaskId());
}
+ finally {
+ splitSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, getScheduledNanos());
+ splitSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, getCpuTimeNanos());
+ splitSpan.setAttribute(TrinoAttributes.SPLIT_WAIT_TIME_NANOS, getWaitNanos());
+ splitSpan.end();
+ }
}
public long getCreatedNanos()
@@ -152,7 +171,11 @@ public long getWaitNanos()
public ListenableFuture process()
{
- try {
+ Span span = tracer.spanBuilder("process")
+ .setParent(Context.current().with(splitSpan))
+ .startSpan();
+
+ try (var ignored = scopedSpan(span)) {
long startNanos = ticker.read();
start.compareAndSet(0, startNanos);
lastReady.compareAndSet(0, startNanos);
@@ -185,6 +208,10 @@ public ListenableFuture process()
globalCpuTimeMicros.update(quantaCpuNanos / 1000);
globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);
+ span.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, quantaCpuNanos);
+ span.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, quantaScheduledNanos);
+ span.setAttribute(TrinoAttributes.SPLIT_BLOCKED, blocked != NOT_BLOCKED);
+
return blocked;
}
catch (Throwable e) {
diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java
index c501e223ccfc..da10e1532feb 100644
--- a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java
+++ b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java
@@ -25,11 +25,15 @@
import io.airlift.stats.TimeDistribution;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.TaskManagerConfig;
import io.trino.spi.TrinoException;
import io.trino.spi.VersionEmbedder;
+import io.trino.tracing.TrinoAttributes;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
@@ -68,6 +72,7 @@
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.threadsNamed;
+import static io.airlift.tracing.Tracing.noopTracer;
import static io.trino.execution.executor.MultilevelSplitQueue.computeLevel;
import static io.trino.version.EmbedVersion.testingVersionEmbedder;
import static java.lang.Math.min;
@@ -92,6 +97,7 @@ public class TaskExecutor
private final int guaranteedNumberOfDriversPerTask;
private final int maximumNumberOfDriversPerTask;
private final VersionEmbedder versionEmbedder;
+ private final Tracer tracer;
private final Ticker ticker;
@@ -163,7 +169,7 @@ public class TaskExecutor
private volatile boolean closed;
@Inject
- public TaskExecutor(TaskManagerConfig config, VersionEmbedder versionEmbedder, MultilevelSplitQueue splitQueue)
+ public TaskExecutor(TaskManagerConfig config, VersionEmbedder versionEmbedder, Tracer tracer, MultilevelSplitQueue splitQueue)
{
this(
config.getMaxWorkerThreads(),
@@ -172,6 +178,7 @@ public TaskExecutor(TaskManagerConfig config, VersionEmbedder versionEmbedder, M
config.getMaxDriversPerTask(),
config.getInterruptStuckSplitTasksWarningThreshold(),
versionEmbedder,
+ tracer,
splitQueue,
Ticker.systemTicker());
}
@@ -179,13 +186,13 @@ public TaskExecutor(TaskManagerConfig config, VersionEmbedder versionEmbedder, M
@VisibleForTesting
public TaskExecutor(int runnerThreads, int minDrivers, int guaranteedNumberOfDriversPerTask, int maximumNumberOfDriversPerTask, Ticker ticker)
{
- this(runnerThreads, minDrivers, guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, new Duration(10, TimeUnit.MINUTES), testingVersionEmbedder(), new MultilevelSplitQueue(2), ticker);
+ this(runnerThreads, minDrivers, guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, new Duration(10, TimeUnit.MINUTES), testingVersionEmbedder(), noopTracer(), new MultilevelSplitQueue(2), ticker);
}
@VisibleForTesting
public TaskExecutor(int runnerThreads, int minDrivers, int guaranteedNumberOfDriversPerTask, int maximumNumberOfDriversPerTask, MultilevelSplitQueue splitQueue, Ticker ticker)
{
- this(runnerThreads, minDrivers, guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, new Duration(10, TimeUnit.MINUTES), testingVersionEmbedder(), splitQueue, ticker);
+ this(runnerThreads, minDrivers, guaranteedNumberOfDriversPerTask, maximumNumberOfDriversPerTask, new Duration(10, TimeUnit.MINUTES), testingVersionEmbedder(), noopTracer(), splitQueue, ticker);
}
@VisibleForTesting
@@ -196,6 +203,7 @@ public TaskExecutor(
int maximumNumberOfDriversPerTask,
Duration stuckSplitsWarningThreshold,
VersionEmbedder versionEmbedder,
+ Tracer tracer,
MultilevelSplitQueue splitQueue,
Ticker ticker)
{
@@ -209,6 +217,7 @@ public TaskExecutor(
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
this.runnerThreads = runnerThreads;
this.versionEmbedder = requireNonNull(versionEmbedder, "versionEmbedder is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.ticker = requireNonNull(ticker, "ticker is null");
this.stuckSplitsWarningThreshold = requireNonNull(stuckSplitsWarningThreshold, "stuckSplitsWarningThreshold is null");
@@ -340,9 +349,24 @@ public List> enqueueSplits(TaskHandle taskHandle, boolean
List> finishedFutures = new ArrayList<>(taskSplits.size());
synchronized (this) {
for (SplitRunner taskSplit : taskSplits) {
+ TaskId taskId = taskHandle.getTaskId();
+ int splitId = taskHandle.getNextSplitId();
+
+ Span splitSpan = tracer.spanBuilder(intermediate ? "split (intermediate)" : "split (leaf)")
+ .setParent(Context.current().with(taskSplit.getPipelineSpan()))
+ .setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
+ .setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
+ .setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
+ .setAttribute(TrinoAttributes.PIPELINE_ID, taskId.getStageId() + "-" + taskSplit.getPipelineId())
+ .setAttribute(TrinoAttributes.SPLIT_ID, taskId + "-" + splitId)
+ .startSpan();
+
PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(
taskHandle,
+ splitId,
taskSplit,
+ splitSpan,
+ tracer,
ticker,
globalCpuTimeMicros,
globalScheduledTimeMicros,
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
index 2aa19f767187..8c941109eb33 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
@@ -33,6 +33,7 @@
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.BasicStageStats;
@@ -168,6 +169,7 @@ public class EventDrivenFaultTolerantQueryScheduler
private final NodeTaskMap nodeTaskMap;
private final ExecutorService queryExecutor;
private final ScheduledExecutorService scheduledExecutorService;
+ private final Tracer tracer;
private final SplitSchedulerStats schedulerStats;
private final PartitionMemoryEstimatorFactory memoryEstimatorFactory;
private final NodePartitioningManager nodePartitioningManager;
@@ -195,6 +197,7 @@ public EventDrivenFaultTolerantQueryScheduler(
NodeTaskMap nodeTaskMap,
ExecutorService queryExecutor,
ScheduledExecutorService scheduledExecutorService,
+ Tracer tracer,
SplitSchedulerStats schedulerStats,
PartitionMemoryEstimatorFactory memoryEstimatorFactory,
NodePartitioningManager nodePartitioningManager,
@@ -216,6 +219,7 @@ public EventDrivenFaultTolerantQueryScheduler(
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.scheduledExecutorService = requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
this.memoryEstimatorFactory = requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "partitioningSchemeFactory is null");
@@ -279,7 +283,9 @@ public synchronized void start()
summarizeTaskInfo,
nodeTaskMap,
queryExecutor,
- scheduledExecutorService, schedulerStats,
+ scheduledExecutorService,
+ tracer,
+ schedulerStats,
memoryEstimatorFactory,
partitioningSchemeFactory,
exchangeManager,
@@ -458,6 +464,7 @@ private static class Scheduler
private final NodeTaskMap nodeTaskMap;
private final ExecutorService queryExecutor;
private final ScheduledExecutorService scheduledExecutorService;
+ private final Tracer tracer;
private final SplitSchedulerStats schedulerStats;
private final PartitionMemoryEstimatorFactory memoryEstimatorFactory;
private final FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory;
@@ -501,6 +508,7 @@ public Scheduler(
NodeTaskMap nodeTaskMap,
ExecutorService queryExecutor,
ScheduledExecutorService scheduledExecutorService,
+ Tracer tracer,
SplitSchedulerStats schedulerStats,
PartitionMemoryEstimatorFactory memoryEstimatorFactory,
FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory,
@@ -525,6 +533,7 @@ public Scheduler(
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.scheduledExecutorService = requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
this.memoryEstimatorFactory = requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null");
this.partitioningSchemeFactory = requireNonNull(partitioningSchemeFactory, "partitioningSchemeFactory is null");
@@ -782,6 +791,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, int sch
summarizeTaskInfo,
nodeTaskMap,
queryStateMachine.getStateMachineExecutor(),
+ tracer,
schedulerStats);
closer.register(stage::abort);
stageRegistry.add(stage);
@@ -811,6 +821,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, int sch
EventDrivenTaskSource taskSource = closer.register(taskSourceFactory.create(
session,
+ stage.getStageSpan(),
fragment,
sourceExchanges.buildOrThrow(),
partitioningSchemeFactory.get(fragment.getPartitioning()),
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java
index 2ea724d34985..c3b564188e56 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java
@@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
+import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.execution.ForQueryExecution;
import io.trino.execution.QueryManagerConfig;
@@ -106,6 +107,7 @@ public EventDrivenTaskSourceFactory(
public EventDrivenTaskSource create(
Session session,
+ Span stageSpan,
PlanFragment fragment,
Map sourceExchanges,
FaultTolerantPartitioningScheme sourcePartitioningScheme,
@@ -125,7 +127,7 @@ public EventDrivenTaskSource create(
tableExecuteContextManager,
sourceExchanges,
remoteSources.build(),
- () -> splitSourceFactory.createSplitSources(session, fragment),
+ () -> splitSourceFactory.createSplitSources(session, stageSpan, fragment),
createSplitAssigner(
session,
fragment,
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java
index 855185fc742c..0c7a2154061b 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java
@@ -25,6 +25,8 @@
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.exchange.DirectExchangeInput;
import io.trino.execution.BasicStageStats;
@@ -199,6 +201,7 @@ public PipelinedQueryScheduler(
FailureDetector failureDetector,
NodeTaskMap nodeTaskMap,
ExecutionPolicy executionPolicy,
+ Tracer tracer,
SplitSchedulerStats schedulerStats,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
@@ -224,6 +227,7 @@ public PipelinedQueryScheduler(
metadata,
remoteTaskFactory,
nodeTaskMap,
+ tracer,
schedulerStats,
plan,
summarizeTaskInfo);
@@ -1025,10 +1029,11 @@ private static StageScheduler createStageScheduler(
TableExecuteContextManager tableExecuteContextManager)
{
Session session = queryStateMachine.getSession();
+ Span stageSpan = stageExecution.getStageSpan();
PlanFragment fragment = stageExecution.getFragment();
PartitioningHandle partitioningHandle = fragment.getPartitioning();
Optional partitionCount = fragment.getPartitionCount();
- Map splitSources = splitSourceFactory.createSplitSources(session, fragment);
+ Map splitSources = splitSourceFactory.createSplitSources(session, stageSpan, fragment);
if (!splitSources.isEmpty()) {
queryStateMachine.addStateChangeListener(new StateChangeListener<>()
{
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
index 3e01c9e3966a..b46f13322a80 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
@@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import io.airlift.log.Logger;
+import io.opentelemetry.api.trace.Span;
import io.trino.exchange.DirectExchangeInput;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.RemoteTask;
@@ -552,6 +553,12 @@ public int getAttemptId()
return attempt;
}
+ @Override
+ public Span getStageSpan()
+ {
+ return stage.getStageSpan();
+ }
+
@Override
public PlanFragment getFragment()
{
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java
index 89fc0bc73e2e..d283010b7f65 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageExecution.java
@@ -14,6 +14,7 @@
package io.trino.execution.scheduler;
import com.google.common.collect.Multimap;
+import io.opentelemetry.api.trace.Span;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.StageId;
@@ -36,6 +37,8 @@ public interface StageExecution
int getAttemptId();
+ Span getStageSpan();
+
PlanFragment getFragment();
boolean isAnyTaskBlocked();
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageManager.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageManager.java
index 524424ca968d..2bc6391858b2 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageManager.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageManager.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.graph.Traverser;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.execution.BasicStageStats;
import io.trino.execution.NodeTaskMap;
@@ -65,6 +66,7 @@ static StageManager create(
Metadata metadata,
RemoteTaskFactory taskFactory,
NodeTaskMap nodeTaskMap,
+ Tracer tracer,
SplitSchedulerStats schedulerStats,
SubPlan planTree,
boolean summarizeTaskInfo)
@@ -88,6 +90,7 @@ static StageManager create(
summarizeTaskInfo,
nodeTaskMap,
queryStateMachine.getStateMachineExecutor(),
+ tracer,
schedulerStats);
StageId stageId = stage.getStageId();
stages.put(stageId, stage);
diff --git a/core/trino-main/src/main/java/io/trino/metadata/Catalog.java b/core/trino-main/src/main/java/io/trino/metadata/Catalog.java
index cbbab652291a..f0cf057f1c78 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/Catalog.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/Catalog.java
@@ -134,7 +134,11 @@ private static CatalogTransaction beginTransaction(
transactionHandle = connector.beginTransaction(isolationLevel, readOnly, autoCommitContext);
}
- return new CatalogTransaction(connectorServices.getCatalogHandle(), connector, transactionHandle);
+ return new CatalogTransaction(
+ connectorServices.getTracer(),
+ connectorServices.getCatalogHandle(),
+ connector,
+ transactionHandle);
}
@Override
diff --git a/core/trino-main/src/main/java/io/trino/metadata/CatalogTransaction.java b/core/trino-main/src/main/java/io/trino/metadata/CatalogTransaction.java
index d4e7ac751d0d..13cb22b3aa60 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/CatalogTransaction.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/CatalogTransaction.java
@@ -13,12 +13,16 @@
*/
package io.trino.metadata;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
+import io.trino.connector.informationschema.InformationSchemaMetadata;
+import io.trino.connector.system.SystemTablesMetadata;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.tracing.TracingConnectorMetadata;
import javax.annotation.concurrent.GuardedBy;
@@ -29,6 +33,7 @@
public class CatalogTransaction
{
+ private final Tracer tracer;
private final CatalogHandle catalogHandle;
private final Connector connector;
private final ConnectorTransactionHandle transactionHandle;
@@ -37,10 +42,12 @@ public class CatalogTransaction
private final AtomicBoolean finished = new AtomicBoolean();
public CatalogTransaction(
+ Tracer tracer,
CatalogHandle catalogHandle,
Connector connector,
ConnectorTransactionHandle transactionHandle)
{
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connector = requireNonNull(connector, "connector is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
@@ -62,6 +69,7 @@ public synchronized ConnectorMetadata getConnectorMetadata(Session session)
if (connectorMetadata == null) {
ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
connectorMetadata = connector.getMetadata(connectorSession, transactionHandle);
+ connectorMetadata = tracingConnectorMetadata(catalogHandle.getCatalogName(), connectorMetadata);
}
return connectorMetadata;
}
@@ -85,4 +93,12 @@ public void abort()
connector.rollback(transactionHandle);
}
}
+
+ private ConnectorMetadata tracingConnectorMetadata(String catalogName, ConnectorMetadata delegate)
+ {
+ if ((delegate instanceof SystemTablesMetadata) || (delegate instanceof InformationSchemaMetadata)) {
+ return delegate;
+ }
+ return new TracingConnectorMetadata(tracer, catalogName, delegate);
+ }
}
diff --git a/core/trino-main/src/main/java/io/trino/server/HttpRemoteTaskFactory.java b/core/trino-main/src/main/java/io/trino/server/HttpRemoteTaskFactory.java
index 974084c7aaab..e9a38930e1c9 100644
--- a/core/trino-main/src/main/java/io/trino/server/HttpRemoteTaskFactory.java
+++ b/core/trino-main/src/main/java/io/trino/server/HttpRemoteTaskFactory.java
@@ -20,6 +20,8 @@
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains;
import io.trino.execution.LocationFactory;
@@ -77,6 +79,7 @@ public class HttpRemoteTaskFactory
private final ThreadPoolExecutorMBean executorMBean;
private final ScheduledExecutorService updateScheduledExecutor;
private final ScheduledExecutorService errorScheduledExecutor;
+ private final Tracer tracer;
private final RemoteTaskStats stats;
private final DynamicFilterService dynamicFilterService;
@@ -91,6 +94,7 @@ public HttpRemoteTaskFactory(
JsonCodec taskInfoCodec,
JsonCodec taskUpdateRequestCodec,
JsonCodec failTaskRequestCoded,
+ Tracer tracer,
RemoteTaskStats stats,
DynamicFilterService dynamicFilterService)
{
@@ -108,6 +112,7 @@ public HttpRemoteTaskFactory(
this.coreExecutor = newCachedThreadPool(daemonThreadsNamed("remote-task-callback-%s"));
this.executor = new BoundedExecutor(coreExecutor, config.getRemoteTaskMaxCallbackThreads());
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor);
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.stats = requireNonNull(stats, "stats is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
@@ -133,6 +138,7 @@ public void stop()
@Override
public RemoteTask createRemoteTask(
Session session,
+ Span stageSpan,
TaskId taskId,
InternalNode node,
PlanFragment fragment,
@@ -143,7 +149,9 @@ public RemoteTask createRemoteTask(
Optional estimatedMemory,
boolean summarizeTaskInfo)
{
- return new HttpRemoteTask(session,
+ return new HttpRemoteTask(
+ session,
+ stageSpan,
taskId,
node.getNodeIdentifier(),
locationFactory.createTaskLocation(node, taskId),
@@ -165,6 +173,7 @@ public RemoteTask createRemoteTask(
taskUpdateRequestCodec,
failTaskRequestCoded,
partitionedSplitCountTracker,
+ tracer,
stats,
dynamicFilterService,
outboundDynamicFilterIds,
diff --git a/core/trino-main/src/main/java/io/trino/server/NoOpSessionSupplier.java b/core/trino-main/src/main/java/io/trino/server/NoOpSessionSupplier.java
index 291c6e0ba3f7..723ccf4c4ea7 100644
--- a/core/trino-main/src/main/java/io/trino/server/NoOpSessionSupplier.java
+++ b/core/trino-main/src/main/java/io/trino/server/NoOpSessionSupplier.java
@@ -13,6 +13,7 @@
*/
package io.trino.server;
+import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.spi.QueryId;
@@ -23,7 +24,7 @@ public class NoOpSessionSupplier
implements SessionSupplier
{
@Override
- public Session createSession(QueryId queryId, SessionContext context)
+ public Session createSession(QueryId queryId, Span querySpan, SessionContext context)
{
throw new UnsupportedOperationException();
}
diff --git a/core/trino-main/src/main/java/io/trino/server/PluginManager.java b/core/trino-main/src/main/java/io/trino/server/PluginManager.java
index 92052fcd1d6b..87fa1131488a 100644
--- a/core/trino-main/src/main/java/io/trino/server/PluginManager.java
+++ b/core/trino-main/src/main/java/io/trino/server/PluginManager.java
@@ -72,6 +72,8 @@ public class PluginManager
.add("com.fasterxml.jackson.annotation.")
.add("io.airlift.slice.")
.add("org.openjdk.jol.")
+ .add("io.opentelemetry.api.")
+ .add("io.opentelemetry.context.")
.build();
private static final Logger log = Logger.get(PluginManager.class);
diff --git a/core/trino-main/src/main/java/io/trino/server/QuerySessionSupplier.java b/core/trino-main/src/main/java/io/trino/server/QuerySessionSupplier.java
index 58fe9b468a19..9bae204717e9 100644
--- a/core/trino-main/src/main/java/io/trino/server/QuerySessionSupplier.java
+++ b/core/trino-main/src/main/java/io/trino/server/QuerySessionSupplier.java
@@ -13,6 +13,7 @@
*/
package io.trino.server;
+import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.metadata.Metadata;
import io.trino.metadata.SessionPropertyManager;
@@ -69,7 +70,7 @@ public QuerySessionSupplier(
}
@Override
- public Session createSession(QueryId queryId, SessionContext context)
+ public Session createSession(QueryId queryId, Span querySpan, SessionContext context)
{
Identity identity = context.getIdentity();
accessControl.checkCanSetUser(identity.getPrincipal(), identity.getUser());
@@ -90,6 +91,7 @@ public Session createSession(QueryId queryId, SessionContext context)
SessionBuilder sessionBuilder = Session.builder(sessionPropertyManager)
.setQueryId(queryId)
+ .setQuerySpan(querySpan)
.setIdentity(identity)
.setPath(context.getPath().or(() -> defaultPath).map(SqlPath::new))
.setSource(context.getSource())
diff --git a/core/trino-main/src/main/java/io/trino/server/Server.java b/core/trino-main/src/main/java/io/trino/server/Server.java
index 8318dc1f26e2..c90c1221af69 100644
--- a/core/trino-main/src/main/java/io/trino/server/Server.java
+++ b/core/trino-main/src/main/java/io/trino/server/Server.java
@@ -38,6 +38,7 @@
import io.airlift.node.NodeModule;
import io.airlift.openmetrics.JmxOpenMetricsModule;
import io.airlift.tracetoken.TraceTokenModule;
+import io.airlift.tracing.TracingModule;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogManagerConfig;
import io.trino.connector.CatalogManagerConfig.CatalogMangerKind;
@@ -112,6 +113,7 @@ private void doStart(String trinoVersion)
new JmxOpenMetricsModule(),
new LogJmxModule(),
new TraceTokenModule(),
+ new TracingModule("trino", trinoVersion),
new EventModule(),
new JsonEventModule(),
new ServerSecurityModule(),
diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
index 5e768f344bd8..e9a7b81a0136 100644
--- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
+++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
@@ -141,6 +141,8 @@
import io.trino.sql.planner.RuleStatsRecorder;
import io.trino.sql.planner.TypeAnalyzer;
import io.trino.sql.tree.Expression;
+import io.trino.tracing.ForTracing;
+import io.trino.tracing.TracingMetadata;
import io.trino.type.BlockTypeOperators;
import io.trino.type.InternalTypeManager;
import io.trino.type.JsonPath2016Type;
@@ -368,7 +370,8 @@ protected void setup(Binder binder)
// metadata
binder.bind(MetadataManager.class).in(Scopes.SINGLETON);
- binder.bind(Metadata.class).to(MetadataManager.class).in(Scopes.SINGLETON);
+ binder.bind(Metadata.class).annotatedWith(ForTracing.class).to(MetadataManager.class).in(Scopes.SINGLETON);
+ binder.bind(Metadata.class).to(TracingMetadata.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, SystemSecurityMetadata.class)
.setDefault()
.to(DisabledSystemSecurityMetadata.class)
diff --git a/core/trino-main/src/main/java/io/trino/server/SessionSupplier.java b/core/trino-main/src/main/java/io/trino/server/SessionSupplier.java
index 9837b8a9c24d..2ff7e3a4672e 100644
--- a/core/trino-main/src/main/java/io/trino/server/SessionSupplier.java
+++ b/core/trino-main/src/main/java/io/trino/server/SessionSupplier.java
@@ -13,10 +13,11 @@
*/
package io.trino.server;
+import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.spi.QueryId;
public interface SessionSupplier
{
- Session createSession(QueryId queryId, SessionContext context);
+ Session createSession(QueryId queryId, Span querySpan, SessionContext context);
}
diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java
index 007f7a4909f8..e143a835aa76 100644
--- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java
+++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java
@@ -151,8 +151,10 @@ public void createOrUpdateTask(
return;
}
- TaskInfo taskInfo = taskManager.updateTask(session,
+ TaskInfo taskInfo = taskManager.updateTask(
+ session,
taskId,
+ taskUpdateRequest.getStageSpan(),
taskUpdateRequest.getFragment(),
taskUpdateRequest.getSplitAssignments(),
taskUpdateRequest.getOutputIds(),
diff --git a/core/trino-main/src/main/java/io/trino/server/TaskUpdateRequest.java b/core/trino-main/src/main/java/io/trino/server/TaskUpdateRequest.java
index 08a3b8807a40..b30175502ec1 100644
--- a/core/trino-main/src/main/java/io/trino/server/TaskUpdateRequest.java
+++ b/core/trino-main/src/main/java/io/trino/server/TaskUpdateRequest.java
@@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
+import io.opentelemetry.api.trace.Span;
import io.trino.SessionRepresentation;
import io.trino.execution.SplitAssignment;
import io.trino.execution.buffer.OutputBuffers;
@@ -36,6 +37,7 @@ public class TaskUpdateRequest
private final SessionRepresentation session;
// extraCredentials is stored separately from SessionRepresentation to avoid being leaked
private final Map<String, String> extraCredentials;
+ private final Span stageSpan;
private final Optional<PlanFragment> fragment;
private final List<SplitAssignment> splitAssignments;
private final OutputBuffers outputIds;
@@ -46,6 +48,7 @@ public class TaskUpdateRequest
public TaskUpdateRequest(
@JsonProperty("session") SessionRepresentation session,
@JsonProperty("extraCredentials") Map<String, String> extraCredentials,
+ @JsonProperty("stageSpan") Span stageSpan,
@JsonProperty("fragment") Optional<PlanFragment> fragment,
@JsonProperty("splitAssignments") List<SplitAssignment> splitAssignments,
@JsonProperty("outputIds") OutputBuffers outputIds,
@@ -54,6 +57,7 @@ public TaskUpdateRequest(
{
requireNonNull(session, "session is null");
requireNonNull(extraCredentials, "extraCredentials is null");
+ requireNonNull(stageSpan, "stageSpan is null");
requireNonNull(fragment, "fragment is null");
requireNonNull(splitAssignments, "splitAssignments is null");
requireNonNull(outputIds, "outputIds is null");
@@ -62,6 +66,7 @@ public TaskUpdateRequest(
this.session = session;
this.extraCredentials = extraCredentials;
+ this.stageSpan = stageSpan;
this.fragment = fragment;
this.splitAssignments = ImmutableList.copyOf(splitAssignments);
this.outputIds = outputIds;
@@ -81,6 +86,12 @@ public Map<String, String> getExtraCredentials()
return extraCredentials;
}
+ @JsonProperty
+ public Span getStageSpan()
+ {
+ return stageSpan;
+ }
+
@JsonProperty
public Optional<PlanFragment> getFragment()
{
diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java b/core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java
index 22994843ab40..6fbdee7ab376 100644
--- a/core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java
+++ b/core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java
@@ -22,6 +22,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.SpanBuilder;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskStatus;
@@ -34,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
@@ -62,6 +64,7 @@ class ContinuousTaskStatusFetcher
private final Duration refreshMaxWait;
private final Executor executor;
private final HttpClient httpClient;
+ private final Supplier<SpanBuilder> spanBuilderFactory;
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
@@ -79,6 +82,7 @@ public ContinuousTaskStatusFetcher(
DynamicFiltersFetcher dynamicFiltersFetcher,
Executor executor,
HttpClient httpClient,
+ Supplier<SpanBuilder> spanBuilderFactory,
Duration maxErrorDuration,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats)
@@ -95,6 +99,7 @@ public ContinuousTaskStatusFetcher(
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
this.stats = requireNonNull(stats, "stats is null");
@@ -146,6 +151,7 @@ private synchronized void scheduleNextRequest()
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.setHeader(TRINO_CURRENT_VERSION, Long.toString(taskStatus.getVersion()))
.setHeader(TRINO_MAX_WAIT, refreshMaxWait.toString())
+ .setSpanBuilder(spanBuilderFactory.get())
.build();
errorTracker.startRequest();
diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java b/core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java
index cdb88244b289..61712bf56359 100644
--- a/core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java
+++ b/core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java
@@ -20,6 +20,7 @@
import io.airlift.http.client.Request;
import io.airlift.json.JsonCodec;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.SpanBuilder;
import io.trino.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains;
import io.trino.execution.TaskId;
import io.trino.server.DynamicFilterService;
@@ -30,6 +31,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.JSON_UTF_8;
@@ -52,6 +54,7 @@ class DynamicFiltersFetcher
private final Duration refreshMaxWait;
private final Executor executor;
private final HttpClient httpClient;
+ private final Supplier<SpanBuilder> spanBuilderFactory;
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final DynamicFilterService dynamicFilterService;
@@ -73,6 +76,7 @@ public DynamicFiltersFetcher(
JsonCodec<VersionedDynamicFilterDomains> dynamicFilterDomainsCodec,
Executor executor,
HttpClient httpClient,
+ Supplier<SpanBuilder> spanBuilderFactory,
Duration maxErrorDuration,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats,
@@ -87,6 +91,7 @@ public DynamicFiltersFetcher(
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
this.errorTracker = new RequestErrorTracker(taskId, taskUri, maxErrorDuration, errorScheduledExecutor, "getting dynamic filter domains");
this.stats = requireNonNull(stats, "stats is null");
@@ -142,6 +147,7 @@ private synchronized void fetchDynamicFiltersIfNecessary()
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.setHeader(TRINO_CURRENT_VERSION, Long.toString(localDynamicFiltersVersion))
.setHeader(TRINO_MAX_WAIT, refreshMaxWait.toString())
+ .setSpanBuilder(spanBuilderFactory.get())
.build();
errorTracker.startRequest();
diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java
index 9e59b6bec98d..805475eb5717 100644
--- a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java
+++ b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java
@@ -33,6 +33,10 @@
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains;
@@ -64,6 +68,7 @@
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
+import io.trino.tracing.TrinoAttributes;
import org.joda.time.DateTime;
import javax.annotation.concurrent.GuardedBy;
@@ -128,12 +133,15 @@ public final class HttpRemoteTask
private final TaskId taskId;
private final Session session;
+ private final Span stageSpan;
private final String nodeId;
private final PlanFragment planFragment;
private final AtomicLong nextSplitId = new AtomicLong();
private final RemoteTaskStats stats;
+ private final Tracer tracer;
+ private final Span span;
private final TaskInfoFetcher taskInfoFetcher;
private final ContinuousTaskStatusFetcher taskStatusFetcher;
private final DynamicFiltersFetcher dynamicFiltersFetcher;
@@ -196,6 +204,7 @@ public final class HttpRemoteTask
public HttpRemoteTask(
Session session,
+ Span stageSpan,
TaskId taskId,
String nodeId,
URI location,
@@ -217,12 +226,14 @@ public HttpRemoteTask(
JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
JsonCodec<FailTaskRequest> failTaskRequestCodec,
PartitionedSplitCountTracker partitionedSplitCountTracker,
+ Tracer tracer,
RemoteTaskStats stats,
DynamicFilterService dynamicFilterService,
Set<DynamicFilterId> outboundDynamicFilterIds,
Optional<DataSize> estimatedMemory)
{
requireNonNull(session, "session is null");
+ requireNonNull(stageSpan, "stageSpan is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
@@ -241,6 +252,7 @@ public HttpRemoteTask(
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.session = session;
+ this.stageSpan = stageSpan;
this.nodeId = nodeId;
this.planFragment = planFragment;
this.outputBuffers.set(outputBuffers);
@@ -255,6 +267,8 @@ public HttpRemoteTask(
this.failTaskRequestCodec = failTaskRequestCodec;
this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
+ this.span = createSpanBuilder("remote-task", stageSpan).startSpan();
this.stats = stats;
for (Entry entry : initialSplits.entries()) {
@@ -307,6 +321,7 @@ public HttpRemoteTask(
dynamicFilterDomainsCodec,
executor,
httpClient,
+ () -> createSpanBuilder("task-dynamic-filters", span),
maxErrorDuration,
errorScheduledExecutor,
stats,
@@ -320,6 +335,7 @@ public HttpRemoteTask(
dynamicFiltersFetcher,
executor,
httpClient,
+ () -> createSpanBuilder("task-status", span),
maxErrorDuration,
errorScheduledExecutor,
stats);
@@ -329,6 +345,7 @@ public HttpRemoteTask(
taskStatusFetcher,
initialTask,
httpClient,
+ () -> createSpanBuilder("task-info", span),
taskInfoUpdateInterval,
taskInfoCodec,
maxErrorDuration,
@@ -349,6 +366,9 @@ public HttpRemoteTask(
partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
updateSplitQueueSpace();
}
+ if (state.isDone()) {
+ span.end();
+ }
});
this.outboundDynamicFiltersCollector = new DynamicFiltersCollector(this::triggerUpdate);
@@ -709,6 +729,7 @@ private void sendUpdate()
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
+ stageSpan,
fragment,
splitAssignments,
outputBuffers.get(),
@@ -734,6 +755,7 @@ private void sendUpdate()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
+ .setSpanBuilder(createSpanBuilder("task-update", span))
.build();
updateErrorTracker.startRequest();
@@ -895,6 +917,7 @@ private Request buildDeleteTaskRequest(boolean abort)
HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus()).addParameter("abort", "" + abort);
return prepareDelete()
.setUri(uriBuilder.build())
+ .setSpanBuilder(createSpanBuilder("task-delete", span))
.build();
}
@@ -906,6 +929,7 @@ private Request buildFailTaskRequest(FailTaskRequest failTaskRequest)
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(failTaskRequestCodec.toJsonBytes(failTaskRequest)))
+ .setSpanBuilder(createSpanBuilder("task-fail", span))
.build();
}
@@ -1052,6 +1076,15 @@ private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus)
return uriBuilder;
}
+ private SpanBuilder createSpanBuilder(String name, Span parent)
+ {
+ return tracer.spanBuilder(name)
+ .setParent(Context.current().with(parent))
+ .setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
+ .setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
+ .setAttribute(TrinoAttributes.TASK_ID, taskId.toString());
+ }
+
@Override
public String toString()
{
diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
index 2b446fd0b991..c141dfe85394 100644
--- a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
+++ b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java
@@ -23,6 +23,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.SpanBuilder;
import io.trino.execution.StateMachine;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.TaskId;
@@ -42,6 +43,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
@@ -68,6 +70,7 @@ public class TaskInfoFetcher
private final Executor executor;
private final HttpClient httpClient;
+ private final Supplier<SpanBuilder> spanBuilderFactory;
private final RequestErrorTracker errorTracker;
private final boolean summarizeTaskInfo;
@@ -90,6 +93,7 @@ public TaskInfoFetcher(
ContinuousTaskStatusFetcher taskStatusFetcher,
TaskInfo initialTask,
HttpClient httpClient,
+ Supplier<SpanBuilder> spanBuilderFactory,
Duration updateInterval,
JsonCodec<TaskInfo> taskInfoCodec,
Duration maxErrorDuration,
@@ -118,6 +122,7 @@ public TaskInfoFetcher(
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
this.stats = requireNonNull(stats, "stats is null");
this.estimatedMemory = requireNonNull(estimatedMemory, "estimatedMemory is null");
}
@@ -224,6 +229,7 @@ private synchronized void sendNextRequest()
Request request = prepareGet()
.setUri(uri)
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
+ .setSpanBuilder(spanBuilderFactory.get())
.build();
errorTracker.startRequest();
diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java
index 34582cd46921..b32792eabfe0 100644
--- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java
+++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java
@@ -36,6 +36,7 @@
import io.airlift.node.testing.TestingNodeModule;
import io.airlift.openmetrics.JmxOpenMetricsModule;
import io.airlift.tracetoken.TraceTokenModule;
+import io.airlift.tracing.TracingModule;
import io.trino.connector.CatalogManagerModule;
import io.trino.connector.ConnectorName;
import io.trino.connector.ConnectorServicesProvider;
@@ -131,6 +132,8 @@
public class TestingTrinoServer
implements Closeable
{
+ private static final String VERSION = "testversion";
+
public static TestingTrinoServer create()
{
return builder().build();
@@ -262,10 +265,11 @@ private TestingTrinoServer(
.add(new JmxOpenMetricsModule())
.add(new EventModule())
.add(new TraceTokenModule())
+ .add(new TracingModule("trino", VERSION))
.add(new ServerSecurityModule())
.add(new CatalogManagerModule())
.add(new TransactionManagerModule())
- .add(new ServerMainModule("testversion"))
+ .add(new ServerMainModule(VERSION))
.add(new TestingWarningCollectorModule())
.add(binder -> {
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
diff --git a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java
index 41248cf928e6..084d6722fa99 100644
--- a/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java
+++ b/core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java
@@ -15,6 +15,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.opentelemetry.context.Context;
import io.trino.metadata.Split;
import io.trino.spi.connector.CatalogHandle;
@@ -72,6 +73,7 @@ public Optional<List<Object>> getTableExecuteSplitsInfo()
private static class GetNextBatch
{
+ private final Context context = Context.current();
private final SplitSource splitSource;
private final int min;
private final int max;
@@ -102,7 +104,10 @@ private ListenableFuture<Void> fetchSplits()
if (splits.size() >= min) {
return immediateVoidFuture();
}
- ListenableFuture<SplitBatch> future = splitSource.getNextBatch(max - splits.size());
+ ListenableFuture<SplitBatch> future;
+ try (var ignored = context.makeCurrent()) {
+ future = splitSource.getNextBatch(max - splits.size());
+ }
return Futures.transformAsync(future, splitBatch -> {
splits.addAll(splitBatch.getSplits());
if (splitBatch.isLastBatch()) {
diff --git a/core/trino-main/src/main/java/io/trino/split/SplitManager.java b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
index b5ed757ec6f6..dbb11680b375 100644
--- a/core/trino-main/src/main/java/io/trino/split/SplitManager.java
+++ b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
@@ -13,6 +13,9 @@
*/
package io.trino.split;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.connector.CatalogServiceProvider;
import io.trino.execution.QueryManagerConfig;
@@ -24,26 +27,32 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
+import io.trino.tracing.TrinoAttributes;
import javax.inject.Inject;
+import java.util.Optional;
+
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static java.util.Objects.requireNonNull;
public class SplitManager
{
private final CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider;
+ private final Tracer tracer;
private final int minScheduleSplitBatchSize;
@Inject
- public SplitManager(CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider, QueryManagerConfig config)
+ public SplitManager(CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider, Tracer tracer, QueryManagerConfig config)
{
this.splitManagerProvider = requireNonNull(splitManagerProvider, "splitManagerProvider is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.minScheduleSplitBatchSize = config.getMinScheduleSplitBatchSize();
}
public SplitSource getSplits(
Session session,
+ Span parentSpan,
TableHandle table,
DynamicFilter dynamicFilter,
Constraint constraint)
@@ -64,13 +73,22 @@ public SplitSource getSplits(
constraint);
SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
+
+ Span span = splitSourceSpan(parentSpan, catalogHandle);
+
if (minScheduleSplitBatchSize > 1) {
+ splitSource = new TracingSplitSource(splitSource, tracer, Optional.empty(), "split-batch");
splitSource = new BufferingSplitSource(splitSource, minScheduleSplitBatchSize);
+ splitSource = new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-buffer");
}
+ else {
+ splitSource = new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-batch");
+ }
+
return splitSource;
}
- public SplitSource getSplits(Session session, TableFunctionHandle function)
+ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHandle function)
{
CatalogHandle catalogHandle = function.getCatalogHandle();
ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
@@ -81,6 +99,17 @@ public SplitSource getSplits(Session session, TableFunctionHandle function)
function.getSchemaFunctionName(),
function.getFunctionHandle());
- return new ConnectorAwareSplitSource(catalogHandle, source);
+ SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
+
+ Span span = splitSourceSpan(parentSpan, catalogHandle);
+ return new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-buffer");
+ }
+
+ private Span splitSourceSpan(Span querySpan, CatalogHandle catalogHandle)
+ {
+ return tracer.spanBuilder("split-source")
+ .setParent(Context.current().with(querySpan))
+ .setAttribute(TrinoAttributes.CATALOG, catalogHandle.getCatalogName())
+ .startSpan();
}
}
diff --git a/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java
new file mode 100644
index 000000000000..8df2cc09b5b1
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/split/TracingSplitSource.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.split;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.trino.spi.connector.CatalogHandle;
+import io.trino.tracing.TrinoAttributes;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.util.Objects.requireNonNull;
+
+public class TracingSplitSource
+ implements SplitSource
+{
+ private final SplitSource source;
+ private final Tracer tracer;
+ private final Optional<Span> parentSpan;
+ private final String spanName;
+
+ public TracingSplitSource(SplitSource source, Tracer tracer, Optional<Span> parentSpan, String spanName)
+ {
+ this.source = requireNonNull(source, "source is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
+ this.parentSpan = requireNonNull(parentSpan, "parentSpan is null");
+ this.spanName = requireNonNull(spanName, "spanName is null");
+ }
+
+ @Override
+ public CatalogHandle getCatalogHandle()
+ {
+ return source.getCatalogHandle();
+ }
+
+ @Override
+ public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
+ {
+ Span span = tracer.spanBuilder(spanName)
+ .setParent(parentSpan.map(Context.current()::with).orElse(Context.current()))
+ .setAttribute(TrinoAttributes.SPLIT_BATCH_MAX_SIZE, (long) maxSize)
+ .startSpan();
+
+ ListenableFuture<SplitBatch> future;
+ try (var ignored = span.makeCurrent()) {
+ future = source.getNextBatch(maxSize);
+ }
+ catch (Throwable t) {
+ span.end();
+ throw t;
+ }
+
+ Futures.addCallback(future, new FutureCallback<>()
+ {
+ @Override
+ public void onSuccess(SplitBatch batch)
+ {
+ span.setAttribute(TrinoAttributes.SPLIT_BATCH_RESULT_SIZE, batch.getSplits().size());
+ span.end();
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ span.end();
+ }
+ }, directExecutor());
+
+ return future;
+ }
+
+ @Override
+ public void close()
+ {
+ try (source) {
+ parentSpan.ifPresent(Span::end);
+ }
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return source.isFinished();
+ }
+
+ @Override
+ public Optional<List<Object>> getTableExecuteSplitsInfo()
+ {
+ return source.getTableExecuteSplitsInfo();
+ }
+}
diff --git a/core/trino-main/src/main/java/io/trino/sql/PlannerContext.java b/core/trino-main/src/main/java/io/trino/sql/PlannerContext.java
index 5bf90f91e5fb..a73d8c93e0ae 100644
--- a/core/trino-main/src/main/java/io/trino/sql/PlannerContext.java
+++ b/core/trino-main/src/main/java/io/trino/sql/PlannerContext.java
@@ -13,6 +13,7 @@
*/
package io.trino.sql;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.metadata.FunctionManager;
import io.trino.metadata.Metadata;
import io.trino.spi.block.BlockEncodingSerde;
@@ -39,19 +40,22 @@ public class PlannerContext
private final BlockEncodingSerde blockEncodingSerde;
private final TypeManager typeManager;
private final FunctionManager functionManager;
+ private final Tracer tracer;
@Inject
public PlannerContext(Metadata metadata,
TypeOperators typeOperators,
BlockEncodingSerde blockEncodingSerde,
TypeManager typeManager,
- FunctionManager functionManager)
+ FunctionManager functionManager,
+ Tracer tracer)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.functionManager = requireNonNull(functionManager, "functionManager is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
}
public Metadata getMetadata()
@@ -78,4 +82,9 @@ public FunctionManager getFunctionManager()
{
return functionManager;
}
+
+ public Tracer getTracer()
+ {
+ return tracer;
+ }
}
diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analyzer.java
index c04f8e20bc8b..f08161ff853a 100644
--- a/core/trino-main/src/main/java/io/trino/sql/analyzer/Analyzer.java
+++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/Analyzer.java
@@ -15,6 +15,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.execution.querystats.PlanOptimizersStatsCollector;
import io.trino.execution.warnings.WarningCollector;
@@ -37,6 +40,7 @@
import static io.trino.sql.analyzer.ExpressionTreeUtils.extractWindowExpressions;
import static io.trino.sql.analyzer.QueryType.OTHERS;
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
+import static io.trino.tracing.ScopedSpan.scopedSpan;
import static java.util.Objects.requireNonNull;
public class Analyzer
@@ -47,7 +51,8 @@ public class Analyzer
private final List<Expression> parameters;
private final Map<NodeRef<Parameter>, Expression> parameterLookup;
private final WarningCollector warningCollector;
- private PlanOptimizersStatsCollector planOptimizersStatsCollector;
+ private final PlanOptimizersStatsCollector planOptimizersStatsCollector;
+ private final Tracer tracer;
private final StatementRewrite statementRewrite;
Analyzer(
@@ -58,6 +63,7 @@ public class Analyzer
Map<NodeRef<Parameter>, Expression> parameterLookup,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector,
+ Tracer tracer,
StatementRewrite statementRewrite)
{
this.session = requireNonNull(session, "session is null");
@@ -67,12 +73,18 @@ public class Analyzer
this.parameterLookup = parameterLookup;
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
this.planOptimizersStatsCollector = requireNonNull(planOptimizersStatsCollector, "planOptimizersStatsCollector is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
this.statementRewrite = requireNonNull(statementRewrite, "statementRewrite is null");
}
public Analysis analyze(Statement statement)
{
- return analyze(statement, OTHERS);
+ Span span = tracer.spanBuilder("analyzer")
+ .setParent(Context.current().with(session.getQuerySpan()))
+ .startSpan();
+ try (var ignored = scopedSpan(span)) {
+ return analyze(statement, OTHERS);
+ }
}
public Analysis analyze(Statement statement, QueryType queryType)
@@ -80,15 +92,21 @@ public Analysis analyze(Statement statement, QueryType queryType)
Statement rewrittenStatement = statementRewrite.rewrite(analyzerFactory, session, statement, parameters, parameterLookup, warningCollector, planOptimizersStatsCollector);
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, queryType);
StatementAnalyzer analyzer = statementAnalyzerFactory.createStatementAnalyzer(analysis, session, warningCollector, CorrelationSupport.ALLOWED);
- analyzer.analyze(rewrittenStatement, Optional.empty());
- // check column access permissions for each table
- analysis.getTableColumnReferences().forEach((accessControlInfo, tableColumnReferences) ->
- tableColumnReferences.forEach((tableName, columns) ->
- accessControlInfo.getAccessControl().checkCanSelectFromColumns(
- accessControlInfo.getSecurityContext(session.getRequiredTransactionId(), session.getQueryId()),
- tableName,
- columns)));
+ try (var ignored = scopedSpan(tracer, "analyze")) {
+ analyzer.analyze(rewrittenStatement, Optional.empty());
+ }
+
+ try (var ignored = scopedSpan(tracer, "access-control")) {
+ // check column access permissions for each table
+ analysis.getTableColumnReferences().forEach((accessControlInfo, tableColumnReferences) ->
+ tableColumnReferences.forEach((tableName, columns) ->
+ accessControlInfo.getAccessControl().checkCanSelectFromColumns(
+ accessControlInfo.getSecurityContext(session.getRequiredTransactionId(), session.getQueryId()),
+ tableName,
+ columns)));
+ }
+
return analysis;
}
diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/AnalyzerFactory.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/AnalyzerFactory.java
index 354605386e8c..b5b8c49c8d19 100644
--- a/core/trino-main/src/main/java/io/trino/sql/analyzer/AnalyzerFactory.java
+++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/AnalyzerFactory.java
@@ -13,6 +13,7 @@
*/
package io.trino.sql.analyzer;
+import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.execution.querystats.PlanOptimizersStatsCollector;
import io.trino.execution.warnings.WarningCollector;
@@ -32,12 +33,14 @@ public class AnalyzerFactory
{
private final StatementAnalyzerFactory statementAnalyzerFactory;
private final StatementRewrite statementRewrite;
+ private final Tracer tracer;
@Inject
- public AnalyzerFactory(StatementAnalyzerFactory statementAnalyzerFactory, StatementRewrite statementRewrite)
+ public AnalyzerFactory(StatementAnalyzerFactory statementAnalyzerFactory, StatementRewrite statementRewrite, Tracer tracer)
{
this.statementAnalyzerFactory = requireNonNull(statementAnalyzerFactory, "statementAnalyzerFactory is null");
this.statementRewrite = requireNonNull(statementRewrite, "statementRewrite is null");
+ this.tracer = requireNonNull(tracer, "tracer is null");
}
public Analyzer createAnalyzer(
@@ -55,6 +58,7 @@ public Analyzer createAnalyzer(
parameterLookup,
warningCollector,
planOptimizersStatsCollector,
+ tracer,
statementRewrite);
}
}
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index 1b4a60db51b5..92be1ac482ef 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -689,7 +689,7 @@ public void addDriverFactory(boolean inputDriver, boolean outputDriver, Physical
else {
operatorFactories = toOperatorFactories(operatorFactoriesWithTypes);
}
- driverFactories.add(new DriverFactory(getNextPipelineId(), inputDriver, outputDriver, operatorFactories, driverInstances));
+ addDriverFactory(inputDriver, outputDriver, operatorFactories, driverInstances);
}
private List<OperatorFactory> handleLateMaterialization(List<OperatorFactory> operatorFactories)
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java
index 6b11d3e1c224..f78ce7aa1bcd 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java
@@ -15,7 +15,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.errorprone.annotations.MustBeClosed;
import io.airlift.log.Logger;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.cost.CachingCostProvider;
import io.trino.cost.CachingStatsProvider;
@@ -55,6 +59,7 @@
import io.trino.sql.analyzer.RelationType;
import io.trino.sql.analyzer.Scope;
import io.trino.sql.planner.StatisticsAggregationPlanner.TableStatisticAggregation;
+import io.trino.sql.planner.iterative.IterativeOptimizer;
import io.trino.sql.planner.optimizations.PlanOptimizer;
import io.trino.sql.planner.plan.Assignments;
import io.trino.sql.planner.plan.ExplainAnalyzeNode;
@@ -99,9 +104,13 @@
import io.trino.sql.tree.Table;
import io.trino.sql.tree.TableExecute;
import io.trino.sql.tree.Update;
+import io.trino.tracing.ScopedSpan;
+import io.trino.tracing.TrinoAttributes;
import io.trino.type.TypeCoercion;
import io.trino.type.UnknownType;
+import javax.annotation.Nonnull;
+
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
@@ -149,6 +158,7 @@
import static io.trino.sql.planner.sanity.PlanSanityChecker.DISTRIBUTED_PLAN_SANITY_CHECKER;
import static io.trino.sql.tree.BooleanLiteral.TRUE_LITERAL;
import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
+import static io.trino.tracing.ScopedSpan.scopedSpan;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -230,7 +240,10 @@ public Plan plan(Analysis analysis, Stage stage)
public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
{
- PlanNode root = planStatement(analysis, analysis.getStatement());
+ PlanNode root;
+ try (var ignored = scopedSpan(plannerContext.getTracer(), "plan")) {
+ root = planStatement(analysis, analysis.getStatement());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Initial plan:\n%s", PlanPrinter.textLogicalPlan(
@@ -244,34 +257,25 @@ public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
false));
}
- planSanityChecker.validateIntermediatePlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
+ try (var ignored = scopedSpan(plannerContext.getTracer(), "validate-intermediate")) {
+ planSanityChecker.validateIntermediatePlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
+ }
TableStatsProvider tableStatsProvider = new CachingTableStatsProvider(metadata, session);
if (stage.ordinal() >= OPTIMIZED.ordinal()) {
- for (PlanOptimizer optimizer : planOptimizers) {
- root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator, warningCollector, planOptimizersStatsCollector, tableStatsProvider);
- if (root == null) {
- throw new NullPointerException(optimizer.getClass().getName() + " returned a null plan");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("%s:\n%s", optimizer.getClass().getName(), PlanPrinter.textLogicalPlan(
- root,
- symbolAllocator.getTypes(),
- metadata,
- plannerContext.getFunctionManager(),
- StatsAndCosts.empty(),
- session,
- 0,
- false));
+ try (var ignored = scopedSpan(plannerContext.getTracer(), "optimizer")) {
+ for (PlanOptimizer optimizer : planOptimizers) {
+ root = runOptimizer(root, tableStatsProvider, optimizer);
}
}
}
if (stage.ordinal() >= OPTIMIZED_AND_VALIDATED.ordinal()) {
// make sure we produce a valid plan after optimizations run. This is mainly to catch programming errors
- planSanityChecker.validateFinalPlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
+ try (var ignored = scopedSpan(plannerContext.getTracer(), "validate-final")) {
+ planSanityChecker.validateFinalPlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
+ }
}
TypeProvider types = symbolAllocator.getTypes();
@@ -280,11 +284,55 @@ public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
if (collectPlanStatistics) {
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types, tableStatsProvider);
CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, Optional.empty(), session, types);
- statsAndCosts = StatsAndCosts.create(root, statsProvider, costProvider);
+ try (var ignored = scopedSpan(plannerContext.getTracer(), "plan-stats")) {
+ statsAndCosts = StatsAndCosts.create(root, statsProvider, costProvider);
+ }
}
return new Plan(root, types, statsAndCosts);
}
+ @Nonnull
+ private PlanNode runOptimizer(PlanNode root, TableStatsProvider tableStatsProvider, PlanOptimizer optimizer)
+ {
+ PlanNode result;
+ try (var ignored = optimizerSpan(optimizer)) {
+ result = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator, warningCollector, planOptimizersStatsCollector, tableStatsProvider);
+ }
+ if (result == null) {
+ throw new NullPointerException(optimizer.getClass().getName() + " returned a null plan");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("%s:\n%s", optimizer.getClass().getName(), PlanPrinter.textLogicalPlan(
+ result,
+ symbolAllocator.getTypes(),
+ metadata,
+ plannerContext.getFunctionManager(),
+ StatsAndCosts.empty(),
+ session,
+ 0,
+ false));
+ }
+
+ return result;
+ }
+
+ @MustBeClosed
+ private ScopedSpan optimizerSpan(PlanOptimizer optimizer)
+ {
+ if (!Span.fromContext(Context.current()).isRecording()) {
+ return null;
+ }
+ SpanBuilder builder = plannerContext.getTracer().spanBuilder("optimize")
+ .setAttribute(TrinoAttributes.OPTIMIZER_NAME, optimizer.getClass().getSimpleName());
+ if (optimizer instanceof IterativeOptimizer iterative) {
+ builder.setAttribute(TrinoAttributes.OPTIMIZER_RULES, iterative.getRules().stream()
+ .map(x -> x.getClass().getSimpleName())
+ .toList());
+ }
+ return scopedSpan(builder.startSpan());
+ }
+
public PlanNode planStatement(Analysis analysis, Statement statement)
{
if ((statement instanceof CreateTableAsSelect && analysis.getCreate().orElseThrow().isCreateTableAsSelectNoOp()) ||
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/SplitSourceFactory.java b/core/trino-main/src/main/java/io/trino/sql/planner/SplitSourceFactory.java
index e47e2eae0eab..7b76175a4a5c 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/SplitSourceFactory.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/SplitSourceFactory.java
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
+import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.server.DynamicFilterService;
import io.trino.spi.connector.Constraint;
@@ -100,13 +101,13 @@ public SplitSourceFactory(SplitManager splitManager, PlannerContext plannerConte
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
}
- public Map<PlanNodeId, SplitSource> createSplitSources(Session session, PlanFragment fragment)
+ public Map<PlanNodeId, SplitSource> createSplitSources(Session session, Span stageSpan, PlanFragment fragment)
{
ImmutableList.Builder<SplitSource> allSplitSources = ImmutableList.builder();
try {
// get splits for this fragment, this is lazy so split assignments aren't actually calculated here
return fragment.getRoot().accept(
- new Visitor(session, TypeProvider.copyOf(fragment.getSymbols()), allSplitSources),
+ new Visitor(session, stageSpan, TypeProvider.copyOf(fragment.getSymbols()), allSplitSources),
null);
}
catch (Throwable t) {
@@ -129,15 +130,18 @@ private final class Visitor
extends PlanVisitor<Map<PlanNodeId, SplitSource>, Void>