Skip to content

Commit

Permalink
[polish] Improve onDiscardMultiple/QueueWithClear resiliency (#2021)
Browse files Browse the repository at this point in the history
This commit improves discard resiliency when dealing with queues,
streams, collections and iterators. By introducing finer grained
try/catch blocks, we ensure that failures around a single discarded
element doesn't prevent discarding of further elements of the container.
  • Loading branch information
simonbasle committed Feb 7, 2020
1 parent db7d3cb commit f0dfc64
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 10 deletions.
54 changes: 44 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,16 +435,33 @@ public static <T> void onDiscardQueueWithClear(
}

if (extract != null) {
extract.apply(toDiscard)
.forEach(hook);
try {
extract.apply(toDiscard)
.forEach(elementToDiscard -> {
try {
hook.accept(elementToDiscard);
}
catch (Throwable t) {
log.warn("Error while discarding item extracted from a queue element, continuing with next item", t);
}
});
}
catch (Throwable t) {
log.warn("Error while extracting items to discard from queue element, continuing with next queue element", t);
}
}
else {
hook.accept(toDiscard);
try {
hook.accept(toDiscard);
}
catch (Throwable t) {
log.warn("Error while discarding a queue element, continuing with next queue element", t);
}
}
}
}
catch (Throwable t) {
log.warn("Error in discard hook while discarding and clearing a queue", t);
log.warn("Cannot further apply discard hook while discarding and clearing a queue", t);
}
}

Expand All @@ -465,10 +482,17 @@ public static void onDiscardMultiple(Stream<?> multiple, Context context) {
if (hook != null) {
try {
multiple.filter(Objects::nonNull)
.forEach(hook);
.forEach(v -> {
try {
hook.accept(v);
}
catch (Throwable t) {
log.warn("Error while discarding a stream element, continuing with next element", t);
}
});
}
catch (Throwable t) {
log.warn("Error in discard hook while discarding multiple values", t);
log.warn("Error while discarding stream, stopping", t);
}
}
}
Expand All @@ -494,12 +518,17 @@ public static void onDiscardMultiple(@Nullable Collection<?> multiple, Context c
}
for (Object o : multiple) {
if (o != null) {
hook.accept(o);
try {
hook.accept(o);
}
catch (Throwable t) {
log.warn("Error while discarding element from a Collection, continuing with next element", t);
}
}
}
}
catch (Throwable t) {
log.warn("Error in discard hook while discarding multiple values", t);
log.warn("Error while discarding collection, stopping", t);
}
}
}
Expand All @@ -526,12 +555,17 @@ public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean kno
try {
multiple.forEachRemaining(o -> {
if (o != null) {
hook.accept(o);
try {
hook.accept(o);
}
catch (Throwable t) {
log.warn("Error while discarding element from an Iterator, continuing with next element", t);
}
}
});
}
catch (Throwable t) {
log.warn("Error in discard hook while discarding iterator remainder", t);
log.warn("Error while discarding Iterator, stopping", t);
}
}
}
Expand Down
217 changes: 217 additions & 0 deletions reactor-core/src/test/java/reactor/core/publisher/OperatorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@
package reactor.core.publisher;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -730,4 +737,214 @@ public void convertConditionalToConditionalShouldReturnTheSameInstance() {
Assertions.assertThat(Operators.toConditionalSubscriber(original))
.isEqualTo(original);
}

@Test
public void discardQueueWithClearContinuesOnExtractionError() {
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Queue<List<Integer>> q = new ArrayBlockingQueue<>(5);
q.add(Collections.singletonList(1));
q.add(Collections.singletonList(2));
q.add(Arrays.asList(3, 30));
q.add(Collections.singletonList(4));
q.add(Collections.singletonList(5));

Operators.onDiscardQueueWithClear(q, hookContext, o -> {
List<Integer> l = o;
if (l.size() == 2) throw new IllegalStateException("boom in extraction");
return l.stream();
});

assertThat(discardedCount).hasValue(4);
}

@Test
public void discardQueueWithClearContinuesOnExtractedElementNotDiscarded() {
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Queue<List<Integer>> q = new ArrayBlockingQueue<>(5);
q.add(Collections.singletonList(1));
q.add(Collections.singletonList(2));
q.add(Collections.singletonList(3));
q.add(Collections.singletonList(4));
q.add(Collections.singletonList(5));

Operators.onDiscardQueueWithClear(q, hookContext, Collection::stream);

assertThat(discardedCount).as("discarded 1 2 4 5").hasValue(4);
}

@Test
public void discardQueueWithClearContinuesOnRawQueueElementNotDiscarded() {
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Queue<Integer> q = new ArrayBlockingQueue<>(5);
q.add(1);
q.add(2);
q.add(3);
q.add(4);
q.add(5);

Operators.onDiscardQueueWithClear(q, hookContext, null);

assertThat(discardedCount).as("discarded 1 2 4 5").hasValue(4);
}

@Test
public void discardQueueWithClearStopsOnQueuePollingError() {
AtomicInteger discardedCount = new AtomicInteger();
@SuppressWarnings("unchecked") Queue<Integer> q = Mockito.mock(Queue.class);
Mockito.when(q.poll())
.thenReturn(1, 2)
.thenThrow(new IllegalStateException("poll boom"))
.thenReturn(4, 5)
.thenReturn(null);

Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet()).apply(Context.empty());

Operators.onDiscardQueueWithClear(q, hookContext, null);

assertThat(discardedCount).as("discarding stops").hasValue(2);
}

@Test
public void discardStreamContinuesWhenElementFailsToBeDiscarded() {
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Operators.onDiscardMultiple(stream, hookContext);

assertThat(discardedCount).hasValue(4);
}

@Test
public void discardStreamStopsOnIterationError() {
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
//noinspection ResultOfMethodCallIgnored
stream.count(); //consumes the Stream on purpose

AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet()).apply(Context.empty());

Operators.onDiscardMultiple(stream, hookContext);

assertThat(discardedCount).hasValue(0);
}

@Test
public void discardCollectionContinuesWhenIteratorElementFailsToBeDiscarded() {
List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Operators.onDiscardMultiple(elements, hookContext);

assertThat(discardedCount).hasValue(4);
}

@Test
public void discardCollectionStopsOnIterationError() {
List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
Iterator<Integer> trueIterator = elements.iterator();
Iterator<Integer> failingIterator = new Iterator<Integer>() {
@Override
public boolean hasNext() {
return trueIterator.hasNext();
}

@Override
public Integer next() {
Integer n = trueIterator.next();
if (n >= 3) throw new IllegalStateException("Iterator boom");
return n;
}
};
List<Integer> mock = Mockito.mock(List.class);
Mockito.when(mock.iterator()).thenReturn(failingIterator);

AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Operators.onDiscardMultiple(mock, hookContext);

assertThat(discardedCount).hasValue(2);
}

@Test
public void discardCollectionStopsOnIsEmptyError() {
@SuppressWarnings("unchecked") List<Integer> mock = Mockito.mock(List.class);
Mockito.when(mock.isEmpty()).thenThrow(new IllegalStateException("isEmpty boom"));

AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> discardedCount.incrementAndGet())
.apply(Context.empty());

Operators.onDiscardMultiple(mock, hookContext);

assertThat(discardedCount).hasValue(0);
}

@Test
public void discardIteratorContinuesWhenIteratorElementFailsToBeDiscarded() {
List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Operators.onDiscardMultiple(elements.iterator(), true, hookContext);

assertThat(discardedCount).hasValue(4);
}

@Test
public void discardIteratorStopsOnIterationError() {
List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
Iterator<Integer> trueIterator = elements.iterator();
Iterator<Integer> failingIterator = new Iterator<Integer>() {
@Override
public boolean hasNext() {
return trueIterator.hasNext();
}

@Override
public Integer next() {
Integer n = trueIterator.next();
if (n >= 3) throw new IllegalStateException("Iterator boom");
return n;
}
};
AtomicInteger discardedCount = new AtomicInteger();
Context hookContext = Operators.discardLocalAdapter(Integer.class, i -> {
if (i == 3) throw new IllegalStateException("boom");
discardedCount.incrementAndGet();
}).apply(Context.empty());

Operators.onDiscardMultiple(failingIterator, true, hookContext);

assertThat(discardedCount).hasValue(2);
}
}

0 comments on commit f0dfc64

Please sign in to comment.