Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ project.includePaths."+" = ["glob:**.md"]

fileOverride {
"glob:**/project/Dependencies.scala" {
maxColumn = 150
maxColumn = 160
align.preset = more
}
}
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ lazy val rawAllAggregates = chimneyZioPrelude.projectRefs ++ circeReactivemongoB
reactivemongoBase.projectRefs ++ reactivemongoBson.projectRefs ++
reactivemongoBsonAny.projectRefs ++ reactivemongoBsonJodaTime.projectRefs ++
reactivemongoBsonRefined.projectRefs ++ reactivemongoBsonZioPrelude.projectRefs ++
reactivemongoKamonInstrumentation.projectRefs ++ redissonCore.projectRefs ++
reactivemongoKamonInstrumentation.projectRefs ++
reactivemongoOpentelemetryJavaagentExtension.projectRefs ++ redissonCore.projectRefs ++
redissonCodecBase.projectRefs ++ redissonCodecCirce.projectRefs ++
redissonCodecPlayJson.projectRefs ++ redissonCodecPlay2Json.projectRefs ++
scalaLogging.projectRefs ++ tapirZioPrelude.projectRefs ++ implicitsAny.projectRefs ++
Expand Down Expand Up @@ -242,6 +243,11 @@ lazy val reactivemongoKamonInstrumentation = projectMatrix
.jvmPlatform(ProjectSettings.scala2Versions)
.configure(ModulesCommon.reactivemongoKamonInstrumentationProfile)

lazy val reactivemongoOpentelemetryJavaagentExtension = projectMatrix
.in(file("common/reactivemongo/opentelemetry-javaagent-extension"))
.jvmPlatform(ProjectSettings.scala2_13Versions)
.configure(ModulesCommon.reactivemongoOpentelemetryJavaagentExtensionProfile)

lazy val redissonCore = projectMatrix
.in(file("common/redisson/core"))
.jvmPlatform(ProjectSettings.scala2Versions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers;
import net.bytebuddy.matcher.ElementMatcher;
import reactivemongo.api.ReactiveMongoInstrumentation;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@AutoService(InstrumentationModule.class)
public final class ReactiveMongoInstrumentationModule extends InstrumentationModule {
public ReactiveMongoInstrumentationModule() {
super("reactivemongo");
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return AgentElementMatchers.hasClassesNamed("reactivemongo.api.MongoConnection");
}

@Override
public boolean isIndyModule() {
return true;
}

public List<String> getAdditionalHelperClassNames() {
return Arrays.asList(
"io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons",
"reactivemongo.api.ResponseFutureWrapper",
"reactivemongo.api.ResponseFutureWrapper$1",
"reactivemongo.api.ResponseFutureWrapper$2",
"reactivemongo.api.ReactiveMongoDbClientAttributesGetter",
"reactivemongo.api.ReactiveMongoAttributesExtractor",
"reactivemongo.api.ReactiveMongoSpanNameExtractor"
);
}


@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new ReactiveMongoInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import reactivemongo.api.ExpectingResponseWrapper;
import reactivemongo.api.ReactiveMongoAttributesExtractor;
import reactivemongo.api.ReactiveMongoDbClientAttributesGetter;
import reactivemongo.api.ReactiveMongoSpanNameExtractor;

public final class ReactiveMongoSingletons {
private static final String INSTRUMENTATION_NAME = "io.kinoplan.utils.reactivemongo";

private static final Instrumenter<ExpectingResponseWrapper, Void> INSTRUMENTER;

static {
ReactiveMongoDbClientAttributesGetter dbAttributesGetter = new ReactiveMongoDbClientAttributesGetter();
ReactiveMongoAttributesExtractor attributesExtractor = new ReactiveMongoAttributesExtractor();
ReactiveMongoSpanNameExtractor spanNameExtractor = new ReactiveMongoSpanNameExtractor();

INSTRUMENTER =
Instrumenter.<ExpectingResponseWrapper, Void>builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
spanNameExtractor)
.addAttributesExtractor(DbClientAttributesExtractor.create(dbAttributesGetter))
.addAttributesExtractor(attributesExtractor)
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}

public static Instrumenter<ExpectingResponseWrapper, Void> instrumenter() {
return INSTRUMENTER;
}

private ReactiveMongoSingletons() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package reactivemongo.api;


import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactivemongo.core.actors.ExpectingResponse;
import reactivemongo.core.protocol.Response;
import scala.concurrent.Future;

import static io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons.instrumenter;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static net.bytebuddy.matcher.ElementMatchers.*;

public class ReactiveMongoInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("reactivemongo.api.MongoConnection");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("sendExpectingResponse"))
.and(takesArguments(1))
.and(takesArgument(0, named("reactivemongo.core.actors.ExpectingResponse")))
,
this.getClass().getName() + "$SendExpectingResponseAdvice"
);
}

@SuppressWarnings("unused")
public static class SendExpectingResponseAdvice {

@Advice.AssignReturned.ToArguments(@Advice.AssignReturned.ToArguments.ToArgument(value = 0, index = 0))
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object[] onEnter(
@Advice.Argument(0) ExpectingResponse expectingResponse
) {
Context parentContext = currentContext();
ExpectingResponseWrapper wrapper = new ExpectingResponseWrapper(expectingResponse);
if (!instrumenter().shouldStart(parentContext, wrapper)) {
return new Object[]{null, null};
}
Context context = instrumenter().start(parentContext, wrapper);
Scope scope = context.makeCurrent();

return new Object[]{context, scope};
}

@Advice.AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static Future<Response> onExit(
@Advice.Argument(0) ExpectingResponse expectingResponse,
@Advice.This MongoConnection thiz,
@Advice.Return Future<Response> responseFuture,
@Advice.Enter Object[] enter,
@Advice.Thrown Throwable throwable
) {

if (!(enter[0] instanceof Context context) || !(enter[1] instanceof Scope scope)) {
return null;
}
scope.close();

ExpectingResponseWrapper wrapper = new ExpectingResponseWrapper(expectingResponse);
if (throwable != null) {
instrumenter().end(context, wrapper, null, throwable);
return responseFuture;
}

if (responseFuture == null) {
return null;
}
return ResponseFutureWrapper.wrap(responseFuture, context, thiz.actorSystem().dispatcher());
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package reactivemongo.api;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import reactivemongo.core.protocol.Response;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;

import static io.kinoplan.utils.reactivemongo.opentelemetry.javaagent.extension.ReactiveMongoSingletons.instrumenter;

public final class ResponseFutureWrapper {

public static Future<Response> wrap(
Future<Response> future, Context context, ExecutionContext executionContext) {

return future.transform(
new AbstractFunction1<>() {
@Override
@CanIgnoreReturnValue
public Response apply(Response result) {
if (result.error().isEmpty()) {
instrumenter().end(context, null, null, null);
} else {
instrumenter().end(context, null, null, (Throwable) result.error().get());
}
return result;
}
},
new AbstractFunction1<>() {
@Override
@CanIgnoreReturnValue
public Throwable apply(Throwable throwable) {
instrumenter().end(context, null, null, throwable);
return throwable;
}
},
executionContext);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package reactivemongo.api

import scala.util.{Failure, Success, Try}

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.buffer.ReadableBuffer
import reactivemongo.api.bson.collection.BSONSerializationPack.readFromBuffer
import reactivemongo.core.actors.ExpectingResponse
import reactivemongo.core.protocol.CollectionAwareRequestOp

case class ExpectingResponseWrapper(expectingResponse: ExpectingResponse) {

lazy val statement: Option[BSONDocument] = {
val buf = expectingResponse.requestMaker.payload.duplicate()
Try[BSONDocument] {
val sz = buf.getIntLE(buf.readerIndex)
val bytes = Array.ofDim[Byte](sz)
buf.readBytes(bytes)
readFromBuffer(ReadableBuffer(bytes))
} match {
case Failure(_) => None
case Success(value) =>
buf.resetReaderIndex()
Some(value)
}
}

lazy val collectionName: Option[String] = statement.flatMap { document =>
document
.headOption
.flatMap(
_.value.asOpt[String].orElse(document.getAsOpt[String]("collection")) // in case of getMore
)
}

lazy val dbName: Option[String] = expectingResponse.requestMaker.op match {
case op: CollectionAwareRequestOp => Some(op.db)
case _ => None
}

lazy val operationName: Option[String] = statement.flatMap(_.headOption.map(_.name))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package reactivemongo.api

import io.opentelemetry.api.common.{AttributeKey, AttributesBuilder}
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
import io.opentelemetry.instrumentation.api.internal.SemconvStability.{
emitOldDatabaseSemconv,
emitStableDatabaseSemconv
};

class ReactiveMongoAttributesExtractor extends AttributesExtractor[ExpectingResponseWrapper, Void] {

// copied from DbIncubatingAttributes
private val DB_COLLECTION_NAME = AttributeKey.stringKey("db.collection.name");
private val DB_MONGODB_COLLECTION = AttributeKey.stringKey("db.mongodb.collection");

override def onStart(
attributes: AttributesBuilder,
parentContext: Context,
request: ExpectingResponseWrapper
): Unit = request
.collectionName
.foreach { collectionName =>
if (emitStableDatabaseSemconv()) attributes.put(DB_COLLECTION_NAME, collectionName)
if (emitOldDatabaseSemconv()) attributes.put(DB_MONGODB_COLLECTION, collectionName)
}

override def onEnd(
attributes: AttributesBuilder,
context: Context,
request: ExpectingResponseWrapper,
response: Void,
error: Throwable
): Unit = {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package reactivemongo.api

import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter
import reactivemongo.api.bson.BSONDocument.pretty

class ReactiveMongoDbClientAttributesGetter
extends DbClientAttributesGetter[ExpectingResponseWrapper] {

override def getUser(request: ExpectingResponseWrapper): String = null

Check warning on line 9 in common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoDbClientAttributesGetter.scala

View check run for this annotation

Codecov / codecov/patch

common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoDbClientAttributesGetter.scala#L9

Added line #L9 was not covered by tests

override def getConnectionString(request: ExpectingResponseWrapper): String = null

Check warning on line 11 in common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoDbClientAttributesGetter.scala

View check run for this annotation

Codecov / codecov/patch

common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoDbClientAttributesGetter.scala#L11

Added line #L11 was not covered by tests

override def getDbSystem(request: ExpectingResponseWrapper): String = "mongodb"

override def getDbNamespace(request: ExpectingResponseWrapper): String = request
.dbName
.getOrElse("undefined")

override def getDbQueryText(request: ExpectingResponseWrapper): String = request
.statement
.map(pretty)
.orNull

override def getDbOperationName(request: ExpectingResponseWrapper): String =
request.operationName.orNull

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package reactivemongo.api

import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor

class ReactiveMongoSpanNameExtractor extends SpanNameExtractor[ExpectingResponseWrapper] {

private val DEFAULT_SPAN_NAME = "DB Query"

override def extract(wrapper: ExpectingResponseWrapper): String = {
val operation = wrapper.operationName.orNull
val dbName = wrapper.dbName.orNull

if (operation == null) return if (dbName == null) DEFAULT_SPAN_NAME
else dbName

Check warning on line 14 in common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoSpanNameExtractor.scala

View check run for this annotation

Codecov / codecov/patch

common/reactivemongo/opentelemetry-javaagent-extension/src/main/scala/reactivemongo/api/ReactiveMongoSpanNameExtractor.scala#L14

Added line #L14 was not covered by tests

val table = wrapper.collectionName.orNull
val name = new StringBuilder(operation)
if (dbName != null || table != null) name.append(' ')
// skip db name if table already has a db name prefixed to it// skip db name if table already has a db name prefixed to it
if (dbName != null && (table == null || table.indexOf('.') == -1)) {
name.append(dbName)
if (table != null) name.append('.')
}
if (table != null) name.append(table)
name.toString
}

}
Loading
Loading