Skip to content

Commit

Permalink
Fix rocketmq test (open-telemetry#10902)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Mar 21, 2024
1 parent 8e1b900 commit 789f444
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ tasks.withType<Test>().configureEach {
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")

jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")

// with default settings tests will fail when disk is 90% full
jvmArgs("-Drocketmq.broker.diskSpaceWarningLevelRatio=1.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ tasks.withType<Test>().configureEach {
jvmArgs("--add-opens=java.base/sun.nio.ch=ALL-UNNAMED")
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
// with default settings tests will fail when disk is 90% full
jvmArgs("-Drocketmq.broker.diskSpaceWarningLevelRatio=1.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
Expand All @@ -23,6 +25,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand Down Expand Up @@ -253,106 +257,115 @@ void testRocketmqProduceAndBatchConsume() throws Exception {
int maxAttempts = 5;
for (int i = 0; i < maxAttempts; i++) {
tracingMessageListener.reset();

testing().runWithSpan("parent", () -> producer.send(msgs));

tracingMessageListener.waitForMessages();
if (tracingMessageListener.getLastBatchSize() == 2) {
break;
} else if (i < maxAttempts) {
// if messages weren't received as a batch we get 1 trace instead of 2
testing().waitForTraces(1);
Thread.sleep(2_000);
testing().clearData();
logger.error("Messages weren't received as batch, retrying");
}
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK"))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("multiple_sources receive")
.hasKind(SpanKind.CONSUMER)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName("messageListener")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.INTERNAL)));
}

AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
testing()
.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK")));

SpanContext spanContext = trace.getSpan(1).getSpanContext();
producerSpanContext.set(
SpanContext.createFromRemoteParent(
spanContext.getTraceId(),
spanContext.getSpanId(),
spanContext.getTraceFlags(),
spanContext.getTraceState()));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("multiple_sources receive")
.hasKind(SpanKind.CONSUMER)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinksSatisfying(links(producerSpanContext.get()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isNotEmpty()),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_id"),
val -> val.isNotNull()),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
val -> val.isNotNull())),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinksSatisfying(links(producerSpanContext.get()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagB"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isNotEmpty()),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_id"),
val -> val.isNotNull()),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
val -> val.isNotNull())),
span ->
span.hasName("messageListener")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.INTERNAL)));
}

@Test
Expand Down Expand Up @@ -433,4 +446,23 @@ void captureMessageHeaderAsSpanAttributes() throws Exception {
.hasParent(trace.getSpan(2))
.hasKind(SpanKind.INTERNAL)));
}

private static Consumer<List<? extends LinkData>> links(SpanContext... spanContexts) {
return links -> {
assertThat(links).hasSize(spanContexts.length);
for (SpanContext spanContext : spanContexts) {
assertThat(links)
.anySatisfy(
link -> {
assertThat(link.getSpanContext().getTraceId())
.isEqualTo(spanContext.getTraceId());
assertThat(link.getSpanContext().getSpanId()).isEqualTo(spanContext.getSpanId());
assertThat(link.getSpanContext().getTraceFlags())
.isEqualTo(spanContext.getTraceFlags());
assertThat(link.getSpanContext().getTraceState())
.isEqualTo(spanContext.getTraceState());
});
}
};
}
}

0 comments on commit 789f444

Please sign in to comment.