Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@
<artifactId>trace-token</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>tracing</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down Expand Up @@ -291,6 +296,16 @@
<artifactId>jjwt-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
Expand Down Expand Up @@ -368,6 +383,12 @@
</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
Expand Down Expand Up @@ -449,12 +470,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-urlconnection</artifactId>
Expand Down
33 changes: 33 additions & 0 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.client.ProtocolHeaders;
import io.trino.metadata.SessionPropertyManager;
import io.trino.security.AccessControl;
Expand Down Expand Up @@ -60,6 +61,7 @@
public final class Session
{
private final QueryId queryId;
private final Span querySpan;
private final Optional<TransactionId> transactionId;
private final boolean clientTransactionSupport;
private final Identity identity;
Expand Down Expand Up @@ -87,6 +89,7 @@ public final class Session

public Session(
QueryId queryId,
Span querySpan,
Optional<TransactionId> transactionId,
boolean clientTransactionSupport,
Identity identity,
Expand All @@ -112,6 +115,7 @@ public Session(
Optional<Slice> exchangeEncryptionKey)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.querySpan = requireNonNull(querySpan, "querySpan is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
this.clientTransactionSupport = clientTransactionSupport;
this.identity = requireNonNull(identity, "identity is null");
Expand Down Expand Up @@ -150,6 +154,11 @@ public QueryId getQueryId()
return queryId;
}

public Span getQuerySpan()
{
return querySpan;
}

public String getUser()
{
return identity.getUser();
Expand Down Expand Up @@ -332,6 +341,7 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage

return new Session(
queryId,
querySpan,
Optional.of(transactionId),
clientTransactionSupport,
Identity.from(identity)
Expand Down Expand Up @@ -381,6 +391,7 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,

return new Session(
queryId,
querySpan,
transactionId,
clientTransactionSupport,
identity,
Expand Down Expand Up @@ -411,6 +422,7 @@ public Session withExchangeEncryption(Slice encryptionKey)
checkState(exchangeEncryptionKey.isEmpty(), "exchangeEncryptionKey is already present");
return new Session(
queryId,
querySpan,
transactionId,
clientTransactionSupport,
identity,
Expand Down Expand Up @@ -459,6 +471,7 @@ public SessionRepresentation toSessionRepresentation()
{
return new SessionRepresentation(
queryId.toString(),
querySpan,
transactionId,
clientTransactionSupport,
identity.getUser(),
Expand Down Expand Up @@ -491,6 +504,7 @@ public String toString()
{
return toStringHelper(this)
.add("queryId", queryId)
.add("querySpan", querySpanString().orElse(null))
.add("transactionId", transactionId)
.add("user", getUser())
.add("principal", getIdentity().getPrincipal().orElse(null))
Expand All @@ -512,6 +526,16 @@ public String toString()
.toString();
}

private Optional<String> querySpanString()
{
return Optional.of(querySpan)
.filter(span -> span.getSpanContext().isValid())
.map(span -> toStringHelper("Span")
.add("spanId", span.getSpanContext().getSpanId())
.add("traceId", span.getSpanContext().getTraceId())
.toString());
}

private void validateCatalogProperties(
Optional<TransactionId> transactionId,
AccessControl accessControl,
Expand Down Expand Up @@ -560,6 +584,7 @@ public SecurityContext toSecurityContext()
public static class SessionBuilder
{
private QueryId queryId;
private Span querySpan = Span.getInvalid();
private TransactionId transactionId;
private boolean clientTransactionSupport;
private Identity identity;
Expand Down Expand Up @@ -624,6 +649,13 @@ public SessionBuilder setQueryId(QueryId queryId)
return this;
}

@CanIgnoreReturnValue
public SessionBuilder setQuerySpan(Span querySpan)
{
this.querySpan = requireNonNull(querySpan, "querySpan is null");
return this;
}

@CanIgnoreReturnValue
public SessionBuilder setTransactionId(TransactionId transactionId)
{
Expand Down Expand Up @@ -853,6 +885,7 @@ public Session build()
{
return new Session(
queryId,
querySpan,
Optional.ofNullable(transactionId),
clientTransactionSupport,
identity,
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/SessionRepresentation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.opentelemetry.api.trace.Span;
import io.trino.metadata.SessionPropertyManager;
import io.trino.spi.QueryId;
import io.trino.spi.security.BasicPrincipal;
Expand All @@ -42,6 +43,7 @@
public final class SessionRepresentation
{
private final String queryId;
private final Span querySpan;
private final Optional<TransactionId> transactionId;
private final boolean clientTransactionSupport;
private final String user;
Expand Down Expand Up @@ -71,6 +73,7 @@ public final class SessionRepresentation
@JsonCreator
public SessionRepresentation(
@JsonProperty("queryId") String queryId,
@JsonProperty("querySpan") Span querySpan,
@JsonProperty("transactionId") Optional<TransactionId> transactionId,
@JsonProperty("clientTransactionSupport") boolean clientTransactionSupport,
@JsonProperty("user") String user,
Expand Down Expand Up @@ -98,6 +101,7 @@ public SessionRepresentation(
@JsonProperty("protocolName") String protocolName)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.querySpan = requireNonNull(querySpan, "querySpan is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
this.clientTransactionSupport = clientTransactionSupport;
this.user = requireNonNull(user, "user is null");
Expand Down Expand Up @@ -136,6 +140,12 @@ public String getQueryId()
return queryId;
}

@JsonProperty
public Span getQuerySpan()
{
return querySpan;
}

@JsonProperty
public Optional<TransactionId> getTransactionId()
{
Expand Down Expand Up @@ -317,6 +327,7 @@ public Session toSession(SessionPropertyManager sessionPropertyManager, Map<Stri
{
return new Session(
new QueryId(queryId),
querySpan,
transactionId,
clientTransactionSupport,
toIdentity(extraCredentials),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.connector;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.NodeManager;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
Expand All @@ -31,6 +33,8 @@
public class ConnectorContextInstance
implements ConnectorContext
{
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final NodeManager nodeManager;
private final VersionEmbedder versionEmbedder;
private final TypeManager typeManager;
Expand All @@ -43,6 +47,8 @@ public class ConnectorContextInstance

public ConnectorContextInstance(
CatalogHandle catalogHandle,
OpenTelemetry openTelemetry,
Tracer tracer,
NodeManager nodeManager,
VersionEmbedder versionEmbedder,
TypeManager typeManager,
Expand All @@ -51,6 +57,8 @@ public ConnectorContextInstance(
PageIndexerFactory pageIndexerFactory,
Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.versionEmbedder = requireNonNull(versionEmbedder, "versionEmbedder is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -61,6 +69,18 @@ public ConnectorContextInstance(
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
}

@Override
public OpenTelemetry getOpenTelemetry()
{
return openTelemetry;
}

@Override
public Tracer getTracer()
{
return tracer;
}

@Override
public CatalogHandle getCatalogHandle()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.airlift.log.Logger;
import io.opentelemetry.api.trace.Tracer;
import io.trino.metadata.CatalogMetadata.SecurityManagement;
import io.trino.metadata.CatalogProcedures;
import io.trino.metadata.CatalogTableFunctions;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class ConnectorServices
{
private static final Logger log = Logger.get(ConnectorServices.class);

private final Tracer tracer;
private final CatalogHandle catalogHandle;
private final Connector connector;
private final Runnable afterShutdown;
Expand All @@ -87,8 +89,9 @@ public class ConnectorServices

private final AtomicBoolean shutdown = new AtomicBoolean();

public ConnectorServices(CatalogHandle catalogHandle, Connector connector, Runnable afterShutdown)
public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector connector, Runnable afterShutdown)
{
this.tracer = requireNonNull(tracer, "tracer is null");
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connector = requireNonNull(connector, "connector is null");
this.afterShutdown = requireNonNull(afterShutdown, "afterShutdown is null");
Expand Down Expand Up @@ -207,6 +210,11 @@ public ConnectorServices(CatalogHandle catalogHandle, Connector connector, Runna
this.capabilities = capabilities;
}

public Tracer getTracer()
{
return tracer;
}

public CatalogHandle getCatalogHandle()
{
return catalogHandle;
Expand Down
Loading