-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Decouple nio constructs from the tcp transport #27484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
b35a1a8
23844d3
187f870
81ec8a9
bb38080
4287cad
a8f1fe1
e0420a7
6cfbb2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,10 +33,12 @@ | |
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.TcpTransport; | ||
| import org.elasticsearch.transport.Transports; | ||
| import org.elasticsearch.transport.nio.channel.ChannelFactory; | ||
| import org.elasticsearch.transport.nio.channel.NioChannel; | ||
| import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; | ||
| import org.elasticsearch.transport.nio.channel.NioSocketChannel; | ||
| import org.elasticsearch.transport.nio.channel.TcpChannelFactory; | ||
| import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel; | ||
| import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel; | ||
| import org.elasticsearch.transport.nio.channel.TcpReadContext; | ||
| import org.elasticsearch.transport.nio.channel.TcpWriteContext; | ||
|
|
||
|
|
@@ -65,12 +67,12 @@ public class NioTransport extends TcpTransport { | |
| public static final Setting<Integer> NIO_ACCEPTOR_COUNT = | ||
| intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); | ||
|
|
||
| protected final OpenChannels openChannels = new OpenChannels(logger); | ||
| private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap(); | ||
| private final OpenChannels openChannels = new OpenChannels(logger); | ||
| private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap(); | ||
| private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>(); | ||
| private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>(); | ||
| private RoundRobinSelectorSupplier clientSelectorSupplier; | ||
| private ChannelFactory clientChannelFactory; | ||
| private TcpChannelFactory clientChannelFactory; | ||
| private int acceptorNumber; | ||
|
|
||
| public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, | ||
|
|
@@ -84,17 +86,21 @@ public long getNumOpenServerConnections() { | |
| } | ||
|
|
||
| @Override | ||
| protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { | ||
| ChannelFactory channelFactory = this.profileToChannelFactory.get(name); | ||
| protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { | ||
| TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name); | ||
| AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings)); | ||
| return channelFactory.openNioServerSocketChannel(address, selector); | ||
| TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector); | ||
| openChannels.serverChannelOpened(serverChannel); | ||
| serverChannel.addCloseListener(new RemoveClosedChannelListener(serverChannel)); | ||
| return serverChannel; | ||
| } | ||
|
|
||
| @Override | ||
| protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener) | ||
| protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener) | ||
| throws IOException { | ||
| NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get()); | ||
| TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get()); | ||
| openChannels.clientChannelOpened(channel); | ||
| channel.addCloseListener(new RemoveClosedChannelListener(channel)); | ||
| channel.addConnectListener(connectListener); | ||
| return channel; | ||
| } | ||
|
|
@@ -119,14 +125,14 @@ protected void doStart() { | |
|
|
||
| Consumer<NioSocketChannel> clientContextSetter = getContextSetter("client-socket"); | ||
| clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); | ||
| clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), clientContextSetter); | ||
| ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); | ||
| clientChannelFactory = new TcpChannelFactory(clientProfileSettings, clientContextSetter, getServerContextSetter()); | ||
|
|
||
| if (NetworkService.NETWORK_SERVER.get(settings)) { | ||
| int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); | ||
| for (int i = 0; i < acceptorCount; ++i) { | ||
| Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); | ||
| AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, openChannels, selectorSupplier, | ||
| this::serverAcceptedChannel); | ||
| AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, selectorSupplier); | ||
| AcceptingSelector acceptor = new AcceptingSelector(eventHandler); | ||
| acceptors.add(acceptor); | ||
| } | ||
|
|
@@ -143,7 +149,8 @@ protected void doStart() { | |
| for (ProfileSettings profileSettings : profileSettings) { | ||
| String profileName = profileSettings.profileName; | ||
| Consumer<NioSocketChannel> contextSetter = getContextSetter(profileName); | ||
| profileToChannelFactory.putIfAbsent(profileName, new ChannelFactory(profileSettings, contextSetter)); | ||
| TcpChannelFactory factory = new TcpChannelFactory(profileSettings, contextSetter, getServerContextSetter()); | ||
| profileToChannelFactory.putIfAbsent(profileName, factory); | ||
| bindServer(profileSettings); | ||
| } | ||
| } | ||
|
|
@@ -169,14 +176,48 @@ protected void stopInternal() { | |
| } | ||
|
|
||
| protected SocketEventHandler getSocketEventHandler() { | ||
| return new SocketEventHandler(logger, this::exceptionCaught, openChannels); | ||
| return new SocketEventHandler(logger); | ||
| } | ||
|
|
||
| final void exceptionCaught(NioSocketChannel channel, Exception exception) { | ||
| onException(channel, exception); | ||
| onException((TcpNioSocketChannel) channel, exception); | ||
| } | ||
|
|
||
| private Consumer<NioSocketChannel> getContextSetter(String profileName) { | ||
| return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c)); | ||
| return (c) -> { | ||
| c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c)); | ||
| c.setExceptionHandler(this::exceptionCaught); | ||
| }; | ||
| } | ||
|
|
||
| private void acceptChannel(NioSocketChannel channel) { | ||
| TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel; | ||
| openChannels.acceptedChannelOpened(tcpChannel); | ||
| tcpChannel.addCloseListener(new RemoveClosedChannelListener(channel)); | ||
|
||
| serverAcceptedChannel(tcpChannel); | ||
|
|
||
| } | ||
|
|
||
| private Consumer<NioServerSocketChannel> getServerContextSetter() { | ||
| return (c) -> c.setAcceptContext(this::acceptChannel); | ||
| } | ||
|
|
||
| private class RemoveClosedChannelListener implements ActionListener<Void> { | ||
|
|
||
| private final NioChannel channel; | ||
|
|
||
| private RemoveClosedChannelListener(NioChannel channel) { | ||
| this.channel = channel; | ||
| } | ||
|
|
||
| @Override | ||
| public void onResponse(Void aVoid) { | ||
| openChannels.channelClosed(channel); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| openChannels.channelClosed(channel); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover?