-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #40191 from vkn/mongodb-client-opentelemetry
Add mongo commands to otel span attributes
- Loading branch information
Showing
14 changed files
with
704 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
...-client/runtime/src/main/java/io/quarkus/mongodb/tracing/MongoTracingCommandListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package io.quarkus.mongodb.tracing; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
import jakarta.inject.Inject; | ||
|
||
import org.jboss.logging.Logger; | ||
|
||
import com.mongodb.event.*; | ||
|
||
import io.opentelemetry.api.OpenTelemetry; | ||
import io.opentelemetry.api.common.AttributesBuilder; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; | ||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; | ||
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; | ||
|
||
public class MongoTracingCommandListener implements CommandListener { | ||
private static final org.jboss.logging.Logger LOGGER = Logger.getLogger(MongoTracingCommandListener.class); | ||
private static final String KEY = "mongodb.command"; | ||
private final Map<Integer, ContextEvent> requestMap; | ||
private final Instrumenter<CommandStartedEvent, Void> instrumenter; | ||
|
||
private record ContextEvent(Context context, CommandStartedEvent commandEvent) { | ||
} | ||
|
||
@Inject | ||
public MongoTracingCommandListener(OpenTelemetry openTelemetry) { | ||
requestMap = new ConcurrentHashMap<>(); | ||
SpanNameExtractor<CommandStartedEvent> spanNameExtractor = CommandEvent::getCommandName; | ||
instrumenter = Instrumenter.<CommandStartedEvent, Void> builder( | ||
openTelemetry, "quarkus-mongodb-client", spanNameExtractor) | ||
.addAttributesExtractor(new CommandEventAttrExtractor()) | ||
.buildInstrumenter(SpanKindExtractor.alwaysClient()); | ||
LOGGER.debugf("MongoTracingCommandListener created"); | ||
} | ||
|
||
@Override | ||
public void commandStarted(CommandStartedEvent event) { | ||
LOGGER.tracef("commandStarted event %s", event.getCommandName()); | ||
|
||
Context parentContext = Context.current(); | ||
if (instrumenter.shouldStart(parentContext, event)) { | ||
Context context = instrumenter.start(parentContext, event); | ||
requestMap.put(event.getRequestId(), new ContextEvent(context, event)); | ||
} | ||
} | ||
|
||
@Override | ||
public void commandSucceeded(CommandSucceededEvent event) { | ||
LOGGER.tracef("commandSucceeded event %s", event.getCommandName()); | ||
ContextEvent contextEvent = requestMap.remove(event.getRequestId()); | ||
if (contextEvent != null) { | ||
instrumenter.end(contextEvent.context(), contextEvent.commandEvent(), null, null); | ||
} | ||
} | ||
|
||
@Override | ||
public void commandFailed(CommandFailedEvent event) { | ||
LOGGER.tracef("commandFailed event %s", event.getCommandName()); | ||
ContextEvent contextEvent = requestMap.remove(event.getRequestId()); | ||
if (contextEvent != null) { | ||
instrumenter.end( | ||
contextEvent.context(), | ||
contextEvent.commandEvent(), | ||
null, | ||
event.getThrowable()); | ||
} | ||
} | ||
|
||
private static class CommandEventAttrExtractor implements AttributesExtractor<CommandStartedEvent, Void> { | ||
@Override | ||
public void onStart(AttributesBuilder attributesBuilder, | ||
Context context, | ||
CommandStartedEvent commandStartedEvent) { | ||
attributesBuilder.put(KEY, commandStartedEvent.getCommand().toJson()); | ||
} | ||
|
||
@Override | ||
public void onEnd(AttributesBuilder attributesBuilder, | ||
Context context, | ||
CommandStartedEvent commandStartedEvent, | ||
@Nullable Void unused, | ||
@Nullable Throwable throwable) { | ||
|
||
} | ||
} | ||
} |
103 changes: 103 additions & 0 deletions
103
...ent/runtime/src/test/java/io/quarkus/mongodb/tracing/MongoTracingCommandListenerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package io.quarkus.mongodb.tracing; | ||
|
||
import static org.assertj.core.api.Assertions.assertThatNoException; | ||
|
||
import org.bson.BsonDocument; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import com.mongodb.ServerAddress; | ||
import com.mongodb.connection.ClusterId; | ||
import com.mongodb.connection.ConnectionDescription; | ||
import com.mongodb.connection.ServerId; | ||
import com.mongodb.event.CommandFailedEvent; | ||
import com.mongodb.event.CommandStartedEvent; | ||
import com.mongodb.event.CommandSucceededEvent; | ||
|
||
import io.opentelemetry.api.OpenTelemetry; | ||
|
||
class MongoTracingCommandListenerTest { | ||
private ConnectionDescription connDescr; | ||
private MongoTracingCommandListener listener; | ||
private BsonDocument command; | ||
|
||
@BeforeEach | ||
void setUp() { | ||
connDescr = new ConnectionDescription(new ServerId(new ClusterId(), new ServerAddress())); | ||
listener = new MongoTracingCommandListener(OpenTelemetry.noop()); | ||
command = new BsonDocument(); | ||
} | ||
|
||
@Test | ||
void commandStarted() { | ||
var startEvent = new CommandStartedEvent( | ||
null, | ||
1L, | ||
10, | ||
connDescr, | ||
"db", | ||
"find", | ||
command); | ||
assertThatNoException().isThrownBy(() -> listener.commandStarted(startEvent)); | ||
|
||
CommandSucceededEvent successEvent = new CommandSucceededEvent(null, | ||
startEvent.getOperationId(), | ||
startEvent.getRequestId(), | ||
connDescr, | ||
startEvent.getDatabaseName(), | ||
startEvent.getCommandName(), | ||
startEvent.getCommand(), | ||
10L); | ||
assertThatNoException().isThrownBy(() -> listener.commandSucceeded(successEvent)); | ||
} | ||
|
||
@Test | ||
void commandSucceeded() { | ||
CommandSucceededEvent cmd = new CommandSucceededEvent(null, | ||
1L, | ||
10, | ||
connDescr, | ||
"db", | ||
"find", | ||
command, | ||
10L); | ||
assertThatNoException().isThrownBy(() -> listener.commandSucceeded(cmd)); | ||
} | ||
|
||
@Test | ||
void commandFailed() { | ||
var startedEvent = new CommandStartedEvent( | ||
null, | ||
1L, | ||
10, | ||
connDescr, | ||
"db", | ||
"find", | ||
command); | ||
assertThatNoException().isThrownBy(() -> listener.commandStarted(startedEvent)); | ||
|
||
CommandFailedEvent failedEvent = new CommandFailedEvent(null, | ||
1L, | ||
10, | ||
connDescr, | ||
"db", | ||
"find", | ||
10L, | ||
new IllegalStateException("command failed")); | ||
assertThatNoException().isThrownBy(() -> listener.commandFailed(failedEvent)); | ||
} | ||
|
||
@Test | ||
void commandFailedNoEvent() { | ||
CommandFailedEvent cmd = new CommandFailedEvent(null, | ||
1L, | ||
10, | ||
connDescr, | ||
"db", | ||
"find", | ||
10L, | ||
new IllegalStateException("command failed")); | ||
assertThatNoException().isThrownBy(() -> listener.commandFailed(cmd)); | ||
} | ||
|
||
} |
Oops, something went wrong.