Skip to content

Commit

Permalink
Rewrite streaming tests (#946)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent a520c44 commit b0ca86e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 37 deletions.
38 changes: 1 addition & 37 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import java.time.Duration;
import java.time.LocalTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -24,7 +22,6 @@
import java.util.stream.Stream;

import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static com.pivovarit.collectors.ParallelCollectors.parallelToOrderedStream;
import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;
import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.TestUtils.withExecutor;
Expand Down Expand Up @@ -59,15 +56,6 @@ Stream<DynamicTest> collectors() {
).flatMap(i -> i);
}

@TestFactory
Stream<DynamicTest> streaming_collectors() {
return of(
// platform threads
streamingTests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM)),
streamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM))
).flatMap(i -> i);
}

@TestFactory
Stream<DynamicTest> streaming_batching_collectors() {
return of(
Expand Down Expand Up @@ -133,36 +121,12 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
}
}

private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(shouldPushElementsToStreamAsSoonAsPossible(collector, name));
}

private static <R extends Collection<Integer>> Stream<DynamicTest> batchTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(shouldProcessOnNThreadsETParallelism(collector, name));
}

private static <R extends Collection<Integer>> Stream<DynamicTest> batchStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return Stream.concat(
streamingTests(collector, name),
of(shouldProcessOnNThreadsETParallelism(collector, name)));
}

private static <R extends Collection<Integer>> DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> {
int parallelism = 2;
int delayMillis = 50;
withExecutor(e -> {
LocalTime before = LocalTime.now();
Stream.generate(() -> 42)
.limit(100)
.collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism))
.join();

LocalTime after = LocalTime.now();
assertThat(Duration.between(before, after))
.isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism));
});
});
return of(shouldProcessOnNThreadsETParallelism(collector, name));
}

private static <R extends Collection<Integer>> DynamicTest shouldProcessOnNThreadsETParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
Expand Down
70 changes: 70 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/StreamingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.test.StreamingTest.CollectorDefinition.collector;
import static java.time.Duration.ofSeconds;
import static org.awaitility.Awaitility.await;

class StreamingTest {

private static Stream<CollectorDefinition<Integer, Integer>> allStreaming() {
return Stream.of(
collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)),
collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())),
collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())),
collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)),
collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())),
collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p()))
);
}

@TestFactory
Stream<DynamicTest> shouldPushElementsAsSoonAsTheyAreReady() {
return allStreaming()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var counter = new AtomicInteger();
Thread.startVirtualThread(() -> {
Stream.concat(Stream.of(0), IntStream.range(1, 10).boxed())
.collect(c.factory().collector(i -> returnWithDelay(i, ofSeconds(i))))
.forEach(__ -> counter.incrementAndGet());
});

await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofMillis(100))
.until(() -> counter.get() > 0);
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, Stream<R>> collector(Function<T, R> f);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}

private static int p() {
return 4;
}
}

0 comments on commit b0ca86e

Please sign in to comment.