Commit f43ede9

Merge branch 'master' into sarahchen6/upgrade-bytebuddy
2 parents 89a5674 + b3d2c4a commit f43ede9

26 files changed: +937 additions, -6 deletions

.gitignore

Lines changed: 4 additions & 0 deletions

@@ -46,6 +46,10 @@ out/
 ######################
 .vscode
 
+# Cursor #
+##########
+.cursor
+
 # Others #
 ##########
 /logs/*

dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy

Lines changed: 17 additions & 0 deletions

@@ -19,6 +19,9 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
   @SuppressWarnings('UnusedPrivateField')
   private final Set<DataStreamsTags> backlogs = []
 
+  @SuppressWarnings('UnusedPrivateField')
+  private final List<StatsBucket.SchemaKey> schemaRegistryUsages = []
+
   private final Set<String> serviceNameOverrides = []
 
   @Override
@@ -33,6 +36,11 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
           this.@backlogs.add(backlog.getKey())
         }
       }
+      if (bucket.schemaRegistryUsages != null) {
+        for (Map.Entry<StatsBucket.SchemaKey, Long> usage : bucket.schemaRegistryUsages) {
+          this.@schemaRegistryUsages.add(usage.getKey())
+        }
+      }
     }
   }
 
@@ -52,10 +60,15 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
     Collections.unmodifiableList(new ArrayList<>(this.@backlogs))
   }
 
+  synchronized List<StatsBucket.SchemaKey> getSchemaRegistryUsages() {
+    Collections.unmodifiableList(new ArrayList<>(this.@schemaRegistryUsages))
+  }
+
   synchronized void clear() {
     this.@payloads.clear()
     this.@groups.clear()
     this.@backlogs.clear()
+    this.@schemaRegistryUsages.clear()
   }
 
   void waitForPayloads(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
@@ -70,6 +83,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
     waitFor(count, timeout, this.@backlogs)
   }
 
+  void waitForSchemaRegistryUsages(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
+    waitFor(count, timeout, this.@schemaRegistryUsages)
+  }
+
   private static void waitFor(int count, long timeout, Collection collection) {
     long deadline = System.currentTimeMillis() + timeout
     while (System.currentTimeMillis() < deadline) {
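
For context, a sketch of how a test might drive the new accessors. This is hypothetical usage only — the writer variable and the produce/consume setup are assumptions for illustration, not part of this diff:

    // Hypothetical: obtain the recording writer wired into the test tracer,
    // push a record through an instrumented (de)serializer, then assert.
    RecordingDatastreamsPayloadWriter writer = dataStreamsWriter; // assumed test fixture
    writer.waitForSchemaRegistryUsages(1); // blocks up to the default 3 s timeout
    List<StatsBucket.SchemaKey> usages = writer.getSchemaRegistryUsages();
    assert !usages.isEmpty();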
build.gradle

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+apply from: "$rootDir/gradle/java.gradle"
+
+muzzle {
+  pass {
+    group = "io.confluent"
+    module = "kafka-schema-registry-client"
+    versions = "[7.0.0,)"
+    assertInverse = true
+  }
+}
+
+dependencies {
+  compileOnly project(':dd-java-agent:instrumentation:kafka:kafka-common')
+  compileOnly group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.0.0'
+  compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.0.0'
+  compileOnly group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.0.0'
+  compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'
+
+  testImplementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
+  testImplementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.5.2'
+  testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.5.2'
+  testImplementation group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.5.1'
+  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.5.0'
+  testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.0'
+}
+
KafkaDeserializerInstrumentation.java

Lines changed: 115 additions & 0 deletions

@@ -0,0 +1,115 @@
+package datadog.trace.instrumentation.confluentschemaregistry;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.Instrumenter.MethodTransformer;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
+import java.util.HashMap;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Instruments Confluent Schema Registry deserializers (Avro, Protobuf, and JSON) to capture
+ * deserialization operations.
+ */
+@AutoService(InstrumenterModule.class)
+public class KafkaDeserializerInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
+
+  public KafkaDeserializerInstrumentation() {
+    super("confluent-schema-registry", "kafka");
+  }
+
+  @Override
+  public String[] knownMatchingTypes() {
+    return new String[] {
+      "io.confluent.kafka.serializers.KafkaAvroDeserializer",
+      "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
+      "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"
+    };
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      "datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
+      packageName + ".SchemaIdExtractor"
+    };
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    Map<String, String> contextStores = new HashMap<>();
+    contextStores.put("org.apache.kafka.common.serialization.Deserializer", "java.lang.Boolean");
+    return contextStores;
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    // Instrument configure to capture the isKey value
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("configure"))
+            .and(isPublic())
+            .and(takesArguments(2))
+            .and(takesArgument(1, boolean.class)),
+        getClass().getName() + "$ConfigureAdvice");
+
+    // Instrument deserialize(String topic, Headers headers, byte[] data).
+    // The 2-arg version calls this one, so we only need to instrument this to avoid duplicates.
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("deserialize"))
+            .and(isPublic())
+            .and(takesArguments(3))
+            .and(takesArgument(0, String.class))
+            .and(takesArgument(2, byte[].class)),
+        getClass().getName() + "$DeserializeAdvice");
+  }
+
+  public static class ConfigureAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void onExit(
+        @Advice.This Deserializer deserializer, @Advice.Argument(1) boolean isKey) {
+      // Store the isKey value in InstrumentationContext for later use
+      InstrumentationContext.get(Deserializer.class, Boolean.class).put(deserializer, isKey);
+    }
+  }
+
+  public static class DeserializeAdvice {
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit(
+        @Advice.This Deserializer deserializer,
+        @Advice.Argument(0) String topic,
+        @Advice.Argument(2) byte[] data,
+        @Advice.Return Object result,
+        @Advice.Thrown Throwable throwable) {
+
+      // Get isKey from InstrumentationContext (stored during configure)
+      Boolean isKeyObj =
+          InstrumentationContext.get(Deserializer.class, Boolean.class).get(deserializer);
+      boolean isKey = isKeyObj != null && isKeyObj;
+
+      // Get cluster ID from thread-local (set by Kafka consumer instrumentation)
+      String clusterId = ClusterIdHolder.get();
+
+      boolean isSuccess = throwable == null;
+      int schemaId = isSuccess ? SchemaIdExtractor.extractSchemaId(data) : -1;
+
+      // Record the schema registry usage
+      AgentTracer.get()
+          .getDataStreamsMonitoring()
+          .reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "deserialize");
+    }
+  }
+}
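
The advice above reads the cluster ID from ClusterIdHolder, which lives in the kafka-common module and is not part of this diff. As a rough sketch of the thread-local holder pattern it appears to follow (the real class may differ):

    package datadog.trace.instrumentation.kafka_common;

    // Assumed shape only: the Kafka consumer/producer instrumentation sets the
    // cluster ID on the current thread before (de)serialization runs, and the
    // schema registry advice reads it back via get().
    public final class ClusterIdHolder {
      private static final ThreadLocal<String> CLUSTER_ID = new ThreadLocal<>();

      private ClusterIdHolder() {}

      public static void set(String clusterId) {
        CLUSTER_ID.set(clusterId);
      }

      public static String get() {
        return CLUSTER_ID.get();
      }

      public static void clear() {
        CLUSTER_ID.remove(); // avoid leaking values across pooled threads
      }
    }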
KafkaSerializerInstrumentation.java

Lines changed: 114 additions & 0 deletions

@@ -0,0 +1,114 @@
+package datadog.trace.instrumentation.confluentschemaregistry;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.Instrumenter.MethodTransformer;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
+import java.util.HashMap;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Instruments Confluent Schema Registry serializers (Avro, Protobuf, and JSON) to capture
+ * serialization operations.
+ */
+@AutoService(InstrumenterModule.class)
+public class KafkaSerializerInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
+
+  public KafkaSerializerInstrumentation() {
+    super("confluent-schema-registry", "kafka");
+  }
+
+  @Override
+  public String[] knownMatchingTypes() {
+    return new String[] {
+      "io.confluent.kafka.serializers.KafkaAvroSerializer",
+      "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer",
+      "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"
+    };
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      "datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
+      packageName + ".SchemaIdExtractor"
+    };
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    Map<String, String> contextStores = new HashMap<>();
+    contextStores.put("org.apache.kafka.common.serialization.Serializer", "java.lang.Boolean");
+    return contextStores;
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    // Instrument configure to capture the isKey value
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("configure"))
+            .and(isPublic())
+            .and(takesArguments(2))
+            .and(takesArgument(1, boolean.class)),
+        getClass().getName() + "$ConfigureAdvice");
+
+    // Instrument both serialize(String topic, Object data)
+    // and serialize(String topic, Headers headers, Object data) for Kafka 2.1+
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("serialize"))
+            .and(isPublic())
+            .and(takesArgument(0, String.class))
+            .and(returns(byte[].class)),
+        getClass().getName() + "$SerializeAdvice");
+  }
+
+  public static class ConfigureAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void onExit(
+        @Advice.This Serializer serializer, @Advice.Argument(1) boolean isKey) {
+      // Store the isKey value in InstrumentationContext for later use
+      InstrumentationContext.get(Serializer.class, Boolean.class).put(serializer, isKey);
+    }
+  }
+
+  public static class SerializeAdvice {
+    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
+    public static void onExit(
+        @Advice.This Serializer serializer,
+        @Advice.Argument(0) String topic,
+        @Advice.Return byte[] result,
+        @Advice.Thrown Throwable throwable) {
+
+      // Get isKey from InstrumentationContext (stored during configure)
+      Boolean isKeyObj =
+          InstrumentationContext.get(Serializer.class, Boolean.class).get(serializer);
+      boolean isKey = isKeyObj != null && isKeyObj;
+
+      // Get cluster ID from thread-local (set by Kafka producer instrumentation)
+      String clusterId = ClusterIdHolder.get();
+
+      boolean isSuccess = throwable == null;
+      int schemaId = isSuccess ? SchemaIdExtractor.extractSchemaId(result) : -1;
+
+      // Record the schema registry usage
+      AgentTracer.get()
+          .getDataStreamsMonitoring()
+          .reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "serialize");
+    }
+  }
+}
SchemaIdExtractor.java

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+package datadog.trace.instrumentation.confluentschemaregistry;
+
+/**
+ * Helper class to extract the schema ID from the Confluent Schema Registry wire format.
+ * Wire format: [magic_byte][4-byte schema id][data]
+ */
+public class SchemaIdExtractor {
+  public static int extractSchemaId(byte[] data) {
+    if (data == null || data.length < 5 || data[0] != 0) {
+      return -1;
+    }
+
+    try {
+      // Confluent wire format: [magic_byte][4-byte schema id][data]
+      return ((data[1] & 0xFF) << 24)
+          | ((data[2] & 0xFF) << 16)
+          | ((data[3] & 0xFF) << 8)
+          | (data[4] & 0xFF);
+    } catch (Throwable ignored) {
+      return -1;
+    }
+  }
+}
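
To make the wire format concrete: the magic byte 0x00 is followed by the schema ID as a big-endian 32-bit integer, so a payload beginning 00 00 00 00 2A carries schema ID 42. A small worked example (the payload bytes are invented for illustration):

    public class SchemaIdExtractorExample {
      public static void main(String[] args) {
        // Magic byte 0x00, schema id 42 as big-endian 0x0000002A, then data bytes.
        byte[] payload = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x10, 0x20};
        System.out.println(SchemaIdExtractor.extractSchemaId(payload)); // 42

        // Payloads shorter than 5 bytes, or without the 0x00 magic byte, are
        // not in wire format and yield -1.
        System.out.println(SchemaIdExtractor.extractSchemaId(new byte[] {0x00, 0x01})); // -1
        System.out.println(SchemaIdExtractor.extractSchemaId(null)); // -1
      }
    }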
