Skip to content

Commit

Permalink
feat(*): OpenTelemetry traces
Browse files Browse the repository at this point in the history
Fixes #6149
  • Loading branch information
loicmathieu committed Jan 22, 2025
1 parent 33308c4 commit e87b97a
Show file tree
Hide file tree
Showing 28 changed files with 890 additions and 284 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ allprojects {
micronaut "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
micronaut "io.micronaut:micronaut-http-client"
micronaut "io.micronaut.reactor:micronaut-reactor-http-client"
micronaut "io.micronaut.tracing:micronaut-tracing-opentelemetry-http"

// logs
implementation "org.slf4j:slf4j-api"
Expand All @@ -133,6 +134,9 @@ allprojects {
implementation group: 'org.slf4j', name: 'jcl-over-slf4j'
implementation group: 'org.fusesource.jansi', name: 'jansi'

// OTEL
implementation "io.opentelemetry:opentelemetry-exporter-otlp"

// jackson
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down
12 changes: 12 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ kestra:
metrics:
prefix: kestra

traces:
root: DISABLED

server:
basic-auth:
enabled: false
Expand Down Expand Up @@ -194,3 +197,12 @@ kestra:
prefixes:
- system.
- internal.

otel:
exclusions:
- /ping
- /metrics
- /health
- /env
- /prometheus
propagators: tracecontext, baggage
7 changes: 6 additions & 1 deletion core/src/main/java/io/kestra/core/http/HttpRequest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.http;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -93,7 +94,7 @@ public static HttpRequest of(URI uri, String method, RequestBody body, Map<Strin
.build();
}

public HttpUriRequest to() throws IOException {
public HttpUriRequest to(RunContext runContext) throws IOException {
HttpUriRequestBase builder = new HttpUriRequestBase(this.method, this.uri);

// headers
Expand All @@ -104,6 +105,10 @@ public HttpUriRequest to() throws IOException {
);
}

if (runContext.getTraceParent() != null) {
builder.addHeader("traceparent", runContext.getTraceParent());
}

// body
if (this.body != null) {
builder.setEntity(this.body.to());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private <T> HttpResponse<T> request(
HttpClientResponseHandler<HttpResponse<T>> responseHandler
) throws HttpClientException {
try {
return this.client.execute(request.to(), httpClientContext, responseHandler);
return this.client.execute(request.to(runContext), httpClientContext, responseHandler);
} catch (SocketException e) {
throw new HttpClientRequestException(e.getMessage(), request, e);
} catch (IOException e) {
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.NonFinal;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
Expand Down Expand Up @@ -106,6 +107,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable
Instant scheduleDate;

@NonFinal
@Setter
String traceParent;

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
Expand Down Expand Up @@ -199,7 +204,8 @@ public Execution withState(State.Type state) {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}

Expand All @@ -222,7 +228,8 @@ public Execution withLabels(List<Label> labels) {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}

Expand Down Expand Up @@ -258,7 +265,8 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}

Expand All @@ -281,7 +289,8 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.traceParent
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.flows.State;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import lombok.Getter;
import lombok.Synchronized;
import org.slf4j.Logger;
Expand All @@ -25,6 +27,9 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
@Getter
String type;

@Getter
String uid;

@Getter
Throwable exception;

Expand All @@ -34,10 +39,11 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {

private Thread currentThread;

AbstractWorkerCallable(RunContext runContext, String type, ClassLoader classLoader) {
AbstractWorkerCallable(RunContext runContext, String type, String uid, ClassLoader classLoader) {
this.logger = runContext.logger();
this.runContext = runContext;
this.type = type;
this.uid = uid;
this.classLoader = classLoader;
}

Expand Down Expand Up @@ -100,6 +106,7 @@ protected void kill(boolean markAsKilled) {

protected State.Type exceptionHandler(Throwable e) {
this.exception = e;
Span.current().recordException(e).setStatus(StatusCode.ERROR);

if (this.killed) {
return KILLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ abstract class AbstractWorkerTriggerCallable extends AbstractWorkerCallable {
WorkerTrigger workerTrigger;

AbstractWorkerTriggerCallable(RunContext runContext, String type, WorkerTrigger workerTrigger) {
super(runContext, type, workerTrigger.getTrigger().getClass().getClassLoader());
super(runContext, type, workerTrigger.uid(), workerTrigger.getTrigger().getClass().getClassLoader());
this.workerTrigger = workerTrigger;
}

Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/io/kestra/core/runners/DefaultRunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DefaultRunContext extends RunContext {
private Storage storage;
private Map<String, Object> pluginConfiguration;
private List<String> secretInputs;
private String traceParent;

// those are only used to validate dynamic properties inside the RunContextProperty
private Task task;
Expand Down Expand Up @@ -103,6 +104,20 @@ public List<String> getSecretInputs() {
return secretInputs;
}

/**
* {@inheritDoc}
*/
@Override
@JsonInclude
public String getTraceParent() {
return traceParent;
}

@Override
public void setTraceParent(String traceParent) {
this.traceParent = traceParent;
}

@JsonIgnore
public ApplicationContext getApplicationContext() {
return applicationContext;
Expand Down
Loading

0 comments on commit e87b97a

Please sign in to comment.