diff --git a/instrumentation/spring/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/server/AdviceUtils.java b/instrumentation/spring/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/server/AdviceUtils.java index 8fa7e05e8685..2b24a2da4161 100644 --- a/instrumentation/spring/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/server/AdviceUtils.java +++ b/instrumentation/spring/spring-webflux-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/server/AdviceUtils.java @@ -11,6 +11,7 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -79,17 +80,18 @@ static void finishSpanIfPresent(io.opentelemetry.context.Context context, Throwa private static void finishSpanIfPresentInAttributes( Map attributes, Throwable throwable) { - io.opentelemetry.context.Context context = (io.opentelemetry.context.Context) attributes.remove(CONTEXT_ATTRIBUTE); finishSpanIfPresent(context, throwable); } - public static class SpanFinishingSubscriber implements CoreSubscriber { + public static class SpanFinishingSubscriber implements CoreSubscriber, Subscription { private final CoreSubscriber subscriber; private final io.opentelemetry.context.Context otelContext; private final Context context; + private final AtomicBoolean completed = new AtomicBoolean(); + private Subscription subscription; public SpanFinishingSubscriber( CoreSubscriber subscriber, io.opentelemetry.context.Context otelContext) { @@ -99,9 +101,10 @@ public SpanFinishingSubscriber( } @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; try (Scope scope = otelContext.makeCurrent()) { - subscriber.onSubscribe(s); + subscriber.onSubscribe(this); } } @@ -114,13 +117,17 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - finishSpanIfPresent(otelContext, t); + if (completed.compareAndSet(false, true)) { + finishSpanIfPresent(otelContext, t); + } subscriber.onError(t); } @Override public void onComplete() { - finishSpanIfPresent(otelContext, null); + if (completed.compareAndSet(false, true)) { + finishSpanIfPresent(otelContext, null); + } subscriber.onComplete(); } @@ -128,5 +135,18 @@ public void onComplete() { public Context currentContext() { return context; } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + if (completed.compareAndSet(false, true)) { + finishSpanIfPresent(otelContext, null); + } + subscription.cancel(); + } } }