Skip to content

Commit ae8c814

Browse files
committed
Support opentelemetry javaagent extension for ReactiveMongo
1 parent c087072 commit ae8c814

File tree

14 files changed

+691
-64
lines changed

14 files changed

+691
-64
lines changed

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ project.includePaths."+" = ["glob:**.md"]
7575

7676
fileOverride {
7777
"glob:**/project/Dependencies.scala" {
78-
maxColumn = 150
78+
maxColumn = 160
7979
align.preset = more
8080
}
8181
}

build.sbt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ lazy val rawAllAggregates = chimneyZioPrelude.projectRefs ++ circeReactivemongoB
1313
reactivemongoBase.projectRefs ++ reactivemongoBson.projectRefs ++
1414
reactivemongoBsonAny.projectRefs ++ reactivemongoBsonJodaTime.projectRefs ++
1515
reactivemongoBsonRefined.projectRefs ++ reactivemongoBsonZioPrelude.projectRefs ++
16-
reactivemongoKamonInstrumentation.projectRefs ++ redissonCore.projectRefs ++
16+
reactivemongoKamonInstrumentation.projectRefs ++
17+
reactivemongoOpentelemetryJavaagentExtension.projectRefs ++ redissonCore.projectRefs ++
1718
redissonCodecBase.projectRefs ++ redissonCodecCirce.projectRefs ++
1819
redissonCodecPlayJson.projectRefs ++ redissonCodecPlay2Json.projectRefs ++
1920
scalaLogging.projectRefs ++ tapirZioPrelude.projectRefs ++ implicitsAny.projectRefs ++
@@ -242,6 +243,11 @@ lazy val reactivemongoKamonInstrumentation = projectMatrix
242243
.jvmPlatform(ProjectSettings.scala2Versions)
243244
.configure(ModulesCommon.reactivemongoKamonInstrumentationProfile)
244245

246+
lazy val reactivemongoOpentelemetryJavaagentExtension = projectMatrix
247+
.in(file("common/reactivemongo/opentelemetry-javaagent-extension"))
248+
.jvmPlatform(ProjectSettings.scala2_13Versions)
249+
.configure(ModulesCommon.reactivemongoOpentelemetryJavaagentExtensionProfile)
250+
245251
lazy val redissonCore = projectMatrix
246252
.in(file("common/redisson/core"))
247253
.jvmPlatform(ProjectSettings.scala2Versions)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension;
2+
3+
import com.google.auto.service.AutoService;
4+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
5+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
6+
import io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers;
7+
import net.bytebuddy.matcher.ElementMatcher;
8+
import reactivemongo.api.ReactiveMongoInstrumentation;
9+
10+
import java.util.Arrays;
11+
import java.util.Collections;
12+
import java.util.List;
13+
14+
@AutoService(InstrumentationModule.class)
15+
public final class ReactiveMongoInstrumentationModule extends InstrumentationModule {
16+
public ReactiveMongoInstrumentationModule() {
17+
super("reactivemongo");
18+
}
19+
20+
@Override
21+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
22+
return AgentElementMatchers.hasClassesNamed("reactivemongo.api.MongoConnection");
23+
}
24+
25+
@Override
26+
public boolean isIndyModule() {
27+
return true;
28+
}
29+
30+
public List<String> getAdditionalHelperClassNames() {
31+
return Arrays.asList(
32+
"io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons",
33+
"reactivemongo.api.ResponseFutureWrapper",
34+
"reactivemongo.api.ResponseFutureWrapper$1",
35+
"reactivemongo.api.ResponseFutureWrapper$2",
36+
"reactivemongo.api.ReactiveMongoDbClientAttributesGetter",
37+
"reactivemongo.api.ReactiveMongoAttributesExtractor",
38+
"reactivemongo.api.ReactiveMongoSpanNameExtractor"
39+
);
40+
}
41+
42+
43+
@Override
44+
public List<TypeInstrumentation> typeInstrumentations() {
45+
return Collections.singletonList(new ReactiveMongoInstrumentation());
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension;
2+
3+
import io.opentelemetry.api.GlobalOpenTelemetry;
4+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesExtractor;
5+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
6+
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
7+
import reactivemongo.api.ExpectingResponseWrapper;
8+
import reactivemongo.api.ReactiveMongoAttributesExtractor;
9+
import reactivemongo.api.ReactiveMongoDbClientAttributesGetter;
10+
import reactivemongo.api.ReactiveMongoSpanNameExtractor;
11+
12+
public final class ReactiveMongoSingletons {
13+
private static final String INSTRUMENTATION_NAME = "io.kinoplan.utils.reactivemongo";
14+
15+
private static final Instrumenter<ExpectingResponseWrapper, Void> INSTRUMENTER;
16+
17+
static {
18+
ReactiveMongoDbClientAttributesGetter dbAttributesGetter = new ReactiveMongoDbClientAttributesGetter();
19+
ReactiveMongoAttributesExtractor attributesExtractor = new ReactiveMongoAttributesExtractor();
20+
ReactiveMongoSpanNameExtractor spanNameExtractor = new ReactiveMongoSpanNameExtractor();
21+
22+
INSTRUMENTER =
23+
Instrumenter.<ExpectingResponseWrapper, Void>builder(
24+
GlobalOpenTelemetry.get(),
25+
INSTRUMENTATION_NAME,
26+
spanNameExtractor)
27+
.addAttributesExtractor(DbClientAttributesExtractor.create(dbAttributesGetter))
28+
.addAttributesExtractor(attributesExtractor)
29+
.buildInstrumenter(SpanKindExtractor.alwaysClient());
30+
}
31+
32+
public static Instrumenter<ExpectingResponseWrapper, Void> instrumenter() {
33+
return INSTRUMENTER;
34+
}
35+
36+
private ReactiveMongoSingletons() {
37+
}
38+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package reactivemongo.api;
2+
3+
4+
import io.opentelemetry.context.Context;
5+
import io.opentelemetry.context.Scope;
6+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
7+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
8+
import net.bytebuddy.asm.Advice;
9+
import net.bytebuddy.description.type.TypeDescription;
10+
import net.bytebuddy.matcher.ElementMatcher;
11+
import reactivemongo.core.actors.ExpectingResponse;
12+
import reactivemongo.core.protocol.Response;
13+
import scala.concurrent.Future;
14+
15+
import static io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons.instrumenter;
16+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
17+
import static net.bytebuddy.matcher.ElementMatchers.*;
18+
19+
public class ReactiveMongoInstrumentation implements TypeInstrumentation {
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("reactivemongo.api.MongoConnection");
23+
}
24+
25+
@Override
26+
public void transform(TypeTransformer transformer) {
27+
transformer.applyAdviceToMethod(
28+
isMethod()
29+
.and(named("sendExpectingResponse"))
30+
.and(takesArguments(1))
31+
.and(takesArgument(0, named("reactivemongo.core.actors.ExpectingResponse")))
32+
,
33+
this.getClass().getName() + "$SendExpectingResponseAdvice"
34+
);
35+
}
36+
37+
@SuppressWarnings("unused")
38+
public static class SendExpectingResponseAdvice {
39+
40+
@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(value = 0, index = 0))
41+
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
42+
public static Object[] onEnter(
43+
@Advice.Argument(0) ExpectingResponse expectingResponse
44+
) {
45+
Context parentContext = currentContext();
46+
ExpectingResponseWrapper wrapper = new ExpectingResponseWrapper(expectingResponse);
47+
if (!instrumenter().shouldStart(parentContext, wrapper)) {
48+
return new Object[]{null, null};
49+
}
50+
Context context = instrumenter().start(parentContext, wrapper);
51+
Scope scope = context.makeCurrent();
52+
53+
return new Object[]{context, scope};
54+
}
55+
56+
@Advice.AssignReturned.ToReturned
57+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
58+
public static Future<Response> onExit(
59+
@Advice.Argument(0) ExpectingResponse expectingResponse,
60+
@Advice.This MongoConnection thiz,
61+
@Advice.Return Future<Response> responseFuture,
62+
@Advice.Enter Object[] enter,
63+
@Advice.Thrown Throwable throwable
64+
) {
65+
66+
if (!(enter[0] instanceof Context context) || !(enter[1] instanceof Scope scope)) {
67+
return null;
68+
}
69+
scope.close();
70+
71+
ExpectingResponseWrapper wrapper = new ExpectingResponseWrapper(expectingResponse);
72+
if (throwable != null) {
73+
instrumenter().end(context, wrapper, null, throwable);
74+
return responseFuture;
75+
}
76+
77+
if (responseFuture == null) {
78+
return null;
79+
}
80+
return ResponseFutureWrapper.wrap(responseFuture, context, thiz.actorSystem().dispatcher());
81+
}
82+
}
83+
84+
}
85+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package reactivemongo.api;
2+
3+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
4+
import io.opentelemetry.context.Context;
5+
import reactivemongo.core.protocol.Response;
6+
import scala.concurrent.ExecutionContext;
7+
import scala.concurrent.Future;
8+
import scala.runtime.AbstractFunction1;
9+
10+
import static io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons.instrumenter;
11+
12+
public final class ResponseFutureWrapper {
13+
14+
public static Future<Response> wrap(
15+
Future<Response> future, Context context, ExecutionContext executionContext) {
16+
17+
return future.transform(
18+
new AbstractFunction1<>() {
19+
@Override
20+
@CanIgnoreReturnValue
21+
public Response apply(Response result) {
22+
if (result.error().isEmpty()) {
23+
instrumenter().end(context, null, null, null);
24+
} else {
25+
instrumenter().end(context, null, null, (Throwable) result.error().get());
26+
}
27+
return result;
28+
}
29+
},
30+
new AbstractFunction1<>() {
31+
@Override
32+
@CanIgnoreReturnValue
33+
public Throwable apply(Throwable throwable) {
34+
instrumenter().end(context, null, null, throwable);
35+
return throwable;
36+
}
37+
},
38+
executionContext);
39+
}
40+
}
41+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package reactivemongo.api
2+
3+
import scala.util.{Failure, Success, Try}
4+
5+
import reactivemongo.api.bson.BSONDocument
6+
import reactivemongo.api.bson.buffer.ReadableBuffer
7+
import reactivemongo.api.bson.collection.BSONSerializationPack.readFromBuffer
8+
import reactivemongo.core.actors.ExpectingResponse
9+
import reactivemongo.core.protocol.CollectionAwareRequestOp
10+
11+
case class ExpectingResponseWrapper(expectingResponse: ExpectingResponse) {
12+
13+
lazy val statement: Option[BSONDocument] = {
14+
val buf = expectingResponse.requestMaker.payload.duplicate()
15+
Try[BSONDocument] {
16+
val sz = buf.getIntLE(buf.readerIndex)
17+
val bytes = Array.ofDim[Byte](sz)
18+
buf.readBytes(bytes)
19+
readFromBuffer(ReadableBuffer(bytes))
20+
} match {
21+
case Failure(_) => None
22+
case Success(value) =>
23+
buf.resetReaderIndex()
24+
Some(value)
25+
}
26+
}
27+
28+
lazy val collectionName: Option[String] = statement.flatMap { document =>
29+
document
30+
.headOption
31+
.flatMap(
32+
_.value.asOpt[String].orElse(document.getAsOpt[String]("collection")) // in case of getMore
33+
)
34+
}
35+
36+
lazy val dbName: Option[String] = expectingResponse.requestMaker.op match {
37+
case op: CollectionAwareRequestOp => Some(op.db)
38+
case _ => None
39+
}
40+
41+
lazy val operationName: Option[String] = statement.flatMap(_.headOption.map(_.name))
42+
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package reactivemongo.api
7+
8+
import io.opentelemetry.api.common.{AttributeKey, AttributesBuilder}
9+
import io.opentelemetry.context.Context
10+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
11+
import io.opentelemetry.instrumentation.api.internal.SemconvStability.{
12+
emitOldDatabaseSemconv,
13+
emitStableDatabaseSemconv
14+
};
15+
16+
class ReactiveMongoAttributesExtractor extends AttributesExtractor[ExpectingResponseWrapper, Void] {
17+
18+
// copied from DbIncubatingAttributes
19+
private val DB_COLLECTION_NAME = AttributeKey.stringKey("db.collection.name");
20+
private val DB_MONGODB_COLLECTION = AttributeKey.stringKey("db.mongodb.collection");
21+
22+
override def onStart(
23+
attributes: AttributesBuilder,
24+
parentContext: Context,
25+
request: ExpectingResponseWrapper
26+
): Unit = request
27+
.collectionName
28+
.foreach { collectionName =>
29+
if (emitStableDatabaseSemconv()) attributes.put(DB_COLLECTION_NAME, collectionName)
30+
if (emitOldDatabaseSemconv()) attributes.put(DB_MONGODB_COLLECTION, collectionName)
31+
}
32+
33+
override def onEnd(
34+
attributes: AttributesBuilder,
35+
context: Context,
36+
request: ExpectingResponseWrapper,
37+
response: Void,
38+
error: Throwable
39+
): Unit = {}
40+
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package reactivemongo.api
2+
3+
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter
4+
import reactivemongo.api.bson.BSONDocument.pretty
5+
6+
class ReactiveMongoDbClientAttributesGetter
7+
extends DbClientAttributesGetter[ExpectingResponseWrapper] {
8+
9+
override def getUser(request: ExpectingResponseWrapper): String = null
10+
11+
override def getConnectionString(request: ExpectingResponseWrapper): String = null
12+
13+
override def getDbSystem(request: ExpectingResponseWrapper): String = "mongodb"
14+
15+
override def getDbNamespace(request: ExpectingResponseWrapper): String = request
16+
.dbName
17+
.getOrElse("undefined")
18+
19+
override def getDbQueryText(request: ExpectingResponseWrapper): String = request
20+
.statement
21+
.map(pretty)
22+
.orNull
23+
24+
override def getDbOperationName(request: ExpectingResponseWrapper): String =
25+
request.operationName.orNull
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package reactivemongo.api
2+
3+
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor
4+
5+
class ReactiveMongoSpanNameExtractor extends SpanNameExtractor[ExpectingResponseWrapper] {
6+
7+
private val DEFAULT_SPAN_NAME = "DB Query"
8+
9+
override def extract(wrapper: ExpectingResponseWrapper): String = {
10+
val operation = wrapper.operationName.orNull
11+
val dbName = wrapper.dbName.orNull
12+
13+
if (operation == null) return if (dbName == null) DEFAULT_SPAN_NAME
14+
else dbName
15+
16+
val table = wrapper.collectionName.orNull
17+
val name = new StringBuilder(operation)
18+
if (dbName != null || table != null) name.append(' ')
19+
// skip db name if table already has a db name prefixed to it// skip db name if table already has a db name prefixed to it
20+
if (dbName != null && (table == null || table.indexOf('.') == -1)) {
21+
name.append(dbName)
22+
if (table != null) name.append('.')
23+
}
24+
if (table != null) name.append(table)
25+
name.toString
26+
}
27+
28+
}

0 commit comments

Comments
 (0)