Skip to content

Commit b30d281

Browse files
authored
Add ExecutorPollutionTest (#874)
1 parent 03d52fc commit b30d281

File tree

3 files changed

+80
-17
lines changed

3 files changed

+80
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.pivovarit.collectors.functional;
2+
3+
import com.pivovarit.collectors.ParallelCollectors;
4+
import org.junit.jupiter.api.DynamicTest;
5+
import org.junit.jupiter.api.TestFactory;
6+
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.Executor;
9+
import java.util.concurrent.LinkedBlockingQueue;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.function.Function;
13+
import java.util.stream.Collector;
14+
import java.util.stream.Stream;
15+
16+
import static java.util.stream.Collectors.toList;
17+
import static java.util.stream.Stream.of;
18+
19+
class ExecutorPollutionTest {
20+
21+
@TestFactory
22+
Stream<DynamicTest> shouldStartProcessingElementsTests() {
23+
return of(
24+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors::parallel, "parallel#1"),
25+
shouldNotSubmitMoreTasksThanParallelism((f, e, p) -> ParallelCollectors.parallel(f, toList(), e, p), "parallel#2"),
26+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors::parallelToStream, "parallelToStream"),
27+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors::parallelToOrderedStream, "parallelToOrderedStream"),
28+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors.Batching::parallel, "parallel#1 (batching)"),
29+
shouldNotSubmitMoreTasksThanParallelism((f, e, p) -> ParallelCollectors.Batching.parallel(f, toList(), e, p), "parallel#2 (batching)"),
30+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors.Batching::parallelToStream, "parallelToStream (batching)"),
31+
shouldNotSubmitMoreTasksThanParallelism(ParallelCollectors.Batching::parallelToOrderedStream, "parallelToOrderedStream (batching)")
32+
);
33+
}
34+
35+
private static DynamicTest shouldNotSubmitMoreTasksThanParallelism(CollectorFactory<Integer> collector, String name) {
36+
return DynamicTest.dynamicTest(name, () -> {
37+
try (var e = warmedUp(new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2)))) {
38+
39+
var result = Stream.generate(() -> 42)
40+
.limit(1000)
41+
.collect(collector.apply(i -> i, e, 2));
42+
43+
switch (result) {
44+
case CompletableFuture<?> cf -> cf.join();
45+
case Stream<?> s -> s.forEach((__) -> {});
46+
default -> throw new IllegalStateException("can't happen");
47+
}
48+
}
49+
});
50+
}
51+
52+
interface CollectorFactory<T> {
53+
Collector<T, ?, ?> apply(Function<T, ?> function, Executor executorService, int parallelism);
54+
}
55+
56+
private static ThreadPoolExecutor warmedUp(ThreadPoolExecutor e) {
57+
for (int i = 0; i < e.getCorePoolSize(); i++) {
58+
e.submit(() -> {});
59+
}
60+
return e;
61+
}
62+
}

src/test/java/com/pivovarit/collectors/ExecutorValidationTest.java renamed to src/test/java/com/pivovarit/collectors/functional/ExecutorValidationTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package com.pivovarit.collectors;
1+
package com.pivovarit.collectors.functional;
22

3+
import com.pivovarit.collectors.ParallelCollectors;
34
import org.junit.jupiter.api.DynamicTest;
45
import org.junit.jupiter.api.TestFactory;
56

src/test/java/com/pivovarit/collectors/ToStreamTest.java renamed to src/test/java/com/pivovarit/collectors/functional/ImmediateStreamProcessingTest.java

+16-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package com.pivovarit.collectors;
1+
package com.pivovarit.collectors.functional;
22

3+
import com.pivovarit.collectors.ParallelCollectors;
34
import org.junit.jupiter.api.DynamicTest;
45
import org.junit.jupiter.api.Test;
56
import org.junit.jupiter.api.TestFactory;
@@ -15,7 +16,7 @@
1516
import static java.util.stream.Stream.of;
1617
import static org.awaitility.Awaitility.await;
1718

18-
class ToStreamTest {
19+
class ImmediateStreamProcessingTest {
1920

2021
@TestFactory
2122
Stream<DynamicTest> shouldStartProcessingElementsTests() {
@@ -24,25 +25,24 @@ Stream<DynamicTest> shouldStartProcessingElementsTests() {
2425
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToStream, parallelism: 2, vthreads"),
2526
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newCachedThreadPool(), 2), "parallelToOrderedStream, parallelism: 2, os threads"),
2627
shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToOrderedStream, parallelism: 2, vthreads")
27-
);
28+
);
2829
}
2930

3031
private static DynamicTest shouldStartProcessingElements(Function<Function<Integer, Integer>, Collector<Integer, ?, Stream<Integer>>> collector, String name) {
3132
return DynamicTest.dynamicTest(name, () -> {
3233
var counter = new AtomicInteger();
33-
Thread.ofPlatform().start(() -> {
34-
Stream.iterate(0, i -> i + 1)
35-
.limit(100)
36-
.collect(collector.apply(i -> {
37-
try {
38-
Thread.sleep(100);
39-
} catch (InterruptedException ex) {
40-
Thread.currentThread().interrupt();
41-
}
42-
return i;
43-
}))
44-
.forEach(c -> counter.incrementAndGet());
45-
});
34+
Thread.ofPlatform()
35+
.start(() -> Stream.iterate(0, i -> i + 1)
36+
.limit(100)
37+
.collect(collector.apply(i -> {
38+
try {
39+
Thread.sleep(100);
40+
} catch (InterruptedException ex) {
41+
Thread.currentThread().interrupt();
42+
}
43+
return i;
44+
}))
45+
.forEach(c -> counter.incrementAndGet()));
4646
await()
4747
.atMost(ofSeconds(1))
4848
.until(() -> counter.get() > 0);

0 commit comments

Comments
 (0)