Skip to content

Commit 85708cc

Browse files
committed
Fix issues
1 parent 93baa8a commit 85708cc

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.function.Function;
1313
import java.util.function.Supplier;
1414
import java.util.stream.Collector;
15+
import java.util.stream.Collectors;
1516
import java.util.stream.Stream;
1617

1718
import static com.pivovarit.collectors.BatchingStream.batching;
@@ -21,6 +22,7 @@
2122
import static java.util.Objects.requireNonNull;
2223
import static java.util.concurrent.CompletableFuture.allOf;
2324
import static java.util.concurrent.CompletableFuture.supplyAsync;
25+
import static java.util.stream.Collectors.*;
2426
import static java.util.stream.Collectors.collectingAndThen;
2527
import static java.util.stream.Collectors.toList;
2628

src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.pivovarit.collectors.blackbox;
22

33
import com.pivovarit.collectors.ParallelCollectors.Batching;
4-
import org.assertj.core.api.Assertions;
54
import org.junit.jupiter.api.DynamicTest;
65
import org.junit.jupiter.api.Test;
76
import org.junit.jupiter.api.TestFactory;
@@ -49,7 +48,6 @@
4948
import static java.util.stream.Collectors.toList;
5049
import static java.util.stream.Collectors.toSet;
5150
import static java.util.stream.Stream.of;
52-
import static org.assertj.core.api.Assertions.*;
5351
import static org.assertj.core.api.Assertions.assertThat;
5452
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5553
import static org.awaitility.Awaitility.await;
@@ -79,12 +77,18 @@ Stream<DynamicTest> collectors() {
7977
@TestFactory
8078
Stream<DynamicTest> batching_collectors() {
8179
return of(
82-
batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
83-
batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
84-
batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
85-
batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true),
86-
batchTests((m, e, p) -> adaptAsync(Batching.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
87-
batchTests((m, e, p) -> adaptAsync(Batching.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
80+
batchTests((m, e, p) -> Batching
81+
.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
82+
batchTests((m, e, p) -> Batching
83+
.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
84+
batchTests((m, e, p) -> Batching
85+
.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
86+
batchTests((m, e, p) -> adapt(Batching
87+
.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true),
88+
batchTests((m, e, p) -> adaptAsync(Batching
89+
.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
90+
batchTests((m, e, p) -> adaptAsync(Batching
91+
.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
8892
).flatMap(identity());
8993
}
9094

@@ -109,8 +113,9 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
109113
try {
110114
List<String> list = Arrays.asList("A", "B");
111115

112-
list.stream()
116+
Stream<String> stream = list.stream()
113117
.collect(parallel(s -> {
118+
System.out.println("Running on " + Thread.currentThread().getName());
114119
executions.incrementAndGet();
115120
return s;
116121
}, countingExecutor, 1))
@@ -119,7 +124,7 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
119124
executor.shutdown();
120125
}
121126

122-
assertThat(countingExecutor.getInvocations()).isEqualTo(2);
127+
assertThat(countingExecutor.getInvocations()).isEqualTo(1);
123128
assertThat(executions.get()).isEqualTo(2);
124129
}
125130

0 commit comments

Comments
 (0)