Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use valid Java identifiers for message keys #3863

Merged
merged 3 commits into from
Aug 19, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.caching.Cache;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
import java.util.Map;
Expand All @@ -22,12 +23,16 @@
import org.springframework.util.LinkedMultiValueMap;

final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
private static final String CONTEXT_AND_SCOPE_KEY = ContextAndScope.class.getName();
private static final String SCOPE_KEY = TracingChannelInterceptor.class.getName() + ".scope";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand dot is invalid, underscore would be fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I'll try that. Any idea what a non-serializable message header even means tho?

Copy link
Contributor

@laurit laurit Aug 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very good question. In our test with rabbitmq https://github.com/spring-projects/spring-amqp/blob/bd088731b08ceb7ca90543e18b19cbf70a21c6c8/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java#L184 converts it to string by calling toString. For actual jms I guess the relevant part is https://github.com/spring-projects/spring-integration/blob/590fb01743d537f3436bccf3a8c445c51b7640d1/spring-integration-jms/src/main/java/org/springframework/integration/jms/DefaultJmsHeaderMapper.java#L133 which seems to ignore properties of unknown type. Though this doesn't explain how there is an exception with IBM MQ, there must be some other place where property is set on jms message. Even if we correct the property name it is likely to still fail as https://docs.oracle.com/javaee/5/api/javax/jms/Message.html#setObjectProperty(java.lang.String,%20java.lang.Object) does not accept arbitrary objects (assuming that this is the method that caused original exception).


private final ContextPropagators propagators;
private final Instrumenter<MessageWithChannel, Void> instrumenter;

// TODO (trask): optimize for javaagent by using field-backed context store
private final Cache<MessageChannel, ContextAndScope> sendContextAndScopeHolder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't MessageChannel a shared object that is used to send many possibly concurrent messages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no 🙁

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the underscore idea doesn't work out, here's another take on this problem that I considered when developing this instrumentation: 7c8d614

Cache.newBuilder().setWeakKeys().build();
private final Cache<MessageChannel, Scope> handleScopeHolder =
Cache.newBuilder().setWeakKeys().build();

TracingChannelInterceptor(
ContextPropagators propagators, Instrumenter<MessageWithChannel, Void> instrumenter) {
this.propagators = propagators;
Expand All @@ -40,7 +45,6 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);

final Context context;
MessageHeaderAccessor messageHeaderAccessor = createMutableHeaderAccessor(message);

// only start a new CONSUMER span when there is no span in the context: this situation happens
// when there's no other messaging instrumentation that can do this - this way
Expand All @@ -54,16 +58,17 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
// instrumentation should not create another one
if (shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
messageHeaderAccessor.setHeader(
CONTEXT_AND_SCOPE_KEY, ContextAndScope.create(context, context.makeCurrent()));
sendContextAndScopeHolder.put(
messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else {
// in case there already was another span in the context: back off and just inject the current
// context into the message
context = parentContext;
messageHeaderAccessor.setHeader(
CONTEXT_AND_SCOPE_KEY, ContextAndScope.create(null, context.makeCurrent()));
sendContextAndScopeHolder.put(
messageChannel, ContextAndScope.create(null, context.makeCurrent()));
}

MessageHeaderAccessor messageHeaderAccessor = createMutableHeaderAccessor(message);
propagators
.getTextMapPropagator()
.inject(context, messageHeaderAccessor, MessageHeadersSetter.INSTANCE);
Expand All @@ -81,11 +86,10 @@ public void postSend(Message<?> message, MessageChannel messageChannel, boolean
@Override
public void afterSendCompletion(
Message<?> message, MessageChannel messageChannel, boolean sent, Exception e) {
Object contextAndScope = message.getHeaders().get(CONTEXT_AND_SCOPE_KEY);
if (contextAndScope instanceof ContextAndScope) {
ContextAndScope cas = (ContextAndScope) contextAndScope;
cas.close();
Context context = cas.getContext();
ContextAndScope contextAndScope = sendContextAndScopeHolder.get(messageChannel);
if (contextAndScope != null) {
contextAndScope.close();
Context context = contextAndScope.getContext();

if (context != null) {
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);
Expand Down Expand Up @@ -117,16 +121,17 @@ public Message<?> beforeHandle(
.getTextMapPropagator()
.extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE);
MessageHeaderAccessor messageHeaderAccessor = MessageHeaderAccessor.getMutableAccessor(message);
messageHeaderAccessor.setHeader(SCOPE_KEY, context.makeCurrent());
return createMessageWithHeaders(message, messageHeaderAccessor);
Message<?> messageWithHeaders = createMessageWithHeaders(message, messageHeaderAccessor);
handleScopeHolder.put(channel, context.makeCurrent());
return messageWithHeaders;
}

@Override
public void afterMessageHandled(
Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
Object scope = message.getHeaders().get(SCOPE_KEY);
if (scope instanceof Scope) {
((Scope) scope).close();
Scope scope = handleScopeHolder.get(channel);
if (scope != null) {
scope.close();
}
}

Expand Down