@@ -40,10 +40,10 @@ class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFutu
40
40
private final Dispatcher <R > dispatcher ;
41
41
42
42
private ParallelStreamCollector (
43
- Function <T , R > function ,
44
- CompletionStrategy <R > completionStrategy ,
45
- Set <Characteristics > characteristics ,
46
- Dispatcher <R > dispatcher ) {
43
+ Function <T , R > function ,
44
+ CompletionStrategy <R > completionStrategy ,
45
+ Set <Characteristics > characteristics ,
46
+ Dispatcher <R > dispatcher ) {
47
47
this .completionStrategy = completionStrategy ;
48
48
this .characteristics = characteristics ;
49
49
this .dispatcher = dispatcher ;
@@ -67,7 +67,7 @@ public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
67
67
public BinaryOperator <List <CompletableFuture <R >>> combiner () {
68
68
return (left , right ) -> {
69
69
throw new UnsupportedOperationException (
70
- "Using parallel stream with parallel collectors is a bad idea" );
70
+ "Using parallel stream with parallel collectors is a bad idea" );
71
71
};
72
72
}
73
73
@@ -90,6 +90,13 @@ public Set<Characteristics> characteristics() {
90
90
return new ParallelStreamCollector <>(mapper , unordered (), UNORDERED , Dispatcher .virtual ());
91
91
}
92
92
93
+ static <T , R > Collector <T , ?, Stream <R >> streaming (Function <T , R > mapper , int parallelism ) {
94
+ requireNonNull (mapper , "mapper can't be null" );
95
+ requireValidParallelism (parallelism );
96
+
97
+ return new ParallelStreamCollector <>(mapper , unordered (), UNORDERED , Dispatcher .virtual (parallelism ));
98
+ }
99
+
93
100
static <T , R > Collector <T , ?, Stream <R >> streaming (Function <T , R > mapper , Executor executor , int parallelism ) {
94
101
requireNonNull (executor , "executor can't be null" );
95
102
requireNonNull (mapper , "mapper can't be null" );
@@ -104,8 +111,15 @@ public Set<Characteristics> characteristics() {
104
111
return new ParallelStreamCollector <>(mapper , ordered (), emptySet (), Dispatcher .virtual ());
105
112
}
106
113
114
+ static <T , R > Collector <T , ?, Stream <R >> streamingOrdered (Function <T , R > mapper , int parallelism ) {
115
+ requireNonNull (mapper , "mapper can't be null" );
116
+ requireValidParallelism (parallelism );
117
+
118
+ return new ParallelStreamCollector <>(mapper , ordered (), emptySet (), Dispatcher .virtual (parallelism ));
119
+ }
120
+
107
121
static <T , R > Collector <T , ?, Stream <R >> streamingOrdered (Function <T , R > mapper , Executor executor ,
108
- int parallelism ) {
122
+ int parallelism ) {
109
123
requireNonNull (executor , "executor can't be null" );
110
124
requireNonNull (mapper , "mapper can't be null" );
111
125
requireValidParallelism (parallelism );
@@ -119,60 +133,57 @@ private BatchingCollectors() {
119
133
}
120
134
121
135
static <T , R > Collector <T , ?, Stream <R >> streaming (Function <T , R > mapper , Executor executor ,
122
- int parallelism ) {
136
+ int parallelism ) {
123
137
requireNonNull (executor , "executor can't be null" );
124
138
requireNonNull (mapper , "mapper can't be null" );
125
139
requireValidParallelism (parallelism );
126
140
127
141
return parallelism == 1
128
- ? syncCollector (mapper )
129
- : batchingCollector (mapper , executor , parallelism );
142
+ ? syncCollector (mapper )
143
+ : batchingCollector (mapper , executor , parallelism );
130
144
}
131
145
132
146
static <T , R > Collector <T , ?, Stream <R >> streamingOrdered (Function <T , R > mapper , Executor executor ,
133
- int parallelism ) {
147
+ int parallelism ) {
134
148
requireNonNull (executor , "executor can't be null" );
135
149
requireNonNull (mapper , "mapper can't be null" );
136
150
requireValidParallelism (parallelism );
137
151
138
152
return parallelism == 1
139
- ? syncCollector (mapper )
140
- : batchingCollector (mapper , executor , parallelism );
153
+ ? syncCollector (mapper )
154
+ : batchingCollector (mapper , executor , parallelism );
141
155
}
142
156
143
157
private static <T , R > Collector <T , ?, Stream <R >> batchingCollector (Function <T , R > mapper ,
144
- Executor executor , int parallelism ) {
158
+ Executor executor , int parallelism ) {
145
159
return collectingAndThen (
146
- toList (),
147
- list -> {
148
- // no sense to repack into batches of size 1
149
- if (list .size () == parallelism ) {
150
- return list .stream ()
151
- .collect (new ParallelStreamCollector <>(
152
- mapper ,
153
- ordered (),
154
- emptySet (),
155
- Dispatcher .from (executor , parallelism )));
156
- }
157
- else {
158
- return partitioned (list , parallelism )
159
- .collect (collectingAndThen (new ParallelStreamCollector <>(
160
- batching (mapper ),
161
- ordered (),
162
- emptySet (),
163
- Dispatcher .from (executor , parallelism )),
164
- s -> s .flatMap (Collection ::stream )));
165
- }
166
- });
160
+ toList (),
161
+ list -> {
162
+ // no sense to repack into batches of size 1
163
+ if (list .size () == parallelism ) {
164
+ return list .stream ()
165
+ .collect (new ParallelStreamCollector <>(
166
+ mapper ,
167
+ ordered (),
168
+ emptySet (),
169
+ Dispatcher .from (executor , parallelism )));
170
+ } else {
171
+ return partitioned (list , parallelism )
172
+ .collect (collectingAndThen (new ParallelStreamCollector <>(
173
+ batching (mapper ),
174
+ ordered (),
175
+ emptySet (),
176
+ Dispatcher .from (executor , parallelism )),
177
+ s -> s .flatMap (Collection ::stream )));
178
+ }
179
+ });
167
180
}
168
181
169
182
private static <T , R > Collector <T , Stream .Builder <R >, Stream <R >> syncCollector (Function <T , R > mapper ) {
170
183
return Collector .of (Stream ::builder , (rs , t ) -> rs .add (mapper .apply (t )), (rs , rs2 ) -> {
171
184
throw new UnsupportedOperationException (
172
- "Using parallel stream with parallel collectors is a bad idea" );
185
+ "Using parallel stream with parallel collectors is a bad idea" );
173
186
}, Stream .Builder ::build );
174
187
}
175
-
176
188
}
177
-
178
189
}
0 commit comments