|
15 | 15 |
|
16 | 16 | import java.util.Objects;
|
17 | 17 | import java.util.concurrent.CancellationException;
|
18 |
| -import java.util.concurrent.CompletionException; |
19 | 18 | import java.util.concurrent.Flow;
|
20 | 19 | import java.util.concurrent.atomic.AtomicLong;
|
21 | 20 | import java.util.concurrent.atomic.AtomicReference;
|
@@ -92,11 +91,9 @@ private void onMultiSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
|
92 | 91 | // As per rule 1.9, we need to throw a `java.lang.NullPointerException`
|
93 | 92 | // if the `Subscriber` is `null`
|
94 | 93 | if (subscriber == null)
|
95 |
| - { |
96 | 94 | throw new NullPointerException("Flow.Subscriber must not be null");
|
97 |
| - } |
98 |
| - LastWillSubscription subscription = new ExhaustedSubscription(); |
99 | 95 |
|
| 96 | + LastWillSubscription subscription = new ExhaustedSubscription(); |
100 | 97 | // As per 1.9, this method must return normally (i.e. not throw).
|
101 | 98 | try
|
102 | 99 | {
|
@@ -196,7 +193,7 @@ public void cancel(LastWill lastWill)
|
196 | 193 | private static final class ActiveSubscription extends IteratingCallback implements LastWillSubscription
|
197 | 194 | {
|
198 | 195 | private static final long NO_MORE_DEMAND = -1;
|
199 |
| - private static final Throwable COMPLETED = new StaticException("Source.Content read fully"); |
| 196 | + private static final LastWill COMPLETED = new LastWill(new StaticException("Source.Content read fully"), FinalSignal.COMPLETE); |
200 | 197 | private final AtomicReference<LastWill> cancelled;
|
201 | 198 | private final AtomicLong demand;
|
202 | 199 | private Content.Source content;
|
@@ -230,6 +227,7 @@ protected Action process()
|
230 | 227 | // drop any references to the corresponding subscriber.
|
231 | 228 | this.demand.set(NO_MORE_DEMAND);
|
232 | 229 | // TODO: HttpChannelState does not satisfy the contract of Content.Source "If read() has returned a last chunk, this is a no operation."
|
| 230 | + // https://github.com/jetty/jetty.project/issues/11879 |
233 | 231 | if (finalSignal != FinalSignal.COMPLETE)
|
234 | 232 | this.content.fail(reason);
|
235 | 233 | this.content = null;
|
@@ -270,14 +268,16 @@ protected Action process()
|
270 | 268 | }
|
271 | 269 | catch (Throwable err)
|
272 | 270 | {
|
| 271 | + chunk.release(); |
273 | 272 | cancel(err, FinalSignal.SUPPRESS);
|
274 | 273 | LOG.error("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
|
| 274 | + return Action.IDLE; |
275 | 275 | }
|
276 | 276 | chunk.release();
|
277 | 277 |
|
278 | 278 | if (chunk.isLast())
|
279 | 279 | {
|
280 |
| - cancel(COMPLETED, FinalSignal.COMPLETE); |
| 280 | + cancel(COMPLETED); |
281 | 281 | return Action.IDLE;
|
282 | 282 | }
|
283 | 283 |
|
|
0 commit comments