3636import org .elasticsearch .common .bytes .BytesArray ;
3737import org .elasticsearch .common .bytes .BytesReference ;
3838import org .elasticsearch .common .bytes .CompositeBytesReference ;
39+ import org .elasticsearch .common .collect .MapBuilder ;
3940import org .elasticsearch .common .component .AbstractLifecycleComponent ;
4041import org .elasticsearch .common .component .Lifecycle ;
4142import org .elasticsearch .common .compress .Compressor ;
9899import java .util .TreeSet ;
99100import java .util .concurrent .ConcurrentHashMap ;
100101import java .util .concurrent .ConcurrentMap ;
102+ import java .util .concurrent .CopyOnWriteArrayList ;
101103import java .util .concurrent .CountDownLatch ;
102104import java .util .concurrent .TimeUnit ;
103105import java .util .concurrent .atomic .AtomicBoolean ;
104- import java .util .concurrent .atomic .AtomicLong ;
105106import java .util .concurrent .atomic .AtomicReference ;
106107import java .util .concurrent .locks .ReadWriteLock ;
107108import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -205,7 +206,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
205206 protected final NetworkService networkService ;
206207 protected final Set <ProfileSettings > profileSettings ;
207208
208- private volatile TransportService transportService ;
209+ private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener () ;
209210
210211 private final ConcurrentMap <String , BoundTransportAddress > profileBoundAddresses = newConcurrentMap ();
211212 // node id to actual channel
@@ -225,12 +226,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
225226 protected final ConnectionProfile defaultConnectionProfile ;
226227
227228 private final ConcurrentMap <Long , HandshakeResponseHandler > pendingHandshakes = new ConcurrentHashMap <>();
228- private final AtomicLong requestIdGenerator = new AtomicLong ();
229229 private final CounterMetric numHandshakes = new CounterMetric ();
230230 private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake" ;
231231
232232 private final MeanMetric readBytesMetric = new MeanMetric ();
233233 private final MeanMetric transmittedBytesMetric = new MeanMetric ();
234+ private volatile Map <String , RequestHandlerRegistry > requestHandlers = Collections .emptyMap ();
235+ private final ResponseHandlers responseHandlers = new ResponseHandlers ();
234236
235237 public TcpTransport (String transportName , Settings settings , ThreadPool threadPool , BigArrays bigArrays ,
236238 CircuitBreakerService circuitBreakerService , NamedWriteableRegistry namedWriteableRegistry ,
@@ -287,18 +289,28 @@ protected void doStart() {
287289 }
288290 }
289291
292+ @ Override
293+ public void addConnectionListener (TransportConnectionListener listener ) {
294+ transportListener .listeners .add (listener );
295+ }
296+
297+ @ Override
298+ public boolean removeConnectionListener (TransportConnectionListener listener ) {
299+ return transportListener .listeners .remove (listener );
300+ }
301+
290302 @ Override
291303 public CircuitBreaker getInFlightRequestBreaker () {
292304 // We always obtain a fresh breaker to reflect changes to the breaker configuration.
293305 return circuitBreakerService .getBreaker (CircuitBreaker .IN_FLIGHT_REQUESTS );
294306 }
295307
296308 @ Override
297- public void setTransportService ( TransportService service ) {
298- if (service . getRequestHandler ( HANDSHAKE_ACTION_NAME ) != null ) {
299- throw new IllegalStateException ( HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered" );
309+ public synchronized < Request extends TransportRequest > void registerRequestHandler ( RequestHandlerRegistry < Request > reg ) {
310+ if (requestHandlers . containsKey ( reg . getAction ()) ) {
311+ throw new IllegalArgumentException ( "transport handlers for action " + reg . getAction () + " is already registered" );
300312 }
301- this . transportService = service ;
313+ requestHandlers = MapBuilder . newMapBuilder ( requestHandlers ). put ( reg . getAction (), reg ). immutableMap () ;
302314 }
303315
304316 private static class HandshakeResponseHandler implements TransportResponseHandler <VersionHandshakeResponse > {
@@ -482,7 +494,7 @@ public void close() {
482494 boolean block = lifecycle .stopped () && Transports .isTransportThread (Thread .currentThread ()) == false ;
483495 CloseableChannel .closeChannels (channels , block );
484496 } finally {
485- transportService .onConnectionClosed (this );
497+ transportListener .onConnectionClosed (this );
486498 }
487499 }
488500 }
@@ -538,7 +550,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
538550 logger .debug ("connected to node [{}]" , node );
539551 }
540552 try {
541- transportService .onNodeConnected (node );
553+ transportListener .onNodeConnected (node );
542554 } finally {
543555 if (nodeChannels .isClosed ()) {
544556 // we got closed concurrently due to a disconnect or some other event on the channel.
@@ -550,7 +562,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
550562 // try to remove it first either way one of the two wins even if the callback has run before we even added the
551563 // tuple to the map since in that case we remove it here again
552564 if (connectedNodes .remove (node , nodeChannels )) {
553- transportService .onNodeDisconnected (node );
565+ transportListener .onNodeDisconnected (node );
554566 }
555567 throw new NodeNotConnectedException (node , "connection concurrently closed" );
556568 }
@@ -652,7 +664,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
652664 // At this point we should construct the connection, notify the transport service, and attach close listeners to the
653665 // underlying channels.
654666 nodeChannels = new NodeChannels (node , channels , connectionProfile , version );
655- transportService .onConnectionOpened (nodeChannels );
667+ transportListener .onConnectionOpened (nodeChannels );
656668 final NodeChannels finalNodeChannels = nodeChannels ;
657669 final AtomicBoolean runOnce = new AtomicBoolean (false );
658670 Consumer <TcpChannel > onClose = c -> {
@@ -695,7 +707,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
695707 if (closeLock .readLock ().tryLock ()) {
696708 try {
697709 if (connectedNodes .remove (node , nodeChannels )) {
698- transportService .onNodeDisconnected (node );
710+ transportListener .onNodeDisconnected (node );
699711 }
700712 } finally {
701713 closeLock .readLock ().unlock ();
@@ -722,7 +734,7 @@ public void disconnectFromNode(DiscoveryNode node) {
722734 } finally {
723735 closeLock .readLock ().unlock ();
724736 if (nodeChannels != null ) { // if we found it and removed it we close and notify
725- IOUtils .closeWhileHandlingException (nodeChannels , () -> transportService .onNodeDisconnected (node ));
737+ IOUtils .closeWhileHandlingException (nodeChannels , () -> transportListener .onNodeDisconnected (node ));
726738 }
727739 }
728740 }
@@ -979,7 +991,7 @@ protected final void doStop() {
979991 Map .Entry <DiscoveryNode , NodeChannels > next = iterator .next ();
980992 try {
981993 IOUtils .closeWhileHandlingException (next .getValue ());
982- transportService .onNodeDisconnected (next .getKey ());
994+ transportListener .onNodeDisconnected (next .getKey ());
983995 } finally {
984996 iterator .remove ();
985997 }
@@ -1133,7 +1145,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
11331145 final TransportRequestOptions finalOptions = options ;
11341146 // this might be called in a different thread
11351147 SendListener onRequestSent = new SendListener (channel , stream ,
1136- () -> transportService .onRequestSent (node , requestId , action , request , finalOptions ), message .length ());
1148+ () -> transportListener .onRequestSent (node , requestId , action , request , finalOptions ), message .length ());
11371149 internalSendMessage (channel , message , onRequestSent );
11381150 addedReleaseListener = true ;
11391151 } finally {
@@ -1187,7 +1199,7 @@ public void sendErrorResponse(
11871199 final BytesReference header = buildHeader (requestId , status , nodeVersion , bytes .length ());
11881200 CompositeBytesReference message = new CompositeBytesReference (header , bytes );
11891201 SendListener onResponseSent = new SendListener (channel , null ,
1190- () -> transportService .onResponseSent (requestId , action , error ), message .length ());
1202+ () -> transportListener .onResponseSent (requestId , action , error ), message .length ());
11911203 internalSendMessage (channel , message , onResponseSent );
11921204 }
11931205 }
@@ -1236,7 +1248,7 @@ private void sendResponse(
12361248 final TransportResponseOptions finalOptions = options ;
12371249 // this might be called in a different thread
12381250 SendListener listener = new SendListener (channel , stream ,
1239- () -> transportService .onResponseSent (requestId , action , response , finalOptions ), message .length ());
1251+ () -> transportListener .onResponseSent (requestId , action , response , finalOptions ), message .length ());
12401252 internalSendMessage (channel , message , listener );
12411253 addedReleaseListener = true ;
12421254 } finally {
@@ -1492,7 +1504,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
14921504 if (isHandshake ) {
14931505 handler = pendingHandshakes .remove (requestId );
14941506 } else {
1495- TransportResponseHandler theHandler = transportService .onResponseReceived (requestId );
1507+ TransportResponseHandler theHandler = responseHandlers .onResponseReceived (requestId , transportListener );
14961508 if (theHandler == null && TransportStatus .isError (status )) {
14971509 handler = pendingHandshakes .remove (requestId );
14981510 } else {
@@ -1599,15 +1611,15 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
15991611 features = Collections .emptySet ();
16001612 }
16011613 final String action = stream .readString ();
1602- transportService .onRequestReceived (requestId , action );
1614+ transportListener .onRequestReceived (requestId , action );
16031615 TransportChannel transportChannel = null ;
16041616 try {
16051617 if (TransportStatus .isHandshake (status )) {
16061618 final VersionHandshakeResponse response = new VersionHandshakeResponse (getCurrentVersion ());
16071619 sendResponse (version , features , channel , response , requestId , HANDSHAKE_ACTION_NAME , TransportResponseOptions .EMPTY ,
16081620 TransportStatus .setHandshake ((byte ) 0 ));
16091621 } else {
1610- final RequestHandlerRegistry reg = transportService . getRequestHandler (action );
1622+ final RequestHandlerRegistry reg = getRequestHandler (action );
16111623 if (reg == null ) {
16121624 throw new ActionNotFoundTransportException (action );
16131625 }
@@ -1714,7 +1726,7 @@ public void writeTo(StreamOutput out) throws IOException {
17141726 protected Version executeHandshake (DiscoveryNode node , TcpChannel channel , TimeValue timeout )
17151727 throws IOException , InterruptedException {
17161728 numHandshakes .inc ();
1717- final long requestId = newRequestId ();
1729+ final long requestId = responseHandlers . newRequestId ();
17181730 final HandshakeResponseHandler handler = new HandshakeResponseHandler (channel );
17191731 AtomicReference <Version > versionRef = handler .versionRef ;
17201732 AtomicReference <Exception > exceptionRef = handler .exceptionRef ;
@@ -1764,11 +1776,6 @@ final long getNumHandshakes() {
17641776 return numHandshakes .count (); // for testing
17651777 }
17661778
1767- @ Override
1768- public long newRequestId () {
1769- return requestIdGenerator .incrementAndGet ();
1770- }
1771-
17721779 /**
17731780 * Called once the channel is closed for instance due to a disconnect or a closed socket etc.
17741781 */
@@ -1912,4 +1919,82 @@ public ProfileSettings(Settings settings, String profileName) {
19121919 PUBLISH_PORT_PROFILE .getConcreteSettingForNamespace (profileName ).get (settings );
19131920 }
19141921 }
1922+
1923+ private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
1924+ private final List <TransportConnectionListener > listeners = new CopyOnWriteArrayList <>();
1925+
1926+ @ Override
1927+ public void onRequestReceived (long requestId , String action ) {
1928+ for (TransportConnectionListener listener : listeners ) {
1929+ listener .onRequestReceived (requestId , action );
1930+ }
1931+ }
1932+
1933+ @ Override
1934+ public void onResponseSent (long requestId , String action , TransportResponse response , TransportResponseOptions finalOptions ) {
1935+ for (TransportConnectionListener listener : listeners ) {
1936+ listener .onResponseSent (requestId , action , response , finalOptions );
1937+ }
1938+ }
1939+
1940+ @ Override
1941+ public void onResponseSent (long requestId , String action , Exception error ) {
1942+ for (TransportConnectionListener listener : listeners ) {
1943+ listener .onResponseSent (requestId , action , error );
1944+ }
1945+ }
1946+
1947+ @ Override
1948+ public void onRequestSent (DiscoveryNode node , long requestId , String action , TransportRequest request ,
1949+ TransportRequestOptions finalOptions ) {
1950+ for (TransportConnectionListener listener : listeners ) {
1951+ listener .onRequestSent (node , requestId , action , request , finalOptions );
1952+ }
1953+ }
1954+
1955+ @ Override
1956+ public void onNodeDisconnected (DiscoveryNode key ) {
1957+ for (TransportConnectionListener listener : listeners ) {
1958+ listener .onNodeDisconnected (key );
1959+ }
1960+ }
1961+
1962+ @ Override
1963+ public void onConnectionOpened (Connection nodeChannels ) {
1964+ for (TransportConnectionListener listener : listeners ) {
1965+ listener .onConnectionOpened (nodeChannels );
1966+ }
1967+ }
1968+
1969+ @ Override
1970+ public void onNodeConnected (DiscoveryNode node ) {
1971+ for (TransportConnectionListener listener : listeners ) {
1972+ listener .onNodeConnected (node );
1973+ }
1974+ }
1975+
1976+ @ Override
1977+ public void onConnectionClosed (Connection nodeChannels ) {
1978+ for (TransportConnectionListener listener : listeners ) {
1979+ listener .onConnectionClosed (nodeChannels );
1980+ }
1981+ }
1982+
1983+ @ Override
1984+ public void onResponseReceived (long requestId , ResponseContext holder ) {
1985+ for (TransportConnectionListener listener : listeners ) {
1986+ listener .onResponseReceived (requestId , holder );
1987+ }
1988+ }
1989+ }
1990+
1991+ @ Override
1992+ public final ResponseHandlers getResponseHandlers () {
1993+ return responseHandlers ;
1994+ }
1995+
1996+ @ Override
1997+ public final RequestHandlerRegistry getRequestHandler (String action ) {
1998+ return requestHandlers .get (action );
1999+ }
19152000}
0 commit comments