Skip to content

Commit

Permalink
Rewrite basic collecting tests cases with various parallelism levels (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 15, 2024
1 parent bc1fb1b commit 0d2fe7f
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 34 deletions.
34 changes: 0 additions & 34 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,8 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
shouldCollectNElementsWithNParallelism(collector, name, 1),
shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM),
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
Expand All @@ -185,11 +182,8 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) {
var tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
shouldCollectNElementsWithNParallelism(collector, name, 1),
shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM),
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
Expand All @@ -209,9 +203,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
Expand All @@ -226,9 +217,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread

private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldRespectParallelism(collector, name),
Expand Down Expand Up @@ -268,14 +256,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldNotBlockTheCall
});
}

private static <R extends Collection<Integer>> DynamicTest shouldCollectToEmpty(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should collect to empty", name), () -> {
withExecutor(e -> {
assertThat(Stream.<Integer>empty().collect(collector.apply(i -> i, e, PARALLELISM)).join()).isEmpty();
});
});
}

private static <R extends Collection<Integer>> DynamicTest shouldRespectParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should respect parallelism", name), () -> {
int parallelism = 2;
Expand Down Expand Up @@ -333,20 +313,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldProcessOnNThrea
});
}

private static <R extends Collection<Integer>> DynamicTest shouldCollect(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name, int parallelism) {
return dynamicTest(format("%s: should collect with parallelism %s", name, parallelism), () -> {
var elements = IntStream.range(0, 10).boxed().toList();

withExecutor(e -> {
Collector<Integer, ?, CompletableFuture<R>> ctor = factory.apply(i -> i, e, parallelism);
Collection<Integer> result = elements.stream().collect(ctor)
.join();

assertThat(result).hasSameElementsAs(elements);
});
});
}

private static <R extends Collection<Integer>> DynamicTest shouldCollectNElementsWithNParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name, int parallelism) {
return dynamicTest(format("%s: should collect %s elements with parallelism %s", name, parallelism, parallelism), () -> {
var elements = IntStream.iterate(0, i -> i + 1).limit(parallelism).boxed().toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.BasicParallelismTest.CollectorDefinition.collector;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

class BasicParallelismTest {

private static Stream<CollectorDefinition<Integer, Integer>> allBounded() {
return Stream.of(
collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())),
collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)),
collector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)),
collector("parallelToOrderedStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p), Stream::toList)),
collector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldProcessEmptyWithMaxParallelism() {
return Stream.of(1, 2, 4, 8, 16, 32, 64, 100)
.flatMap(p -> allBounded()
.map(c -> DynamicTest.dynamicTest("%s (parallelism: %d)".formatted(c.name(), p), () -> {
assertThat(Stream.<Integer>empty().collect(c.factory().collector(i -> i, p))).isEmpty();
})));
}

@TestFactory
Stream<DynamicTest> shouldProcessAllElementsWithMaxParallelism() {
return Stream.of(1, 2, 4, 8, 16, 32, 64, 100)
.flatMap(p -> allBounded()
.map(c -> DynamicTest.dynamicTest("%s (parallelism: %d)".formatted(c.name(), p), () -> {
var list = IntStream.range(0, 100).boxed().toList();
List<Integer> result = list.stream().collect(c.factory().collector(i -> i, p));
assertThat(result).containsExactlyInAnyOrderElementsOf(list);
})));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
}
100 changes: 100 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

class BasicProcessingTest {

private static Stream<CollectorDefinition<Integer, Integer>> all() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())),
collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())),
collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)),
collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)),
collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)),
collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)),
collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)),
collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)),
collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)),
collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)),
collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)),
collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList))
);
}

public static Stream<CollectorDefinition<Integer, Integer>> allOrdered() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())),
collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())),
collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)),
collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)),
collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)),
collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)),
collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)),
collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldProcessEmpty() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertThat(Stream.<Integer>empty().collect(c.collector().apply(i -> i))).isEmpty();
}));
}

@TestFactory
Stream<DynamicTest> shouldProcessAllElements() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var list = IntStream.range(0, 100).boxed().toList();
List<Integer> result = list.stream().collect(c.collector().apply(i -> i));
assertThat(result).containsExactlyInAnyOrderElementsOf(list);
}));
}

@TestFactory
Stream<DynamicTest> shouldProcessAllElementsInOrder() {
return allOrdered()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var list = IntStream.range(0, 100).boxed().toList();
List<Integer> result = list.stream().collect(c.collector().apply(i -> i));
assertThat(result).containsAnyElementsOf(list);
}));
}

record CollectorDefinition<T, R>(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
return new CollectorDefinition<>(name, collector);
}
}

private static Executor e() {
return Executors.newCachedThreadPool();
}

private static int p() {
return 4;
}
}

0 comments on commit 0d2fe7f

Please sign in to comment.