Skip to content

Commit

Permalink
Rewrite batching tests (#948)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent ec1f1ae commit a0001e7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.pivovarit.collectors.BatchingSpliterator.partitioned;
Expand Down
99 changes: 0 additions & 99 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public static Stream<CollectorDefinition<Integer, Integer>> allOrdered() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())),
collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join()
.toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())),
collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)),
collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)),
collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)),
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/BatchingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.BatchingTest.CollectorDefinition.collector;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

class BatchingTest {
private static Stream<BatchingTest.CollectorDefinition<Integer, Integer>> allBatching() {
return Stream.of(
collector("parallel(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e(), p), c -> c.thenApply(Stream::toList).join())),
collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallelToStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p), Stream::toList)),
collector("parallelToOrderedStream(e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldProcessOnExactlyNThreads() {
return allBatching()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var threads = new ConcurrentSkipListSet<>();
var parallelism = 4;

Stream.generate(() -> 42)
.limit(100)
.collect(c.collector().collector(i -> {
threads.add(Thread.currentThread().getName());
return i;
}, parallelism));

assertThat(threads).hasSizeLessThanOrEqualTo(parallelism);
}));
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
}

0 comments on commit a0001e7

Please sign in to comment.