|
27 | 27 | import java.util.concurrent.Callable;
|
28 | 28 | import java.util.concurrent.ExecutionException;
|
29 | 29 | import java.util.concurrent.RejectedExecutionException;
|
30 |
| -import java.util.concurrent.ExecutorService; |
31 | 30 | import java.util.concurrent.Future;
|
32 |
| -import java.util.concurrent.ThreadPoolExecutor; |
33 | 31 | import java.util.concurrent.TimeUnit;
|
34 |
| -import java.util.concurrent.ArrayBlockingQueue; |
35 | 32 | import java.util.concurrent.TimeoutException;
|
36 | 33 | import java.util.List;
|
37 | 34 |
|
|
54 | 51 | import org.apache.hadoop.ipc.RemoteException;
|
55 | 52 | import org.apache.hadoop.ipc.RpcInvocationHandler;
|
56 | 53 | import org.apache.hadoop.ipc.StandbyException;
|
| 54 | +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; |
57 | 55 | import org.apache.hadoop.util.Time;
|
58 | 56 | import org.slf4j.Logger;
|
59 | 57 | import org.slf4j.LoggerFactory;
|
@@ -187,18 +185,8 @@ public class ObserverReadProxyProvider<T>
|
187 | 185 |
|
188 | 186 | /**
|
189 | 187 | * Threadpool to send the getHAServiceState requests.
|
190 |
| - * |
191 |
| - * One thread running all the time, with up to 4 threads. Idle threads will be killed after |
192 |
| - * 1 minute. At most 1024 requests can be submitted before they start to be rejected. |
193 |
| - * |
194 |
| - * Each hdfs client will have its own ObserverReadProxyProvider. Thus, |
195 |
| - * having 1 thread running should be sufficient in most cases. |
196 |
| - * We are not expecting to receive a lot of outstanding RPC calls |
197 |
| - * from a single hdfs client, thus setting the queue size to 1024. |
198 | 188 | */
|
199 |
| - private final ExecutorService nnProbingThreadPool = |
200 |
| - new ThreadPoolExecutor(1, 4, 1L, TimeUnit.MINUTES, |
201 |
| - new ArrayBlockingQueue<Runnable>(1024)); |
| 189 | + private final BlockingThreadPoolExecutorService nnProbingThreadPool; |
202 | 190 |
|
203 | 191 | /**
|
204 | 192 | * By default ObserverReadProxyProvider uses
|
@@ -263,6 +251,15 @@ public ObserverReadProxyProvider(
|
263 | 251 | + "class does not implement {}", uri, ClientProtocol.class.getName());
|
264 | 252 | this.observerReadEnabled = false;
|
265 | 253 | }
|
| 254 | + |
| 255 | + /* |
| 256 | + * At most 4 threads will be running and each thread will die after 10 |
| 257 | + * seconds of no use. Up to 132 tasks (4 active + 128 waiting) can be |
| 258 | + * submitted simultaneously. |
| 259 | + */ |
| 260 | + nnProbingThreadPool = |
| 261 | + BlockingThreadPoolExecutorService.newInstance(4, 128, 10L, TimeUnit.SECONDS, |
| 262 | + "nn-ha-state-probing"); |
266 | 263 | }
|
267 | 264 |
|
268 | 265 | public AlignmentContext getAlignmentContext() {
|
@@ -649,6 +646,7 @@ public synchronized void close() throws IOException {
|
649 | 646 | }
|
650 | 647 | }
|
651 | 648 | failoverProxy.close();
|
| 649 | + nnProbingThreadPool.shutdown(); |
652 | 650 | }
|
653 | 651 |
|
654 | 652 | @Override
|
|
0 commit comments