|
72 | 72 | import org.elasticsearch.transport.TransportChannel; |
73 | 73 | import org.elasticsearch.transport.TransportException; |
74 | 74 | import org.elasticsearch.transport.TransportRequest; |
75 | | -import org.elasticsearch.transport.TransportRequestHandler; |
76 | 75 | import org.elasticsearch.transport.TransportRequestOptions; |
77 | 76 | import org.elasticsearch.transport.TransportResponse; |
78 | 77 | import org.elasticsearch.transport.TransportResponse.Empty; |
@@ -145,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans |
145 | 144 |
|
146 | 145 | this.transportPrimaryAction = actionName + "[p]"; |
147 | 146 | this.transportReplicaAction = actionName + "[r]"; |
148 | | - registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); |
| 147 | + |
| 148 | + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); |
| 149 | + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, |
| 150 | + forcePrimaryActionExecution(), this::handlePrimaryRequest); |
| 151 | + // we must never reject on because of thread pool capacity on replicas |
| 152 | + transportService.registerRequestHandler( |
| 153 | + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); |
149 | 154 |
|
150 | 155 | this.transportOptions = transportOptions(settings); |
151 | 156 |
|
152 | 157 | this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; |
153 | 158 | } |
154 | 159 |
|
155 | | - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request, |
156 | | - Supplier<ReplicaRequest> replicaRequest, String executor) { |
157 | | - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); |
158 | | - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, |
159 | | - this::handlePrimaryRequest); |
160 | | - // we must never reject on because of thread pool capacity on replicas |
161 | | - transportService.registerRequestHandler( |
162 | | - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); |
| 160 | + protected boolean forcePrimaryActionExecution() { |
| 161 | + return false; |
163 | 162 | } |
164 | 163 |
|
165 | 164 | @Override |
|
0 commit comments