Skip to content

Commit

Permalink
Add streaming tests for immediate processing start (#871)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Apr 29, 2024
1 parent f341cc5 commit eb933ab
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTest
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldRespectParallelism(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldHandleRejectedExecutionException(collector, name),
Expand Down Expand Up @@ -293,6 +294,25 @@ private static <R extends Collection<Integer>> DynamicTest shouldRespectParallel
});
}

private static <R extends Collection<Integer>> DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> {
int parallelism = 2;
int delayMillis = 50;
var counter = new AtomicInteger();
withExecutor(e -> {
LocalTime before = LocalTime.now();
Stream.generate(() -> 42)
.limit(100)
.collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism))
.join();

LocalTime after = LocalTime.now();
assertThat(Duration.between(before, after))
.isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism));
});
});
}

private static <R extends Collection<Integer>> DynamicTest shouldProcessOnNThreadsETParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should batch", name), () -> {
int parallelism = 2;
Expand Down
74 changes: 74 additions & 0 deletions src/test/java/com/pivovarit/collectors/ToStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.pivovarit.collectors;

import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Stream.of;
import static org.awaitility.Awaitility.await;

class ToStreamTest {

@TestFactory
Stream<DynamicTest> shouldStartProcessingElementsTests() {
return of(
shouldStartProcessingElements(f -> ParallelCollectors.parallelToStream(f, Executors.newCachedThreadPool(), 2), "parallelToStream, parallelism: 2, os threads"),
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToStream, parallelism: 2, vthreads"),
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newCachedThreadPool(), 2), "parallelToOrderedStream, parallelism: 2, os threads"),
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToOrderedStream, parallelism: 2, vthreads")
);
}

private static DynamicTest shouldStartProcessingElements(Function<Function<Integer, Integer>, Collector<Integer, ?, Stream<Integer>>> collector, String name) {
return DynamicTest.dynamicTest(name, () -> {
var counter = new AtomicInteger();
Thread.ofPlatform().start(() -> {
Stream.iterate(0, i -> i + 1)
.limit(100)
.collect(collector.apply(i -> {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return i;
}))
.forEach(c -> counter.incrementAndGet());
});
await()
.atMost(ofSeconds(1))
.until(() -> counter.get() > 0);
});
}

@Test
void shouldStartProcessingElementsAsSoonAsTheyAreReady() {
var e = Executors.newCachedThreadPool();
var counter = new AtomicInteger();
Thread.ofPlatform().start(() -> {
Collector<Integer, ?, Stream<Integer>> collector = parallelToStream(i -> {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return i;
}, e, 2);
Stream.iterate(0, i -> i + 1)
.limit(100)
.collect(collector)
.forEach(c -> counter.incrementAndGet());
});
await()
.atMost(ofSeconds(1))
.until(() -> counter.get() > 0);
}
}

0 comments on commit eb933ab

Please sign in to comment.