11
11
import io .opentelemetry .api .trace .StatusCode ;
12
12
import io .opentelemetry .context .Scope ;
13
13
import java .util .Map ;
14
+ import java .util .concurrent .atomic .AtomicBoolean ;
14
15
import java .util .function .Function ;
15
16
import org .reactivestreams .Publisher ;
16
17
import org .reactivestreams .Subscription ;
@@ -79,17 +80,18 @@ static void finishSpanIfPresent(io.opentelemetry.context.Context context, Throwa
79
80
80
81
private static void finishSpanIfPresentInAttributes (
81
82
Map <String , Object > attributes , Throwable throwable ) {
82
-
83
83
io .opentelemetry .context .Context context =
84
84
(io .opentelemetry .context .Context ) attributes .remove (CONTEXT_ATTRIBUTE );
85
85
finishSpanIfPresent (context , throwable );
86
86
}
87
87
88
- public static class SpanFinishingSubscriber <T > implements CoreSubscriber <T > {
88
+ public static class SpanFinishingSubscriber <T > implements CoreSubscriber <T >, Subscription {
89
89
90
90
private final CoreSubscriber <? super T > subscriber ;
91
91
private final io .opentelemetry .context .Context otelContext ;
92
92
private final Context context ;
93
+ private final AtomicBoolean completed = new AtomicBoolean ();
94
+ private Subscription subscription ;
93
95
94
96
public SpanFinishingSubscriber (
95
97
CoreSubscriber <? super T > subscriber , io .opentelemetry .context .Context otelContext ) {
@@ -99,9 +101,10 @@ public SpanFinishingSubscriber(
99
101
}
100
102
101
103
@ Override
102
- public void onSubscribe (Subscription s ) {
104
+ public void onSubscribe (Subscription subscription ) {
105
+ this .subscription = subscription ;
103
106
try (Scope scope = otelContext .makeCurrent ()) {
104
- subscriber .onSubscribe (s );
107
+ subscriber .onSubscribe (this );
105
108
}
106
109
}
107
110
@@ -114,19 +117,36 @@ public void onNext(T t) {
114
117
115
118
@ Override
116
119
public void onError (Throwable t ) {
117
- finishSpanIfPresent (otelContext , t );
120
+ if (completed .compareAndSet (false , true )) {
121
+ finishSpanIfPresent (otelContext , t );
122
+ }
118
123
subscriber .onError (t );
119
124
}
120
125
121
126
@ Override
122
127
public void onComplete () {
123
- finishSpanIfPresent (otelContext , null );
128
+ if (completed .compareAndSet (false , true )) {
129
+ finishSpanIfPresent (otelContext , null );
130
+ }
124
131
subscriber .onComplete ();
125
132
}
126
133
127
134
@ Override
128
135
public Context currentContext () {
129
136
return context ;
130
137
}
138
+
139
+ @ Override
140
+ public void request (long n ) {
141
+ subscription .request (n );
142
+ }
143
+
144
+ @ Override
145
+ public void cancel () {
146
+ if (completed .compareAndSet (false , true )) {
147
+ finishSpanIfPresent (otelContext , null );
148
+ }
149
+ subscription .cancel ();
150
+ }
131
151
}
132
152
}
0 commit comments