Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Spring Kafka 3 #7271

Merged
merged 4 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ muzzle {
pass {
group.set("org.springframework.kafka")
module.set("spring-kafka")
versions.set("[2.7.0,3)")
versions.set("[2.7.0,)")
assertInverse.set(true)
}
}
Expand All @@ -27,21 +27,26 @@ dependencies {

testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")

latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.+")
// TODO: temp change, will be reverted in #7271
latestDepTestLibrary("org.springframework.boot:spring-boot-starter-test:2.+")
latestDepTestLibrary("org.springframework.boot:spring-boot-starter:2.+")
}

val latestDepTest = findProperty("testLatestDeps") as Boolean

testing {
suites {
val testNoReceiveTelemetry by registering(JvmTestSuite::class) {
dependencies {
implementation("org.springframework.kafka:spring-kafka:2.7.0")
implementation(project(":instrumentation:spring:spring-kafka-2.7:testing"))
implementation("org.springframework.boot:spring-boot-starter-test:2.5.3")
implementation("org.springframework.boot:spring-boot-starter:2.5.3")

// the "library" configuration is not recognized by the test suite plugin
if (latestDepTest) {
implementation("org.springframework.kafka:spring-kafka:+")
implementation("org.springframework.boot:spring-boot-starter-test:+")
implementation("org.springframework.boot:spring-boot-starter:+")
} else {
implementation("org.springframework.kafka:spring-kafka:2.7.0")
implementation("org.springframework.boot:spring-boot-starter-test:2.5.3")
implementation("org.springframework.boot:spring-boot-starter:2.5.3")
}
}

targets {
Expand Down Expand Up @@ -71,18 +76,28 @@ tasks {
}
}

configurations {
listOf(
testRuntimeClasspath,
named("testNoReceiveTelemetryRuntimeClasspath")
)
.forEach {
it.configure {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
// spring 6 (which spring-kafka 3.+ uses) requires java 17
if (latestDepTest) {
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
}

// spring 6 uses slf4j 2.0
if (!latestDepTest) {
configurations {
listOf(
testRuntimeClasspath,
named("testNoReceiveTelemetryRuntimeClasspath")
)
.forEach {
it.configure {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void shouldCreateSpansForSingleRecordProcess() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "testSpan");
send("testSingleTopic", "10", "testSpan");
return 0;
});
});
Expand Down Expand Up @@ -113,7 +113,7 @@ void shouldHandleFailureInKafkaListener() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "error");
send("testSingleTopic", "10", "error");
return 0;
});
});
Expand Down Expand Up @@ -240,7 +240,7 @@ void shouldHandleFailureInKafkaBatchListener() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testBatchTopic", "10", "error");
send("testBatchTopic", "10", "error");
return 0;
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ dependencies {

testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")
}

val latestDepTest = findProperty("testLatestDeps") as Boolean

latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.+")
// TODO: temp change, will be reverted in #7271
latestDepTestLibrary("org.springframework.boot:spring-boot-starter-test:2.+")
latestDepTestLibrary("org.springframework.boot:spring-boot-starter:2.+")
// spring 6 (which spring-kafka 3.+ uses) requires java 17
if (latestDepTest) {
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
}

configurations.testRuntimeClasspath {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
// spring 6 uses slf4j 2.0
if (!latestDepTest) {
configurations.testRuntimeClasspath {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -20,6 +23,22 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
VirtualField.find(ConsumerRecord.class, Context.class);
private static final VirtualField<ConsumerRecord<?, ?>, State<ConsumerRecord<?, ?>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);
private static final MethodHandle interceptRecord;

static {
MethodHandle interceptRecordHandle;
try {
interceptRecordHandle =
MethodHandles.lookup()
.findVirtual(
RecordInterceptor.class,
"intercept",
MethodType.methodType(ConsumerRecord.class, ConsumerRecord.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
interceptRecordHandle = null;
}
interceptRecord = interceptRecordHandle;
}

private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;
@Nullable private final RecordInterceptor<K, V> decorated;
Expand All @@ -31,11 +50,25 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
this.decorated = decorated;
}

@SuppressWarnings("deprecation") // implementing deprecated method for better compatibility
@SuppressWarnings({
"deprecation",
"unchecked"
}) // implementing deprecated method (removed in 3.0) for better compatibility
@Override
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
if (interceptRecord == null) {
return null;
}
start(record);
return decorated == null ? record : decorated.intercept(record);
if (decorated == null) {
return null;
}
try {
return (ConsumerRecord<K, V>) interceptRecord.invoke(decorated, record);
} catch (Throwable e) {
rethrow(e);
return null; // unreachable
}
}

@Override
Expand All @@ -44,6 +77,11 @@ public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V
return decorated == null ? record : decorated.intercept(record, consumer);
}

@SuppressWarnings("unchecked")
private static <E extends Throwable> void rethrow(Throwable e) throws E {
throw (E) e;
}

private void start(ConsumerRecord<K, V> record) {
Context parentContext = getParentContext(record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void shouldCreateSpansForSingleRecordProcess() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "testSpan");
send("testSingleTopic", "10", "testSpan");
return 0;
});
});
Expand Down Expand Up @@ -75,7 +75,7 @@ void shouldHandleFailureInKafkaListener() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "error");
send("testSingleTopic", "10", "error");
return 0;
});
});
Expand Down Expand Up @@ -177,7 +177,7 @@ void shouldHandleFailureInKafkaBatchListener() {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testBatchTopic", "10", "error");
send("testBatchTopic", "10", "error");
return 0;
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -23,7 +27,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -82,6 +88,49 @@ void tearDownApp() {
}
}

static final MethodHandle send;

static {
MethodHandle sendMethod = null;
Exception failure = null;
try {
sendMethod =
MethodHandles.lookup()
.findVirtual(
KafkaOperations.class,
"send",
MethodType.methodType(
ListenableFuture.class, String.class, Object.class, Object.class));
} catch (NoSuchMethodException e) {
// spring-kafka 3.0 changed the return type
try {
sendMethod =
MethodHandles.lookup()
.findVirtual(
KafkaOperations.class,
"send",
MethodType.methodType(
CompletableFuture.class, String.class, Object.class, Object.class));
} catch (NoSuchMethodException | IllegalAccessException ex) {
failure = ex;
}
} catch (IllegalAccessException e) {
failure = e;
}
if (sendMethod == null) {
throw new AssertionError("Could not find the KafkaOperations#send() method", failure);
}
send = sendMethod;
}

protected void send(String topic, String key, String data) {
try {
send.invoke(kafkaTemplate, topic, key, data);
} catch (Throwable e) {
throw new AssertionError(e);
}
}

protected void sendBatchMessages(Map<String, String> keyToData) throws InterruptedException {
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
Expand All @@ -97,7 +146,7 @@ protected void sendBatchMessages(Map<String, String> keyToData) throws Interrupt
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data));
keyToData.forEach((key, data) -> send("testBatchTopic", key, data));
return 0;
});
});
Expand Down