From f0dfc644388c353cd0ddf641210c2e7414ea042b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 3 Feb 2020 16:09:17 +0100 Subject: [PATCH] [polish] Improve onDiscardMultiple/QueueWithClear resiliency (#2021) This commit improves discard resiliency when dealing with queues, streams, collections and iterators. By introducing finer grained try/catch blocks, we ensure that failures around a single discarded element doesn't prevent discarding of further elements of the container. --- .../reactor/core/publisher/Operators.java | 54 ++++- .../reactor/core/publisher/OperatorsTest.java | 217 ++++++++++++++++++ 2 files changed, 261 insertions(+), 10 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 018a4fa8e7..d963e272ed 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -435,16 +435,33 @@ public static void onDiscardQueueWithClear( } if (extract != null) { - extract.apply(toDiscard) - .forEach(hook); + try { + extract.apply(toDiscard) + .forEach(elementToDiscard -> { + try { + hook.accept(elementToDiscard); + } + catch (Throwable t) { + log.warn("Error while discarding item extracted from a queue element, continuing with next item", t); + } + }); + } + catch (Throwable t) { + log.warn("Error while extracting items to discard from queue element, continuing with next queue element", t); + } } else { - hook.accept(toDiscard); + try { + hook.accept(toDiscard); + } + catch (Throwable t) { + log.warn("Error while discarding a queue element, continuing with next queue element", t); + } } } } catch (Throwable t) { - log.warn("Error in discard hook while discarding and clearing a queue", t); + log.warn("Cannot further apply discard hook while discarding and clearing a queue", t); } } @@ -465,10 +482,17 @@ public static void onDiscardMultiple(Stream multiple, Context context) { if (hook != null) { try { multiple.filter(Objects::nonNull) - .forEach(hook); + .forEach(v -> { + try { + hook.accept(v); + } + catch (Throwable t) { + log.warn("Error while discarding a stream element, continuing with next element", t); + } + }); } catch (Throwable t) { - log.warn("Error in discard hook while discarding multiple values", t); + log.warn("Error while discarding stream, stopping", t); } } } @@ -494,12 +518,17 @@ public static void onDiscardMultiple(@Nullable Collection multiple, Context c } for (Object o : multiple) { if (o != null) { - hook.accept(o); + try { + hook.accept(o); + } + catch (Throwable t) { + log.warn("Error while discarding element from a Collection, continuing with next element", t); + } } } } catch (Throwable t) { - log.warn("Error in discard hook while discarding multiple values", t); + log.warn("Error while discarding collection, stopping", t); } } } @@ -526,12 +555,17 @@ public static void onDiscardMultiple(@Nullable Iterator multiple, boolean kno try { multiple.forEachRemaining(o -> { if (o != null) { - hook.accept(o); + try { + hook.accept(o); + } + catch (Throwable t) { + log.warn("Error while discarding element from an Iterator, continuing with next element", t); + } } }); } catch (Throwable t) { - log.warn("Error in discard hook while discarding iterator remainder", t); + log.warn("Error while discarding Iterator, stopping", t); } } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/OperatorsTest.java b/reactor-core/src/test/java/reactor/core/publisher/OperatorsTest.java index 37eee44d1a..41a87fc690 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/OperatorsTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/OperatorsTest.java @@ -17,15 +17,22 @@ package reactor.core.publisher; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import javax.annotation.Nullable; @@ -730,4 +737,214 @@ public void convertConditionalToConditionalShouldReturnTheSameInstance() { Assertions.assertThat(Operators.toConditionalSubscriber(original)) .isEqualTo(original); } + + @Test + public void discardQueueWithClearContinuesOnExtractionError() { + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Queue> q = new ArrayBlockingQueue<>(5); + q.add(Collections.singletonList(1)); + q.add(Collections.singletonList(2)); + q.add(Arrays.asList(3, 30)); + q.add(Collections.singletonList(4)); + q.add(Collections.singletonList(5)); + + Operators.onDiscardQueueWithClear(q, hookContext, o -> { + List l = o; + if (l.size() == 2) throw new IllegalStateException("boom in extraction"); + return l.stream(); + }); + + assertThat(discardedCount).hasValue(4); + } + + @Test + public void discardQueueWithClearContinuesOnExtractedElementNotDiscarded() { + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Queue> q = new ArrayBlockingQueue<>(5); + q.add(Collections.singletonList(1)); + q.add(Collections.singletonList(2)); + q.add(Collections.singletonList(3)); + q.add(Collections.singletonList(4)); + q.add(Collections.singletonList(5)); + + Operators.onDiscardQueueWithClear(q, hookContext, Collection::stream); + + assertThat(discardedCount).as("discarded 1 2 4 5").hasValue(4); + } + + @Test + public void discardQueueWithClearContinuesOnRawQueueElementNotDiscarded() { + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Queue q = new ArrayBlockingQueue<>(5); + q.add(1); + q.add(2); + q.add(3); + q.add(4); + q.add(5); + + Operators.onDiscardQueueWithClear(q, hookContext, null); + + assertThat(discardedCount).as("discarded 1 2 4 5").hasValue(4); + } + + @Test + public void discardQueueWithClearStopsOnQueuePollingError() { + AtomicInteger discardedCount = new AtomicInteger(); + @SuppressWarnings("unchecked") Queue q = Mockito.mock(Queue.class); + Mockito.when(q.poll()) + .thenReturn(1, 2) + .thenThrow(new IllegalStateException("poll boom")) + .thenReturn(4, 5) + .thenReturn(null); + + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet()).apply(Context.empty()); + + Operators.onDiscardQueueWithClear(q, hookContext, null); + + assertThat(discardedCount).as("discarding stops").hasValue(2); + } + + @Test + public void discardStreamContinuesWhenElementFailsToBeDiscarded() { + Stream stream = Stream.of(1, 2, 3, 4, 5); + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Operators.onDiscardMultiple(stream, hookContext); + + assertThat(discardedCount).hasValue(4); + } + + @Test + public void discardStreamStopsOnIterationError() { + Stream stream = Stream.of(1, 2, 3, 4, 5); + //noinspection ResultOfMethodCallIgnored + stream.count(); //consumes the Stream on purpose + + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet()).apply(Context.empty()); + + Operators.onDiscardMultiple(stream, hookContext); + + assertThat(discardedCount).hasValue(0); + } + + @Test + public void discardCollectionContinuesWhenIteratorElementFailsToBeDiscarded() { + List elements = Arrays.asList(1, 2, 3, 4, 5); + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Operators.onDiscardMultiple(elements, hookContext); + + assertThat(discardedCount).hasValue(4); + } + + @Test + public void discardCollectionStopsOnIterationError() { + List elements = Arrays.asList(1, 2, 3, 4, 5); + Iterator trueIterator = elements.iterator(); + Iterator failingIterator = new Iterator() { + @Override + public boolean hasNext() { + return trueIterator.hasNext(); + } + + @Override + public Integer next() { + Integer n = trueIterator.next(); + if (n >= 3) throw new IllegalStateException("Iterator boom"); + return n; + } + }; + List mock = Mockito.mock(List.class); + Mockito.when(mock.iterator()).thenReturn(failingIterator); + + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Operators.onDiscardMultiple(mock, hookContext); + + assertThat(discardedCount).hasValue(2); + } + + @Test + public void discardCollectionStopsOnIsEmptyError() { + @SuppressWarnings("unchecked") List mock = Mockito.mock(List.class); + Mockito.when(mock.isEmpty()).thenThrow(new IllegalStateException("isEmpty boom")); + + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet()) + .apply(Context.empty()); + + Operators.onDiscardMultiple(mock, hookContext); + + assertThat(discardedCount).hasValue(0); + } + + @Test + public void discardIteratorContinuesWhenIteratorElementFailsToBeDiscarded() { + List elements = Arrays.asList(1, 2, 3, 4, 5); + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Operators.onDiscardMultiple(elements.iterator(), true, hookContext); + + assertThat(discardedCount).hasValue(4); + } + + @Test + public void discardIteratorStopsOnIterationError() { + List elements = Arrays.asList(1, 2, 3, 4, 5); + Iterator trueIterator = elements.iterator(); + Iterator failingIterator = new Iterator() { + @Override + public boolean hasNext() { + return trueIterator.hasNext(); + } + + @Override + public Integer next() { + Integer n = trueIterator.next(); + if (n >= 3) throw new IllegalStateException("Iterator boom"); + return n; + } + }; + AtomicInteger discardedCount = new AtomicInteger(); + Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> { + if (i == 3) throw new IllegalStateException("boom"); + discardedCount.incrementAndGet(); + }).apply(Context.empty()); + + Operators.onDiscardMultiple(failingIterator, true, hookContext); + + assertThat(discardedCount).hasValue(2); + } }