Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -85,43 +87,37 @@ private Object proceedUnderReactorSpan(MethodInvocation invocation, NewSpan newS
Publisher<?> publisher = (Publisher) invocation.proceed();

if (publisher instanceof Mono) {
return new MonoSpan((Mono<Object>) publisher,
this,
newSpan,
span,
invocation,
return new MonoSpan((Mono<Object>) publisher, this, newSpan, span, invocation,
log);
}
else if (publisher instanceof Flux) {
return new FluxSpan((Flux<Object>) publisher,
this,
newSpan,
span,
invocation,
return new FluxSpan((Flux<Object>) publisher, this, newSpan, span, invocation,
log);
}
else {
throw new IllegalArgumentException("Unexpected type of publisher: " + publisher.getClass());
throw new IllegalArgumentException(
"Unexpected type of publisher: " + publisher.getClass());
}
}

private static final class FluxSpan extends Flux<Object> implements Scannable {
private static final class FluxSpan extends FluxOperator<Object, Object> {

final Span span;

final MethodInvocation invocation;

final String log;

final boolean hasLog;

final Flux<Object> source;
final Span span;
final MethodInvocation invocation;
final String log;
final boolean hasLog;
final ReactorSleuthMethodInvocationProcessor processor;
final NewSpan newSpan;

FluxSpan(Flux<Object> source,
ReactorSleuthMethodInvocationProcessor processor,
NewSpan newSpan,
@Nullable Span span,
MethodInvocation invocation,
final NewSpan newSpan;

FluxSpan(Flux<Object> source, ReactorSleuthMethodInvocationProcessor processor,
NewSpan newSpan, @Nullable Span span, MethodInvocation invocation,
String log) {
this.source = source;
super(source);
this.span = span;
this.newSpan = newSpan;
this.invocation = invocation;
Expand All @@ -143,44 +139,31 @@ public void subscribe(CoreSubscriber<? super Object> actual) {
span = this.span;
}
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
this.source.subscribe(new SpanSubscriber(actual,
this.processor,
this.invocation,
this.span == null,
span,
this.log,
this.hasLog));
this.source.subscribe(new SpanSubscriber(actual, this.processor,
this.invocation, this.span == null, span, this.log, this.hasLog));
}
}

@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) {
return this.source;
}
else {
return null;
}
}
}

private static final class MonoSpan extends Mono<Object> implements Scannable {
private static final class MonoSpan extends MonoOperator<Object, Object> {

final Span span;

final MethodInvocation invocation;

final String log;

final boolean hasLog;

final Mono<Object> source;
final Span span;
final MethodInvocation invocation;
final String log;
final boolean hasLog;
final ReactorSleuthMethodInvocationProcessor processor;
final NewSpan newSpan;

MonoSpan(Mono<Object> source,
ReactorSleuthMethodInvocationProcessor processor,
NewSpan newSpan,
@Nullable Span span,
MethodInvocation invocation,
final NewSpan newSpan;

MonoSpan(Mono<Object> source, ReactorSleuthMethodInvocationProcessor processor,
NewSpan newSpan, @Nullable Span span, MethodInvocation invocation,
String log) {
this.source = source;
super(source);
this.processor = processor;
this.newSpan = newSpan;
this.span = span;
Expand All @@ -202,48 +185,37 @@ public void subscribe(CoreSubscriber<? super Object> actual) {
span = this.span;
}
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
this.source.subscribe(new SpanSubscriber(actual,
this.processor,
this.invocation,
this.span == null,
span,
this.log,
this.hasLog));
this.source.subscribe(new SpanSubscriber(actual, this.processor,
this.invocation, this.span == null, span, this.log, this.hasLog));
}
}

@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) {
return this.source;
}
else {
return null;
}
}
}

private static final class SpanSubscriber implements CoreSubscriber<Object>,
Subscription,
Scannable {
private static final class SpanSubscriber
implements CoreSubscriber<Object>, Subscription, Scannable {

final CoreSubscriber<? super Object> actual;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit this doublespacing is stress testing my thumbs ability to swipe up!


final boolean isNewSpan;

final Span span;

final String log;

final boolean hasLog;

final CurrentTraceContext currentTraceContext;

final CoreSubscriber<? super Object> actual;
final boolean isNewSpan;
final Span span;
final String log;
final boolean hasLog;
final CurrentTraceContext currentTraceContext;
final ReactorSleuthMethodInvocationProcessor processor;
final Context context;

final Context context;

Subscription parent;

SpanSubscriber(CoreSubscriber<? super Object> actual,
ReactorSleuthMethodInvocationProcessor processor,
MethodInvocation invocation,
boolean isNewSpan,
Span span,
String log,
MethodInvocation invocation, boolean isNewSpan, Span span, String log,
boolean hasLog) {
this.actual = actual;
this.isNewSpan = isNewSpan;
Expand Down Expand Up @@ -332,6 +304,7 @@ public Object scanUnsafe(Attr key) {
}
return null;
}

}

private boolean isReactorReturnType(Class<?> returnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,12 @@ private ReactorSleuth() {
log.trace("Scope passing operator [" + beanFactory + "]");
}

//Adapt if lazy bean factory
BooleanSupplier isActive =
beanFactory instanceof ConfigurableApplicationContext ?
((ConfigurableApplicationContext) beanFactory)::isActive :
() -> true;
// Adapt if lazy bean factory
BooleanSupplier isActive = beanFactory instanceof ConfigurableApplicationContext
? ((ConfigurableApplicationContext) beanFactory)::isActive : () -> true;

return Operators.liftPublisher((p, sub) -> {
//if Flux/Mono #just, #empty, #error
// if Flux/Mono #just, #empty, #error
if (p instanceof Fuseable.ScalarCallable) {
return sub;
}
Expand All @@ -81,40 +79,43 @@ private ReactorSleuth() {
if (log.isTraceEnabled()) {
log.trace("Spring Context [" + beanFactory
+ "] already refreshed. Creating a scope "
+ "passing span subscriber with Reactor Context "
+ "[" + sub.currentContext() + "] and name ["
+ scannable.name() + "]");
+ "passing span subscriber with Reactor Context " + "["
+ sub.currentContext() + "] and name [" + scannable.name()
+ "]");
}

return scopePassingSpanSubscription(beanFactory.getBean(Tracing.class), sub);
return scopePassingSpanSubscription(beanFactory.getBean(Tracing.class),
sub);
}
if (log.isTraceEnabled()) {
log.trace("Spring Context [" + beanFactory
+ "] is not yet refreshed, falling back to lazy span subscriber. Reactor Context is ["
+ sub.currentContext() + "] and name is ["
+ scannable.name() + "]");
+ sub.currentContext() + "] and name is [" + scannable.name()
+ "]");
}
return new LazySpanSubscriber<>(lazyScopePassingSpanSubscription(beanFactory, scannable, sub));
return new LazySpanSubscriber<>(
lazyScopePassingSpanSubscription(beanFactory, scannable, sub));
});
}

static <T> SpanSubscriptionProvider<T> lazyScopePassingSpanSubscription(
BeanFactory beanFactory, Scannable scannable, CoreSubscriber<? super T> sub) {
return new SpanSubscriptionProvider<>(beanFactory, sub, sub.currentContext(), scannable.name());
return new SpanSubscriptionProvider<>(beanFactory, sub, sub.currentContext(),
scannable.name());
}


static <T> CoreSubscriber<? super T> scopePassingSpanSubscription(
Tracing tracing, CoreSubscriber<? super T> sub) {
static <T> CoreSubscriber<? super T> scopePassingSpanSubscription(Tracing tracing,
CoreSubscriber<? super T> sub) {

Context context = sub.currentContext();

Span root = context.hasKey(Span.class) ? context.get(Span.class) : tracing.tracer().currentSpan();
Span root = context.hasKey(Span.class) ? context.get(Span.class)
: tracing.tracer().currentSpan();
if (root != null) {
return new ScopePassingSpanSubscriber<>(sub, context, tracing, root);
}
else {
return sub; //no need to trace
return sub; // no need to trace
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ public Context currentContext() {
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) {
return this.s;
} else {
}
else {
return key == Attr.ACTUAL ? this.subscriber : null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public SpanSubscription<T> get() {
}

SpanSubscription<T> newCoreSubscriber(Tracing tracing) {
Span root = context.hasKey(Span.class) ? context.get(Span.class) : tracing.tracer().currentSpan();
return new ScopePassingSpanSubscriber<>(this.subscriber, this.context, tracing, root);
Span root = context.hasKey(Span.class) ? context.get(Span.class)
: tracing.tracer().currentSpan();
return new ScopePassingSpanSubscriber<>(this.subscriber, this.context, tracing,
root);
}

private Tracing tracing() {
Expand Down
Loading