Skip to content

Commit

Permalink
Link JMS receive span with the producer span (open-telemetry#6804)
Browse files Browse the repository at this point in the history
Resolves open-telemetry#6779

In JMS you can have either the consumer receive span or the consumer
process span (unlike Kafka, where the process span is always there and
the receive span is just an addition) - in scenarios where polling
(receive) is used, I think it makes sense to add links to the producer
span to preserve the producer-consumer connection. Current messaging
semantic conventions don't really describe a situation like this one,
but the open-telemetry/oteps#220 OTEP mentions
that links might be used in a scenario like this one - which makes me
think that adding links here might be a not that bad idea.
  • Loading branch information
Mateusz Rzeszutek authored and LironKS committed Oct 31, 2022
1 parent e77a41f commit ec68009
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.google.common.io.Files
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
Expand All @@ -26,6 +25,7 @@ import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Session
import javax.jms.TextMessage
import java.nio.file.Files
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

Expand All @@ -43,7 +43,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
HornetQTextMessage message = session.createTextMessage(messageText)

def setupSpec() {
def tempDir = Files.createTempDir()
def tempDir = Files.createTempDirectory("jmsTempDir").toFile()
tempDir.deleteOnExit()

Configuration config = new ConfigurationImpl()
Expand Down Expand Up @@ -86,19 +86,34 @@ class Jms2Test extends AgentInstrumentationSpecification {
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)

producer.send(message)
runWithSpan("producer parent") {
producer.send(message)
}

TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()

expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName)
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))

producerSpanData = span(1)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData)
}
}

Expand All @@ -124,18 +139,24 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}

producer.send(message)
runWithSpan("parent") {
producer.send(message)
}
lock.countDown()

expect:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -158,7 +179,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)

// Receive with timeout
TextMessage receivedMessage = consumer.receiveNoWait()
Message receivedMessage = consumer.receiveNoWait()

expect:
receivedMessage == null
Expand All @@ -179,7 +200,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receive(100)
Message receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
Expand All @@ -206,19 +227,25 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}
when:
producer.send(destination, message)
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -236,11 +263,15 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
hasNoParent()
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
Expand All @@ -256,14 +287,19 @@ class Jms2Test extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.opentelemetry.javaagent.instrumentation.jms.JmsSingletons.consumerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
Expand Down Expand Up @@ -37,10 +38,16 @@ public ElementMatcher<TypeDescription> typeMatcher() {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
named("receive")
.and(takesArguments(0).or(takesArguments(1)))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
transformer.applyAdviceToMethod(
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
named("receiveNoWait")
.and(takesArguments(0))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;

public final class JmsSingletons {
Expand Down Expand Up @@ -47,6 +48,10 @@ private static Instrumenter<MessageWithDestination, Void> buildConsumerInstrumen
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
MessagePropertyGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand Down
Loading

0 comments on commit ec68009

Please sign in to comment.