23
23
import java .util .Map ;
24
24
import java .util .Map .Entry ;
25
25
import java .util .Objects ;
26
- import java .util .concurrent .atomic .AtomicInteger ;
27
26
import javax .annotation .Nonnull ;
28
27
import javax .annotation .concurrent .ThreadSafe ;
29
28
import org .apache .commons .pool2 .impl .EvictionPolicy ;
@@ -37,15 +36,6 @@ public class WorkerPoolImpl implements WorkerPool {
37
36
/** Unless otherwise specified, the max number of multiplex workers per WorkerKey. */
38
37
private static final int DEFAULT_MAX_MULTIPLEX_WORKERS = 8 ;
39
38
40
- private static final int MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS = 1 ;
41
- /**
42
- * How many high-priority workers are currently borrowed. If greater than one, low-priority
43
- * workers cannot be borrowed until the high-priority ones are done.
44
- */
45
- private final AtomicInteger highPriorityWorkersInUse = new AtomicInteger (0 );
46
- /** Which mnemonics create high-priority workers. */
47
- private final ImmutableSet <String > highPriorityWorkerMnemonics ;
48
-
49
39
private final WorkerPoolConfig workerPoolConfig ;
50
40
/** Map of singleplex worker pools, one per mnemonic. */
51
41
private final ImmutableMap <String , SimpleWorkerPool > workerPools ;
@@ -58,9 +48,6 @@ public class WorkerPoolImpl implements WorkerPool {
58
48
public WorkerPoolImpl (WorkerPoolConfig workerPoolConfig ) {
59
49
this .workerPoolConfig = workerPoolConfig ;
60
50
61
- highPriorityWorkerMnemonics =
62
- ImmutableSet .copyOf ((Iterable <String >) workerPoolConfig .getHighPriorityWorkers ());
63
-
64
51
ImmutableMap <String , Integer > config =
65
52
createConfigFromOptions (workerPoolConfig .getWorkerMaxInstances (), DEFAULT_MAX_WORKERS );
66
53
ImmutableMap <String , Integer > multiplexConfig =
@@ -143,8 +130,7 @@ public void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy) throws Interr
143
130
}
144
131
145
132
/**
146
- * Gets a worker. May block indefinitely if too many high-priority workers are in use and the
147
- * requested worker is not high priority.
133
+ * Gets a worker from worker pool. Could wait if no idle workers are available.
148
134
*
149
135
* @param key worker key
150
136
* @return a worker
@@ -158,44 +144,11 @@ public Worker borrowObject(WorkerKey key) throws IOException, InterruptedExcepti
158
144
Throwables .propagateIfPossible (t , IOException .class , InterruptedException .class );
159
145
throw new RuntimeException ("unexpected" , t );
160
146
}
161
-
162
- // TODO(b/244297036): move highPriorityWorkerMnemonics logic to the ResourceManager.
163
- if (highPriorityWorkerMnemonics .contains (key .getMnemonic ())) {
164
- highPriorityWorkersInUse .incrementAndGet ();
165
- } else {
166
- try {
167
- waitForHighPriorityWorkersToFinish ();
168
- } catch (InterruptedException e ) {
169
- returnObject (key , result );
170
- throw e ;
171
- }
172
- }
173
-
174
147
return result ;
175
148
}
176
149
177
- /**
178
- * Checks if there is no blockers from high priority workers to take new worker with this worker
179
- * key. Doesn't check occupancy of worker pool for this mnemonic.
180
- */
181
- @ Override
182
- public boolean couldBeBorrowed (WorkerKey key ) {
183
- if (highPriorityWorkerMnemonics .contains (key .getMnemonic ())) {
184
- return true ;
185
- }
186
-
187
- if (highPriorityWorkersInUse .get () <= MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS ) {
188
- return true ;
189
- }
190
-
191
- return false ;
192
- }
193
-
194
150
@ Override
195
151
public void returnObject (WorkerKey key , Worker obj ) {
196
- if (highPriorityWorkerMnemonics .contains (key .getMnemonic ())) {
197
- decrementHighPriorityWorkerCount ();
198
- }
199
152
if (doomedWorkers .contains (obj .getWorkerId ())) {
200
153
obj .setDoomed (true );
201
154
}
@@ -204,9 +157,6 @@ public void returnObject(WorkerKey key, Worker obj) {
204
157
205
158
@ Override
206
159
public void invalidateObject (WorkerKey key , Worker obj ) throws InterruptedException {
207
- if (highPriorityWorkerMnemonics .contains (key .getMnemonic ())) {
208
- decrementHighPriorityWorkerCount ();
209
- }
210
160
if (doomedWorkers .contains (obj .getWorkerId ())) {
211
161
obj .setDoomed (true );
212
162
}
@@ -218,29 +168,6 @@ public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedExcept
218
168
}
219
169
}
220
170
221
- // Decrements the high-priority workers counts and pings waiting threads if appropriate.
222
- private void decrementHighPriorityWorkerCount () {
223
- if (highPriorityWorkersInUse .decrementAndGet () <= MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS ) {
224
- synchronized (highPriorityWorkersInUse ) {
225
- highPriorityWorkersInUse .notifyAll ();
226
- }
227
- }
228
- }
229
-
230
- // Returns once less than two high-priority workers are running.
231
- private void waitForHighPriorityWorkersToFinish () throws InterruptedException {
232
- // Fast path for the case where the high-priority workers feature is not in use.
233
- if (highPriorityWorkerMnemonics .isEmpty ()) {
234
- return ;
235
- }
236
-
237
- while (highPriorityWorkersInUse .get () > MAX_NON_BLOCKING_HIGH_PRIORITY_WORKERS ) {
238
- synchronized (highPriorityWorkersInUse ) {
239
- highPriorityWorkersInUse .wait ();
240
- }
241
- }
242
- }
243
-
244
171
@ Override
245
172
public synchronized void setDoomedWorkers (ImmutableSet <Integer > workerIds ) {
246
173
this .doomedWorkers = workerIds ;
@@ -280,17 +207,14 @@ public static class WorkerPoolConfig {
280
207
private final WorkerFactory workerFactory ;
281
208
private final List <Entry <String , Integer >> workerMaxInstances ;
282
209
private final List <Entry <String , Integer >> workerMaxMultiplexInstances ;
283
- private final List <String > highPriorityWorkers ;
284
210
285
211
public WorkerPoolConfig (
286
212
WorkerFactory workerFactory ,
287
213
List <Entry <String , Integer >> workerMaxInstances ,
288
- List <Entry <String , Integer >> workerMaxMultiplexInstances ,
289
- List <String > highPriorityWorkers ) {
214
+ List <Entry <String , Integer >> workerMaxMultiplexInstances ) {
290
215
this .workerFactory = workerFactory ;
291
216
this .workerMaxInstances = workerMaxInstances ;
292
217
this .workerMaxMultiplexInstances = workerMaxMultiplexInstances ;
293
- this .highPriorityWorkers = highPriorityWorkers ;
294
218
}
295
219
296
220
public WorkerFactory getWorkerFactory () {
@@ -305,10 +229,6 @@ public List<Entry<String, Integer>> getWorkerMaxMultiplexInstances() {
305
229
return workerMaxMultiplexInstances ;
306
230
}
307
231
308
- public List <String > getHighPriorityWorkers () {
309
- return highPriorityWorkers ;
310
- }
311
-
312
232
@ Override
313
233
public boolean equals (Object o ) {
314
234
if (this == o ) {
@@ -320,14 +240,12 @@ public boolean equals(Object o) {
320
240
WorkerPoolConfig that = (WorkerPoolConfig ) o ;
321
241
return workerFactory .equals (that .workerFactory )
322
242
&& workerMaxInstances .equals (that .workerMaxInstances )
323
- && workerMaxMultiplexInstances .equals (that .workerMaxMultiplexInstances )
324
- && highPriorityWorkers .equals (that .highPriorityWorkers );
243
+ && workerMaxMultiplexInstances .equals (that .workerMaxMultiplexInstances );
325
244
}
326
245
327
246
@ Override
328
247
public int hashCode () {
329
- return Objects .hash (
330
- workerFactory , workerMaxInstances , workerMaxMultiplexInstances , highPriorityWorkers );
248
+ return Objects .hash (workerFactory , workerMaxInstances , workerMaxMultiplexInstances );
331
249
}
332
250
}
333
251
}
0 commit comments