@@ -81,8 +81,9 @@ private void onSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber, Cont
81
81
{
82
82
// As per rule 2.13, we MUST consider subscription cancelled and
83
83
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
84
- subscription .cancel (new Suppressed (err ));
85
- LOG .error ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
84
+ subscription .cancel (new SuppressedException (err ));
85
+ if (LOG .isTraceEnabled ())
86
+ LOG .trace ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
86
87
}
87
88
}
88
89
@@ -106,7 +107,8 @@ private void onMultiSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
106
107
{
107
108
// As per rule 2.13, we MUST consider subscription cancelled and
108
109
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
109
- LOG .error ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
110
+ if (LOG .isTraceEnabled ())
111
+ LOG .trace ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
110
112
}
111
113
}
112
114
@@ -168,12 +170,13 @@ protected Action process()
168
170
{
169
171
if (cancelled == COMPLETED )
170
172
this .subscriber .onComplete ();
171
- else if (!(cancelled instanceof Suppressed ))
173
+ else if (!(cancelled instanceof SuppressedException ))
172
174
this .subscriber .onError (cancelled );
173
175
}
174
176
catch (Throwable err )
175
177
{
176
- LOG .error ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
178
+ if (LOG .isTraceEnabled ())
179
+ LOG .trace ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
177
180
}
178
181
this .subscriber = null ;
179
182
return Action .SUCCEEDED ;
@@ -183,8 +186,8 @@ else if (!(cancelled instanceof Suppressed))
183
186
184
187
if (chunk == null )
185
188
{
186
- content .demand (this ::iterate );
187
- return Action .IDLE ;
189
+ content .demand (this ::succeeded );
190
+ return Action .SCHEDULED ;
188
191
}
189
192
190
193
if (Content .Chunk .isFailure (chunk ))
@@ -201,8 +204,9 @@ else if (!(cancelled instanceof Suppressed))
201
204
catch (Throwable err )
202
205
{
203
206
chunk .release ();
204
- cancel (new Suppressed (err ));
205
- LOG .error ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
207
+ cancel (new SuppressedException (err ));
208
+ if (LOG .isTraceEnabled ())
209
+ LOG .trace ("Flow.Subscriber " + subscriber + " violated rule 2.13" , err );
206
210
return Action .IDLE ;
207
211
}
208
212
chunk .release ();
@@ -243,7 +247,7 @@ public void request(long n)
243
247
@ Override
244
248
public void cancel ()
245
249
{
246
- cancel (new Suppressed ("Subscription was cancelled manually" ));
250
+ cancel (new SuppressedException ("Subscription was cancelled manually" ));
247
251
}
248
252
249
253
public void cancel (Throwable cause )
@@ -252,9 +256,8 @@ public void cancel(Throwable cause)
252
256
//
253
257
// As per rule 3.5, this handles cancellation requests, and is idempotent, thread-safe and not
254
258
// synchronously performing heavy computations
255
- if (!cancelled .compareAndSet (null , cause ))
256
- return ;
257
- this .iterate ();
259
+ if (cancelled .compareAndSet (null , cause ))
260
+ this .iterate ();
258
261
}
259
262
260
263
// Publisher notes
@@ -280,7 +283,7 @@ public void cancel(Throwable cause)
280
283
281
284
// Subscription notes
282
285
//
283
- // Subscription.cancel -> cancel(new Suppressed ("Subscription was cancelled manually"))
286
+ // Subscription.cancel -> cancel(new SuppressedException ("Subscription was cancelled manually"))
284
287
// It's not clearly specified in the specification, but according to:
285
288
// - the issue: https://github.com/reactive-streams/reactive-streams-jvm/issues/458
286
289
// - TCK test 'untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals'
@@ -291,14 +294,14 @@ public void cancel(Throwable cause)
291
294
// java.lang.IllegalArgumentException if the argument is <= 0.
292
295
}
293
296
294
- private static class Suppressed extends Throwable
297
+ private static class SuppressedException extends Exception
295
298
{
296
- Suppressed (String message )
299
+ SuppressedException (String message )
297
300
{
298
301
super (message );
299
302
}
300
303
301
- Suppressed (Throwable cause )
304
+ SuppressedException (Throwable cause )
302
305
{
303
306
super (cause .getMessage (), cause );
304
307
}
0 commit comments