-
Notifications
You must be signed in to change notification settings - Fork 1.9k
IGNITE-13012 Make node connection checking rely on the configuration. Simplify node ping routine. #7835
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-13012 Make node connection checking rely on the configuration. Simplify node ping routine. #7835
Changes from 29 commits
3b66856
675c069
e4ddf05
bf93ac1
e729270
47a9f7d
2c929fa
245943a
f7d58ae
8f4dabf
62f5d6a
dc23756
7089343
3515f40
9dca4f1
bd00c20
c464725
5370831
a9ad35e
a8fad43
45c426f
0d58fe4
7f7a608
dde7e7c
7b40043
e1b9735
1b07dd5
a4be000
d9c3108
71435c2
322242a
be6e2ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -199,8 +199,11 @@ class ServerImpl extends TcpDiscoveryImpl { | |
| /** */ | ||
| private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); | ||
|
|
||
| /** When this interval pass connection check will be performed. */ | ||
| private static final int CON_CHECK_INTERVAL = 500; | ||
| /** Maximal interval of connection check to next node in the ring. */ | ||
| private static final long MAX_CON_CHECK_INTERVAL = 500; | ||
|
|
||
| /** Interval of checking connection to next node in the ring. */ | ||
| private long connCheckInterval; | ||
|
|
||
| /** */ | ||
| private IgniteThreadPoolExecutor utilityPool; | ||
|
|
@@ -275,6 +278,9 @@ class ServerImpl extends TcpDiscoveryImpl { | |
| /** Last time received message from ring. */ | ||
| private volatile long lastRingMsgReceivedTime; | ||
|
|
||
| /** Time of last sent and acknowledged message. */ | ||
| private volatile long lastRingMsgSentTime; | ||
|
|
||
| /** */ | ||
| private volatile boolean nodeCompactRepresentationSupported = | ||
| true; //assume that local node supports this feature | ||
|
|
@@ -356,8 +362,8 @@ class ServerImpl extends TcpDiscoveryImpl { | |
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override public long connectionCheckInterval() { | ||
| return CON_CHECK_INTERVAL; | ||
| @Override long connectionCheckInterval() { | ||
| return connCheckInterval; | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
|
|
@@ -368,6 +374,15 @@ class ServerImpl extends TcpDiscoveryImpl { | |
|
|
||
| lastRingMsgReceivedTime = 0; | ||
|
|
||
| lastRingMsgSentTime = 0; | ||
|
|
||
| long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : | ||
| spi.getSocketTimeout() + spi.getAckTimeout(); | ||
|
|
||
| // Since we take in account time of last sent message, the interval should be quite short to give enough piece | ||
| // of failure detection timeout as send-and-acknowledge timeout of the message to send. | ||
| connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL); | ||
|
|
||
| utilityPool = new IgniteThreadPoolExecutor("disco-pool", | ||
| spi.ignite().name(), | ||
| 0, | ||
|
|
@@ -2846,15 +2861,6 @@ private class RingMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessag | |
| /** Last time metrics update message has been sent. */ | ||
| private long lastTimeMetricsUpdateMsgSentNanos = System.nanoTime() - U.millisToNanos(spi.metricsUpdateFreq); | ||
|
|
||
| /** Time when the last status message has been sent. */ | ||
| private long lastTimeConnCheckMsgSent; | ||
|
|
||
| /** Flag that keeps info on whether the threshold is reached or not. */ | ||
| private boolean failureThresholdReached; | ||
|
|
||
| /** Connection check threshold. */ | ||
| private long connCheckThreshold; | ||
|
|
||
| /** */ | ||
| private long lastRingMsgTimeNanos; | ||
|
|
||
|
|
@@ -2873,8 +2879,6 @@ private class RingMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessag | |
| private RingMessageWorker(IgniteLogger log) { | ||
| super("tcp-disco-msg-worker-[]", log, 10, getWorkerRegistry(spi)); | ||
|
|
||
| initConnectionCheckThreshold(); | ||
|
|
||
| setBeforeEachPollAction(() -> { | ||
| updateHeartbeat(); | ||
|
|
||
|
|
@@ -3025,19 +3029,6 @@ private void nullifyDiscoData() { | |
| joiningNodesDiscoDataList = null; | ||
| } | ||
|
|
||
| /** | ||
| * Initializes connection check frequency. Used only when failure detection timeout is enabled. | ||
| */ | ||
| private void initConnectionCheckThreshold() { | ||
| if (spi.failureDetectionTimeoutEnabled()) | ||
| connCheckThreshold = spi.failureDetectionTimeout(); | ||
| else | ||
| connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); | ||
|
|
||
| if (log.isInfoEnabled()) | ||
| log.info("Connection check threshold is calculated: " + connCheckThreshold); | ||
| } | ||
|
|
||
| /** | ||
| * | ||
| */ | ||
|
|
@@ -3146,9 +3137,6 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) | |
| if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { | ||
| // Received a message from remote node. | ||
| onMessageExchanged(); | ||
|
|
||
| // Reset the failure flag. | ||
| failureThresholdReached = false; | ||
| } | ||
|
|
||
| if (next != null && sock != null) { | ||
|
|
@@ -3469,6 +3457,8 @@ else if (log.isTraceEnabled()) | |
| } | ||
| } | ||
|
|
||
| updateLastSentMessageTime(); | ||
|
|
||
| if (log.isDebugEnabled()) | ||
| log.debug("Initialized connection with next node: " + next.id()); | ||
|
|
||
|
|
@@ -3559,8 +3549,10 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof | |
|
|
||
| addFailedNodes(pendingMsg, failedNodes); | ||
|
|
||
| if (timeoutHelper == null) | ||
| timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); | ||
| if (timeoutHelper == null) { | ||
| timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, | ||
| lastRingMsgSentTime); | ||
| } | ||
|
|
||
| try { | ||
| spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( | ||
|
|
@@ -3574,6 +3566,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof | |
|
|
||
| int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); | ||
|
|
||
| updateLastSentMessageTime(); | ||
anton-vinogradov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos)); | ||
|
|
||
| if (log.isDebugEnabled()) | ||
|
|
@@ -3602,7 +3596,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof | |
| long tsNanos = System.nanoTime(); | ||
|
|
||
| if (timeoutHelper == null) | ||
| timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); | ||
| timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); | ||
|
|
||
| addFailedNodes(msg, failedNodes); | ||
|
|
||
|
|
@@ -3621,6 +3615,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof | |
|
|
||
| int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); | ||
|
|
||
| updateLastSentMessageTime(); | ||
|
|
||
| if (latencyCheck && log.isInfoEnabled()) | ||
| log.info("Latency check message has been acked: " + msg.id()); | ||
|
|
||
|
|
@@ -6192,40 +6188,21 @@ private void checkMetricsReceiving() { | |
| } | ||
|
|
||
| /** | ||
| * Check connection aliveness status. | ||
| * Check connection to next node in the ring. | ||
| */ | ||
| private void checkConnection() { | ||
| Boolean hasRemoteSrvNodes = null; | ||
|
|
||
| if (spi.failureDetectionTimeoutEnabled() && !failureThresholdReached && | ||
| U.millisSinceNanos(locNode.lastExchangeTimeNanos()) >= connCheckThreshold && | ||
| spiStateCopy() == CONNECTED && | ||
| (hasRemoteSrvNodes = ring.hasRemoteServerNodes())) { | ||
|
|
||
| if (log.isInfoEnabled()) | ||
| log.info("Local node seems to be disconnected from topology (failure detection timeout " + | ||
| "is reached) [failureDetectionTimeout=" + spi.failureDetectionTimeout() + | ||
| ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); | ||
|
|
||
| failureThresholdReached = true; | ||
|
|
||
| // Reset sent time deliberately to force sending connection check message. | ||
| lastTimeConnCheckMsgSent = 0; | ||
| } | ||
|
|
||
| long elapsed = (lastTimeConnCheckMsgSent + CON_CHECK_INTERVAL) - U.currentTimeMillis(); | ||
| long elapsed = (lastRingMsgSentTime + U.millisToNanos(connCheckInterval)) - System.nanoTime(); | ||
|
|
||
| if (elapsed > 0) | ||
| return; | ||
|
|
||
| if (hasRemoteSrvNodes == null) | ||
| hasRemoteSrvNodes = ring.hasRemoteServerNodes(); | ||
|
|
||
| if (hasRemoteSrvNodes) { | ||
| if (hasRemoteSrvNodes) | ||
|
||
| sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); | ||
|
|
||
| lastTimeConnCheckMsgSent = U.currentTimeMillis(); | ||
| } | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
|
|
@@ -6234,6 +6211,11 @@ private void checkConnection() { | |
| } | ||
| } | ||
|
|
||
| /** Fixates time of last sent message. */ | ||
| private void updateLastSentMessageTime() { | ||
| lastRingMsgSentTime = System.nanoTime(); | ||
| } | ||
|
|
||
| /** Thread that executes {@link TcpServer}'s code. */ | ||
| private class TcpServerThread extends IgniteSpiThread { | ||
| /** */ | ||
|
|
@@ -6569,7 +6551,7 @@ else if (req.changeTopology()) { | |
| long now = U.currentTimeMillis(); | ||
|
|
||
| // We got message from previous in less than double connection check interval. | ||
| boolean ok = rcvdTime + CON_CHECK_INTERVAL * 2 >= now; | ||
| boolean ok = rcvdTime + connCheckInterval * 2 >= now; | ||
| TcpDiscoveryNode previous = null; | ||
|
|
||
| if (ok) { | ||
|
|
@@ -6618,7 +6600,7 @@ else if (req.changeTopology()) { | |
| ", checkPreviousNodeId=" + req.checkPreviousNodeId() + | ||
| ", actualPreviousNode=" + previous + | ||
| ", lastMessageReceivedTime=" + rcvdTime + ", now=" + now + | ||
| ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); | ||
| ", connCheckInterval=" + connCheckInterval + ']'); | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.