-
Notifications
You must be signed in to change notification settings - Fork 867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support kafka streams 3 #4236
Support kafka streams 3 #4236
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap( | ||
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable SpanContext receiveSpanContext) { | ||
if (KafkaTracingWrapperUtil.wrappingEnabled()) { | ||
return new TracingIterator<>(delegateIterator, receiveSpanContext); | ||
} | ||
return delegateIterator; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a similar use case that @mateuszrzeszutek recently ran into recently:
Lines 74 to 78 in be090e4
// this will forcefully suppress the kafka-clients CONSUMER instrumentation even though | |
// there's no current CONSUMER span | |
if (records instanceof KafkaConsumerIterableWrapper) { | |
records = ((KafkaConsumerIterableWrapper<?, ?>) records).unwrap(); | |
} |
would it work to use the same "workaround" for both? (either thread local suppression, or force unwrapping for both)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can use thread local for both, I'll just have to move the thread local to a different method that exists in all kafka-streams versions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's wait until #4065 is merged though (I think it should be this week)
|
||
package io.opentelemetry.javaagent.instrumentation.kafka; | ||
|
||
public final class KafkaTracingWrapperUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could rename it to sth like KafkaClientsConsumerProcessTracing
- since it only affects the CONSUMER operation = "process"
part of kafka-clients instrumentation
@laurit fyi (so we don't duplicate) I'm resolving merge conflicts |
@trask too late. I think that it might be better to put |
No worries, feel free to force push over my changes |
|
||
package io.opentelemetry.javaagent.bootstrap.kafka; | ||
|
||
public final class KafkaClientsConsumerProcessTracing { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a brief comment about why this lives in the bootstrap class loader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a comment, could you check whether it makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx 👍
|
||
package io.opentelemetry.javaagent.bootstrap.kafka; | ||
|
||
public interface KafkaClientsConsumerProcessWrapper<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
* @param compileString An extra dependency in the gradle canonical form: | ||
* '<group_id>:<artifact_id>:<version_id>'. | ||
* @param dependencyNotation An extra dependency in the gradle canonical form: | ||
* '<group_id>:<artifact_id>:<version_id>' or a project dependency project(':some-project'). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
…to ensure that everybody uses the same copy of them
Resolves #4174
Apparently tests didn't pass with kafka-streams 1.x perhaps we should also test some intermediate versions?