Skip to content

Commit

Permalink
Rewrite max parallelism tests (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent 3defaa4 commit 2d5eaad
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
20 changes: 0 additions & 20 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
shouldInterruptOnException(collector, name)
);

tests = limitedParallelism ? of(shouldRespectParallelism(collector, name)) : tests;
tests = limitedParallelism ? of(shouldRejectInvalidParallelism(collector, name)) : tests;

return tests;
Expand All @@ -190,7 +189,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldRespectParallelism(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRejectInvalidParallelism(collector, name)
Expand All @@ -209,24 +207,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> batchStreamin
of(shouldProcessOnNThreadsETParallelism(collector, name)));
}

private static <R extends Collection<Integer>> DynamicTest shouldRespectParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should respect parallelism", name), () -> {
int parallelism = 2;
int delayMillis = 50;
withExecutor(e -> {
LocalTime before = LocalTime.now();
Stream.generate(() -> 42)
.limit(4)
.collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism))
.join();

LocalTime after = LocalTime.now();
assertThat(Duration.between(before, after))
.isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism));
});
});
}

private static <R extends Collection<Integer>> DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> {
int parallelism = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import com.pivovarit.collectors.TestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -22,7 +26,8 @@ class BasicParallelismTest {

private static Stream<CollectorDefinition<Integer, Integer>> allBounded() {
return Stream.of(
collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())),
collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join()
.toList())),
collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)),
Expand Down Expand Up @@ -52,6 +57,15 @@ Stream<DynamicTest> shouldProcessAllElementsWithMaxParallelism() {
})));
}

@TestFactory
Stream<DynamicTest> shouldRespectMaxParallelism() {
return allBounded()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var duration = timed(() -> IntStream.range(0, 10).boxed().collect(c.factory().collector(i -> TestUtils.returnWithDelay(i, Duration.ofMillis(100)), 2)));
assertThat(duration).isCloseTo(Duration.ofMillis(500), Duration.ofMillis(100));
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
Expand All @@ -66,4 +80,10 @@ private interface CollectorFactory<T, R> {
private static Executor e() {
return Executors.newCachedThreadPool();
}

private static Duration timed(Supplier<?> action) {
long start = System.currentTimeMillis();
var result = action.get();
return Duration.ofMillis(System.currentTimeMillis() - start);
}
}

0 comments on commit 2d5eaad

Please sign in to comment.