Skip to content

Commit ecf4503

Browse files
committed
Make keepalive pings bidirectional and optimizable (elastic#35441)
This is related to elastic#34405 and a follow-up to elastic#34753. It makes a number of changes to our current keepalive pings. The ping interval configuration is moved to the ConnectionProfile. The server channel now responds to pings. This makes the keepalive pings bidirectional. On the client-side, the pings can now be optimized away. What this means is that if the channel has received a message or sent a message since the last pinging round, the ping is not sent for this round.
1 parent 8d58334 commit ecf4503

File tree

23 files changed

+918
-474
lines changed

23 files changed

+918
-474
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@
3535
public class Netty4TcpChannel implements TcpChannel {
3636

3737
private final Channel channel;
38+
private final boolean isServer;
3839
private final String profile;
3940
private final CompletableContext<Void> connectContext;
4041
private final CompletableContext<Void> closeContext = new CompletableContext<>();
42+
private final ChannelStats stats = new ChannelStats();
4143

42-
Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
44+
Netty4TcpChannel(Channel channel, boolean isServer, String profile, @Nullable ChannelFuture connectFuture) {
4345
this.channel = channel;
46+
this.isServer = isServer;
4447
this.profile = profile;
4548
this.connectContext = new CompletableContext<>();
4649
this.channel.closeFuture().addListener(f -> {
@@ -77,6 +80,11 @@ public void close() {
7780
channel.close();
7881
}
7982

83+
@Override
84+
public boolean isServerChannel() {
85+
return isServer;
86+
}
87+
8088
@Override
8189
public String getProfile() {
8290
return profile;
@@ -92,6 +100,11 @@ public void addConnectListener(ActionListener<Void> listener) {
92100
connectContext.addListener(ActionListener.toBiConsumer(listener));
93101
}
94102

103+
@Override
104+
public ChannelStats getChannelStats() {
105+
return stats;
106+
}
107+
95108
@Override
96109
public boolean isOpen() {
97110
return channel.isOpen();

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio
232232
}
233233
addClosedExceptionLogger(channel);
234234

235-
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectFuture);
235+
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
236236
channel.attr(CHANNEL_KEY).set(nettyChannel);
237237

238238
return nettyChannel;
@@ -246,14 +246,6 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
246246
return esChannel;
247247
}
248248

249-
long successfulPingCount() {
250-
return successfulPings.count();
251-
}
252-
253-
long failedPingCount() {
254-
return failedPings.count();
255-
}
256-
257249
@Override
258250
@SuppressForbidden(reason = "debug")
259251
protected void stopInternal() {
@@ -297,8 +289,7 @@ protected ServerChannelInitializer(String name) {
297289
@Override
298290
protected void initChannel(Channel ch) throws Exception {
299291
addClosedExceptionLogger(ch);
300-
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name, ch.newSucceededFuture());
301-
292+
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
302293
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
303294
serverAcceptedChannel(nettyTcpChannel);
304295
ch.pipeline().addLast("logging", new ESLoggingHandler());

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 0 additions & 139 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
23+
/**
24+
* A {@link java.util.function.BiFunction}-like interface designed to be used with asynchronous executions.
25+
*/
26+
public interface AsyncBiFunction<T,U,C> {
27+
28+
void apply(T t, U u, ActionListener<C> listener);
29+
}

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@
2727
import org.elasticsearch.common.component.Lifecycle;
2828
import org.elasticsearch.common.lease.Releasable;
2929
import org.elasticsearch.common.settings.Settings;
30-
import org.elasticsearch.common.unit.TimeValue;
31-
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
3230
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
33-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3431
import org.elasticsearch.common.util.concurrent.KeyedLock;
3532
import org.elasticsearch.core.internal.io.IOUtils;
3633
import org.elasticsearch.threadpool.ThreadPool;
@@ -53,32 +50,28 @@
5350
* the connection when the connection manager is closed.
5451
*/
5552
public class ConnectionManager implements Closeable {
53+
5654
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
5755

5856
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
5957
private final KeyedLock<String> connectionLock = new KeyedLock<>();
6058
private final Transport transport;
6159
private final ThreadPool threadPool;
62-
private final TimeValue pingSchedule;
6360
private final ConnectionProfile defaultProfile;
6461
private final Lifecycle lifecycle = new Lifecycle();
6562
private final AtomicBoolean closed = new AtomicBoolean(false);
6663
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
6764
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
6865

6966
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
70-
this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
67+
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool);
7168
}
7269

73-
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
70+
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) {
7471
this.transport = transport;
7572
this.threadPool = threadPool;
76-
this.pingSchedule = pingSchedule;
77-
this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings);
73+
this.defaultProfile = connectionProfile;
7874
this.lifecycle.moveToStarted();
79-
if (pingSchedule.millis() > 0) {
80-
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
81-
}
8275
}
8376

8477
public void addListener(TransportConnectionListener listener) {
@@ -251,47 +244,8 @@ private void ensureOpen() {
251244
}
252245
}
253246

254-
TimeValue getPingSchedule() {
255-
return pingSchedule;
256-
}
257-
258-
private class ScheduledPing extends AbstractLifecycleRunnable {
259-
260-
private ScheduledPing() {
261-
super(lifecycle, logger);
262-
}
263-
264-
@Override
265-
protected void doRunInLifecycle() {
266-
for (Map.Entry<DiscoveryNode, Transport.Connection> entry : connectedNodes.entrySet()) {
267-
Transport.Connection connection = entry.getValue();
268-
if (connection.sendPing() == false) {
269-
logger.warn("attempted to send ping to connection without support for pings [{}]", connection);
270-
}
271-
}
272-
}
273-
274-
@Override
275-
protected void onAfterInLifecycle() {
276-
try {
277-
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
278-
} catch (EsRejectedExecutionException ex) {
279-
if (ex.isExecutorShutdown()) {
280-
logger.debug("couldn't schedule new ping execution, executor is shutting down", ex);
281-
} else {
282-
throw ex;
283-
}
284-
}
285-
}
286-
287-
@Override
288-
public void onFailure(Exception e) {
289-
if (lifecycle.stoppedOrClosed()) {
290-
logger.trace("failed to send ping transport message", e);
291-
} else {
292-
logger.warn("failed to send ping transport message", e);
293-
}
294-
}
247+
ConnectionProfile getConnectionProfile() {
248+
return defaultProfile;
295249
}
296250

297251
private static final class DelegatingNodeConnectionListener implements TransportConnectionListener {

0 commit comments

Comments
 (0)