From 3fc8e9b817a7d7393f641eda67b245311635a334 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 4 May 2022 14:10:39 +0200 Subject: [PATCH 1/4] Implement vertx-kafka-client instrumentation; single record --- .../kafkaclients/TracingIterator.java | 4 +- .../javaagent/build.gradle.kts | 59 ++++++ .../v3_5/InstrumentedSingleRecordHandler.java | 62 ++++++ ...fkaConsumerRecordsImplInstrumentation.java | 50 +++++ .../KafkaReadStreamImplInstrumentation.java | 85 +++++++++ .../v3_5/VertxKafkaInstrumentationModule.java | 27 +++ .../kafka/v3_5/VertxKafkaSingletons.java | 26 +++ .../v3_5/SingleRecordVertxKafkaTest.java | 178 ++++++++++++++++++ ...veTelemetrySingleRecordVertxKafkaTest.java | 129 +++++++++++++ .../testing/build.gradle.kts | 13 ++ .../kafka/v3_5/AbstractVertxKafkaTest.java | 88 +++++++++ settings.gradle.kts | 2 + 12 files changed, 722 insertions(+), 1 deletion(-) create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts create mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index 12d764c5bc94..b5cd07c8fff6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -54,7 +54,9 @@ public ConsumerRecord next() { closeScopeAndEndSpan(); ConsumerRecord next = delegateIterator.next(); - if (next != null && consumerProcessInstrumenter().shouldStart(parentContext, next)) { + if (next != null + && consumerProcessInstrumenter().shouldStart(parentContext, next) + && KafkaClientsConsumerProcessTracing.wrappingEnabled()) { currentRequest = next; currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); currentScope = currentContext.makeCurrent(); diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts new file mode 100644 index 000000000000..935f3ad924f4 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts @@ -0,0 +1,59 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +dependencies { + bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap")) + implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) + + library("io.vertx:vertx-kafka-client:3.5.0") + + // vertx-codegen and vertx-docgen dependencies are needed for Xlint's annotation checking + library("io.vertx:vertx-codegen:3.0.0") + testLibrary("io.vertx:vertx-docgen:3.0.0") + + // vertx-kafka-client 3.5 uses kafka-clients 0.10.2.1 by default, need to bump it to make instrumentation work + testImplementation("org.apache.kafka:kafka-clients:0.11.0.0") + + testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) + + testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) +} + +testing { + suites { + val testNoReceiveTelemetry by registering(JvmTestSuite::class) { + dependencies { + implementation("io.vertx:vertx-kafka-client:3.5.0") + implementation("io.vertx:vertx-codegen:3.0.0") + implementation("io.vertx:vertx-docgen:3.0.0") + implementation("org.apache.kafka:kafka-clients:0.11.0.0") + implementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) + } + + targets { + all { + testTask.configure { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") + } + } + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java new file mode 100644 index 000000000000..d9598e91064a --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5.VertxKafkaSingletons.processInstrumenter; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.field.VirtualField; +import io.vertx.core.Handler; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class InstrumentedSingleRecordHandler implements Handler> { + + private final VirtualField, Context> receiveContextField; + @Nullable private final Handler> delegate; + + public InstrumentedSingleRecordHandler( + VirtualField, Context> receiveContextField, + @Nullable Handler> delegate) { + this.receiveContextField = receiveContextField; + this.delegate = delegate; + } + + @Override + public void handle(ConsumerRecord record) { + Context parentContext = getParentContext(record); + + if (!processInstrumenter().shouldStart(parentContext, record)) { + callDelegateHandler(record); + return; + } + + Context context = processInstrumenter().start(parentContext, record); + Throwable error = null; + try (Scope ignored = context.makeCurrent()) { + callDelegateHandler(record); + } catch (Throwable t) { + error = t; + throw t; + } finally { + processInstrumenter().end(context, record, null, error); + } + } + + private Context getParentContext(ConsumerRecord records) { + Context receiveContext = receiveContextField.get(records); + + // use the receive CONSUMER span as parent if it's available + return receiveContext != null ? receiveContext : Context.current(); + } + + private void callDelegateHandler(ConsumerRecord record) { + if (delegate != null) { + delegate.handle(record); + } + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java new file mode 100644 index 000000000000..4738355aac83 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class KafkaConsumerRecordsImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("recordAt") + .and(isPublic()) + .and(takesArguments(int.class)) + .and(returns(named("io.vertx.kafka.client.consumer.KafkaConsumerRecord"))), + this.getClass().getName() + "$RecordAtAdvice"); + } + + @SuppressWarnings("unused") + public static class RecordAtAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static boolean onEnter() { + return KafkaClientsConsumerProcessTracing.setEnabled(false); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit(@Advice.Enter boolean previousValue) { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); + } + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java new file mode 100644 index 000000000000..b2ac08418824 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.field.VirtualField; +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.vertx.core.Handler; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class KafkaReadStreamImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("handler") + .and(takesArguments(1)) + .and(takesArgument(0, named("io.vertx.core.Handler"))), + this.getClass().getName() + "$HandlerAdvice"); + transformer.applyAdviceToMethod( + named("batchHandler") + .and(takesArguments(1)) + .and(takesArgument(0, named("io.vertx.core.Handler"))), + this.getClass().getName() + "$BatchHandlerAdvice"); + transformer.applyAdviceToMethod( + named("run").and(isPrivate()), this.getClass().getName() + "$RunAdvice"); + } + + @SuppressWarnings("unused") + public static class HandlerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) Handler> handler) { + + VirtualField, Context> receiveContextField = + VirtualField.find(ConsumerRecord.class, Context.class); + handler = new InstrumentedSingleRecordHandler<>(receiveContextField, handler); + } + } + + @SuppressWarnings("unused") + public static class BatchHandlerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) Handler> handler) { + // TODO: next PR + } + } + + // this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation + @SuppressWarnings("unused") + public static class RunAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static boolean onEnter() { + return KafkaClientsConsumerProcessTracing.setEnabled(false); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Enter boolean previousValue) { + KafkaClientsConsumerProcessTracing.setEnabled(previousValue); + } + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java new file mode 100644 index 000000000000..c9b7ab61f305 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class VertxKafkaInstrumentationModule extends InstrumentationModule { + + public VertxKafkaInstrumentationModule() { + super("vertx-kafka-client", "vertx-kafka-client-3.5", "vertx"); + } + + @Override + public List typeInstrumentations() { + return asList( + new KafkaReadStreamImplInstrumentation(), new KafkaConsumerRecordsImplInstrumentation()); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java new file mode 100644 index 000000000000..35c889caf751 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class VertxKafkaSingletons { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.5"; + + private static final Instrumenter, Void> PROCESS_INSTRUMENTER = + new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .createConsumerProcessInstrumenter(); + + public static Instrumenter, Void> processInstrumenter() { + return PROCESS_INSTRUMENTER; + } + + private VertxKafkaSingletons() {} +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java new file mode 100644 index 000000000000..5a9038cf38e1 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java @@ -0,0 +1,178 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { + + static final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + static void setUpTopicAndConsumer() { + kafkaConsumer.handler( + record -> { + testing.runWithSpan("consumer", () -> {}); + if (record.value().equals("error")) { + throw new IllegalArgumentException("boom"); + } + }); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + kafkaConsumer.subscribe("testSingleTopic"); + } + + @Test + void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + CountDownLatch sent = new CountDownLatch(1); + testing.runWithSpan( + "producer", + () -> { + kafkaProducer.write( + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), + result -> sent.countDown()); + }); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + AtomicReference producer = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testSingleTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative), + satisfies( + longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } + + @Test + void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + CountDownLatch sent = new CountDownLatch(1); + testing.runWithSpan( + "producer", + () -> { + kafkaProducer.write( + KafkaProducerRecord.create("testSingleTopic", "10", "error"), + result -> sent.countDown()); + }); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + AtomicReference producer = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testSingleTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative), + satisfies( + longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java new file mode 100644 index 000000000000..039d88793e08 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java @@ -0,0 +1,129 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { + + static final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + static void setUpTopicAndConsumer() { + kafkaConsumer.handler( + record -> { + testing.runWithSpan("consumer", () -> {}); + if (record.value().equals("error")) { + throw new IllegalArgumentException("boom"); + } + }); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + kafkaConsumer.subscribe("testSingleTopic"); + } + + @Test + void shouldCreateSpansForSingleRecordProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + CountDownLatch sent = new CountDownLatch(1); + testing.runWithSpan( + "producer", + () -> { + kafkaProducer.write( + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), + result -> sent.countDown()); + }); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @Test + void shouldHandleFailureInSingleRecordHandler() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + CountDownLatch sent = new CountDownLatch(1); + testing.runWithSpan( + "producer", + () -> { + kafkaProducer.write( + KafkaProducerRecord.create("testSingleTopic", "10", "error"), + result -> sent.countDown()); + }); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts new file mode 100644 index 000000000000..6d4d78dbc7c8 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation(project(":testing-common")) + implementation("org.testcontainers:kafka") + + compileOnly("io.vertx:vertx-kafka-client:3.5.0") + // vertx-codegen and vertx-docgen dependencies are needed for Xlint's annotation checking + compileOnly("io.vertx:vertx-codegen:3.0.0") + compileOnly("io.vertx:vertx-docgen:3.0.0") +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java new file mode 100644 index 000000000000..efa8f9dfda35 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java @@ -0,0 +1,88 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.producer.KafkaProducer; +import java.time.Duration; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +public abstract class AbstractVertxKafkaTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractVertxKafkaTest.class); + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static KafkaContainer kafka; + static Vertx vertx; + protected static KafkaProducer kafkaProducer; + protected static KafkaConsumer kafkaConsumer; + + @BeforeAll + static void setUpAll() { + kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + + vertx = Vertx.vertx(); + kafkaProducer = KafkaProducer.create(vertx, producerProps()); + kafkaConsumer = KafkaConsumer.create(vertx, consumerProps()); + } + + @AfterAll + static void tearDownAll() { + if (kafkaConsumer != null) { + kafkaConsumer.close(unused -> {}); + } + if (kafkaProducer != null) { + kafkaProducer.close(unused -> {}); + } + if (vertx != null) { + vertx.close(unused -> {}); + } + kafka.stop(); + } + + private static Properties producerProps() { + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("retries", 0); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + return props; + } + + private static Properties consumerProps() { + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("group.id", "test"); + props.put("enable.auto.commit", true); + props.put("auto.commit.interval.ms", 10); + props.put("session.timeout.ms", 30000); + props.put("auto.offset.reset", "earliest"); + props.put("key.deserializer", StringDeserializer.class); + props.put("value.deserializer", StringDeserializer.class); + return props; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 6eecfac8446d..f238be475e1a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -429,6 +429,8 @@ include(":instrumentation:vaadin-14.2:testing") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-3.0:javaagent") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-4.0:javaagent") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-common:javaagent") +include(":instrumentation:vertx:vertx-kafka-client-3.5:javaagent") +include(":instrumentation:vertx:vertx-kafka-client-3.5:testing") include(":instrumentation:vertx:vertx-rx-java-3.5:javaagent") include(":instrumentation:vertx:vertx-web-3.0:javaagent") include(":instrumentation:vertx:vertx-web-3.0:testing") From 51de434b13c834ec5ffc0080c6d8f3b67dc3efc8 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 4 May 2022 14:18:08 +0200 Subject: [PATCH 2/4] add muzzle --- .../vertx-kafka-client-3.5/javaagent/build.gradle.kts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts index 935f3ad924f4..98e03af6b8c0 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts @@ -2,6 +2,16 @@ plugins { id("otel.javaagent-instrumentation") } +muzzle { + pass { + group.set("io.vertx") + module.set("vertx-kafka-client") + versions.set("[3.5.0,)") + assertInverse.set(true) + extraDependency("org.apache.kafka:kafka-clients:0.11.0.0") + } +} + dependencies { bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap")) implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) From fab623bd12fdc09eb28f18931723db967f9fc14b Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 4 May 2022 18:10:58 -0700 Subject: [PATCH 3/4] Update baseline to 3.6 --- .../javaagent/build.gradle.kts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts index 98e03af6b8c0..cd9eb861751e 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts @@ -6,9 +6,8 @@ muzzle { pass { group.set("io.vertx") module.set("vertx-kafka-client") - versions.set("[3.5.0,)") + versions.set("[3.5.1,)") assertInverse.set(true) - extraDependency("org.apache.kafka:kafka-clients:0.11.0.0") } } @@ -16,14 +15,11 @@ dependencies { bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap")) implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) - library("io.vertx:vertx-kafka-client:3.5.0") + library("io.vertx:vertx-kafka-client:3.6.0") // vertx-codegen and vertx-docgen dependencies are needed for Xlint's annotation checking - library("io.vertx:vertx-codegen:3.0.0") - testLibrary("io.vertx:vertx-docgen:3.0.0") - - // vertx-kafka-client 3.5 uses kafka-clients 0.10.2.1 by default, need to bump it to make instrumentation work - testImplementation("org.apache.kafka:kafka-clients:0.11.0.0") + library("io.vertx:vertx-codegen:3.6.0") + testLibrary("io.vertx:vertx-docgen:3.6.0") testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) @@ -34,9 +30,9 @@ testing { suites { val testNoReceiveTelemetry by registering(JvmTestSuite::class) { dependencies { - implementation("io.vertx:vertx-kafka-client:3.5.0") - implementation("io.vertx:vertx-codegen:3.0.0") - implementation("io.vertx:vertx-docgen:3.0.0") + implementation("io.vertx:vertx-kafka-client:3.6.0") + implementation("io.vertx:vertx-codegen:3.6.0") + implementation("io.vertx:vertx-docgen:3.6.0") implementation("org.apache.kafka:kafka-clients:0.11.0.0") implementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) } From c61144f4169712f0c8b99272da65af355c0db162 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 5 May 2022 12:08:17 +0200 Subject: [PATCH 4/4] Set baseline to 3.6 everywhere --- .../vertx-kafka-client-3.5/testing/build.gradle.kts | 13 ------------- .../javaagent/build.gradle.kts | 10 +++------- .../v3_6}/InstrumentedSingleRecordHandler.java | 4 ++-- .../KafkaConsumerRecordsImplInstrumentation.java | 2 +- .../v3_6}/KafkaReadStreamImplInstrumentation.java | 2 +- .../v3_6}/VertxKafkaInstrumentationModule.java | 2 +- .../vertx/kafka/v3_6}/VertxKafkaSingletons.java | 2 +- .../kafka/v3_6}/SingleRecordVertxKafkaTest.java | 2 +- ...oReceiveTelemetrySingleRecordVertxKafkaTest.java | 2 +- .../vertx-kafka-client-3.6/testing/build.gradle.kts | 12 ++++++++++++ .../vertx/kafka/v3_6}/AbstractVertxKafkaTest.java | 2 +- settings.gradle.kts | 4 ++-- 12 files changed, 26 insertions(+), 31 deletions(-) delete mode 100644 instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts rename instrumentation/vertx/{vertx-kafka-client-3.5 => vertx-kafka-client-3.6}/javaagent/build.gradle.kts (84%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/InstrumentedSingleRecordHandler.java (97%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/KafkaConsumerRecordsImplInstrumentation.java (99%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/KafkaReadStreamImplInstrumentation.java (99%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/VertxKafkaInstrumentationModule.java (99%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/VertxKafkaSingletons.java (99%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/SingleRecordVertxKafkaTest.java (99%) rename instrumentation/vertx/{vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/NoReceiveTelemetrySingleRecordVertxKafkaTest.java (99%) create mode 100644 instrumentation/vertx/vertx-kafka-client-3.6/testing/build.gradle.kts rename instrumentation/vertx/{vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5 => vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6}/AbstractVertxKafkaTest.java (99%) diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts deleted file mode 100644 index 6d4d78dbc7c8..000000000000 --- a/instrumentation/vertx/vertx-kafka-client-3.5/testing/build.gradle.kts +++ /dev/null @@ -1,13 +0,0 @@ -plugins { - id("otel.java-conventions") -} - -dependencies { - implementation(project(":testing-common")) - implementation("org.testcontainers:kafka") - - compileOnly("io.vertx:vertx-kafka-client:3.5.0") - // vertx-codegen and vertx-docgen dependencies are needed for Xlint's annotation checking - compileOnly("io.vertx:vertx-codegen:3.0.0") - compileOnly("io.vertx:vertx-docgen:3.0.0") -} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts similarity index 84% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts index cd9eb861751e..83ffceaf3cda 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/build.gradle.kts @@ -16,12 +16,10 @@ dependencies { implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) library("io.vertx:vertx-kafka-client:3.6.0") - - // vertx-codegen and vertx-docgen dependencies are needed for Xlint's annotation checking + // vertx-codegen is needed for Xlint's annotation checking library("io.vertx:vertx-codegen:3.6.0") - testLibrary("io.vertx:vertx-docgen:3.6.0") - testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) + testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) } @@ -32,9 +30,7 @@ testing { dependencies { implementation("io.vertx:vertx-kafka-client:3.6.0") implementation("io.vertx:vertx-codegen:3.6.0") - implementation("io.vertx:vertx-docgen:3.6.0") - implementation("org.apache.kafka:kafka-clients:0.11.0.0") - implementation(project(":instrumentation:vertx:vertx-kafka-client-3.5:testing")) + implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing")) } targets { diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java similarity index 97% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java index d9598e91064a..f404be015940 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/InstrumentedSingleRecordHandler.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java @@ -3,9 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; -import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5.VertxKafkaSingletons.processInstrumenter; +import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6.VertxKafkaSingletons.processInstrumenter; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaConsumerRecordsImplInstrumentation.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaConsumerRecordsImplInstrumentation.java index 4738355aac83..d9fa5b116612 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaConsumerRecordsImplInstrumentation.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaConsumerRecordsImplInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java index b2ac08418824..ba53efb2b888 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/KafkaReadStreamImplInstrumentation.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaInstrumentationModule.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaInstrumentationModule.java index c9b7ab61f305..d3dbf9d69ee9 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaInstrumentationModule.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaInstrumentationModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import static java.util.Arrays.asList; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java index 35c889caf751..e2135bb8caf5 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/VertxKafkaSingletons.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java index 5a9038cf38e1..caac2a862bfb 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/SingleRecordVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import static io.opentelemetry.api.common.AttributeKey.longKey; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java rename to instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java index 039d88793e08..f37c964d1b74 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/NoReceiveTelemetrySingleRecordVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/build.gradle.kts b/instrumentation/vertx/vertx-kafka-client-3.6/testing/build.gradle.kts new file mode 100644 index 000000000000..edb888c66ae6 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation(project(":testing-common")) + implementation("org.testcontainers:kafka") + + compileOnly("io.vertx:vertx-kafka-client:3.6.0") + // vertx-codegen is needed for Xlint's annotation checking + compileOnly("io.vertx:vertx-codegen:3.6.0") +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java similarity index 99% rename from instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java rename to instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java index efa8f9dfda35..d96a0921a671 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.5/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_5/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_5; +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; diff --git a/settings.gradle.kts b/settings.gradle.kts index f238be475e1a..5fd5cd507950 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -429,8 +429,8 @@ include(":instrumentation:vaadin-14.2:testing") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-3.0:javaagent") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-4.0:javaagent") include(":instrumentation:vertx:vertx-http-client:vertx-http-client-common:javaagent") -include(":instrumentation:vertx:vertx-kafka-client-3.5:javaagent") -include(":instrumentation:vertx:vertx-kafka-client-3.5:testing") +include(":instrumentation:vertx:vertx-kafka-client-3.6:javaagent") +include(":instrumentation:vertx:vertx-kafka-client-3.6:testing") include(":instrumentation:vertx:vertx-rx-java-3.5:javaagent") include(":instrumentation:vertx:vertx-web-3.0:javaagent") include(":instrumentation:vertx:vertx-web-3.0:testing")