diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index 2e590ce593608..48161baf9f535 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -57,6 +57,22 @@ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) adapter.clientFailureDetectionTimeout(); } + /** + * Creates timeout helper whose time budget is counted from the start of the last related operation. + * + * @param adapter SPI adapter. + * @param srvOp {@code True} if communicates with server node. + * @param lastOperStartNanos {@link System#nanoTime()} of last related operation (may be negative), {@code 0} if none. + */ + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) { + this(adapter, srvOp); + + this.lastOperStartNanos = lastOperStartNanos; + + if (lastOperStartNanos != 0) + timeout = failureDetectionTimeout; + } + /** * Returns a timeout value to use for the next network operation. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index df9f1d45fef0c..dc90f50f50e70 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -199,8 +199,11 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); - /** When this interval pass connection check will be performed. */ - private static final int CON_CHECK_INTERVAL = 500; + /** Maximal interval of connection check to next node in the ring. 
 */ + private static final long MAX_CON_CHECK_INTERVAL = 500; + + /** Interval of checking connection to next node in the ring. */ + private long connCheckInterval; /** */ private IgniteThreadPoolExecutor utilityPool; @@ -275,6 +278,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Last time received message from ring. */ private volatile long lastRingMsgReceivedTime; + /** Time of last sent and acknowledged message, in nanoseconds of {@link System#nanoTime()}. */ + private volatile long lastRingMsgSentTime; + /** */ private volatile boolean nodeCompactRepresentationSupported = true; //assume that local node supports this feature @@ -357,7 +363,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public long connectionCheckInterval() { - return CON_CHECK_INTERVAL; + return connCheckInterval; } /** {@inheritDoc} */ @@ -368,6 +374,15 @@ class ServerImpl extends TcpDiscoveryImpl { lastRingMsgReceivedTime = 0; + lastRingMsgSentTime = 0; + + long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + spi.getSocketTimeout() + spi.getAckTimeout(); + + // Since we take into account the time of the last sent message, the interval should be short enough to leave + // a sufficient share of the failure detection timeout for sending and acknowledging the next message. + connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL); + utilityPool = new IgniteThreadPoolExecutor("disco-pool", spi.ignite().name(), 0, @@ -2846,15 +2861,6 @@ private class RingMessageWorker extends MessageWorker { updateHeartbeat(); @@ -3025,19 +3029,6 @@ private void nullifyDiscoData() { joiningNodesDiscoDataList = null; } - /** - * Initializes connection check frequency. Used only when failure detection timeout is enabled. 
- */ - private void initConnectionCheckThreshold() { - if (spi.failureDetectionTimeoutEnabled()) - connCheckThreshold = spi.failureDetectionTimeout(); - else - connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); - - if (log.isInfoEnabled()) - log.info("Connection check threshold is calculated: " + connCheckThreshold); - } - /** * */ @@ -3146,9 +3137,6 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { // Received a message from remote node. onMessageExchanged(); - - // Reset the failure flag. - failureThresholdReached = false; } if (next != null && sock != null) { @@ -3469,6 +3457,8 @@ else if (log.isTraceEnabled()) } } + updateLastSentMessageTime(); + if (log.isDebugEnabled()) log.debug("Initialized connection with next node: " + next.id()); @@ -3559,8 +3549,10 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof addFailedNodes(pendingMsg, failedNodes); - if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + if (timeoutHelper == null) { + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, + lastRingMsgSentTime); + } try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( @@ -3574,6 +3566,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + updateLastSentMessageTime(); + spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos)); if (log.isDebugEnabled()) @@ -3602,7 +3596,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof long tsNanos = System.nanoTime(); if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); addFailedNodes(msg, failedNodes); @@ -3621,6 +3615,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e 
instanceof int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + updateLastSentMessageTime(); + if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been acked: " + msg.id()); @@ -6192,40 +6188,16 @@ private void checkMetricsReceiving() { } /** - * Check connection aliveness status. + * Check connection to next node in the ring. */ private void checkConnection() { - Boolean hasRemoteSrvNodes = null; - - if (spi.failureDetectionTimeoutEnabled() && !failureThresholdReached && - U.millisSinceNanos(locNode.lastExchangeTimeNanos()) >= connCheckThreshold && - spiStateCopy() == CONNECTED && - (hasRemoteSrvNodes = ring.hasRemoteServerNodes())) { - - if (log.isInfoEnabled()) - log.info("Local node seems to be disconnected from topology (failure detection timeout " + - "is reached) [failureDetectionTimeout=" + spi.failureDetectionTimeout() + - ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); - - failureThresholdReached = true; - - // Reset sent time deliberately to force sending connection check message. - lastTimeConnCheckMsgSent = 0; - } - - long elapsed = (lastTimeConnCheckMsgSent + CON_CHECK_INTERVAL) - U.currentTimeMillis(); + long elapsed = (lastRingMsgSentTime + U.millisToNanos(connCheckInterval)) - System.nanoTime(); if (elapsed > 0) return; - if (hasRemoteSrvNodes == null) - hasRemoteSrvNodes = ring.hasRemoteServerNodes(); - - if (hasRemoteSrvNodes) { + if (ring.hasRemoteServerNodes()) sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); - - lastTimeConnCheckMsgSent = U.currentTimeMillis(); - } } /** {@inheritDoc} */ @@ -6234,6 +6206,11 @@ private void checkConnection() { } } + /** Records the time of the last sent message. */ + private void updateLastSentMessageTime() { + lastRingMsgSentTime = System.nanoTime(); + } + /** Thread that executes {@link TcpServer}'s code. 
*/ private class TcpServerThread extends IgniteSpiThread { /** */ @@ -6569,7 +6546,7 @@ else if (req.changeTopology()) { long now = U.currentTimeMillis(); // We got message from previous in less than double connection check interval. - boolean ok = rcvdTime + CON_CHECK_INTERVAL * 2 >= now; + boolean ok = rcvdTime + connCheckInterval * 2 >= now; TcpDiscoveryNode previous = null; if (ok) { @@ -6618,7 +6595,7 @@ else if (req.changeTopology()) { ", checkPreviousNodeId=" + req.checkPreviousNodeId() + ", actualPreviousNode=" + previous + ", lastMessageReceivedTime=" + rcvdTime + ", now=" + now + - ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); + ", connCheckInterval=" + connCheckInterval + ']'); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java index f29b05f78a66a..2c0aa013fdce3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java @@ -54,6 +54,9 @@ public class CacheContinuousQueryFilterDeploymentFailedTest extends GridCommonAb @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + // Failure detection timeout > P2P class loading timeout which is set as network timeout. + cfg.setFailureDetectionTimeout(cfg.getNetworkTimeout() * 2); + ((TestTcpDiscoverySpi)cfg.getDiscoverySpi()).discoveryHook(new DiscoveryHook() { @Override public void afterDiscovery(DiscoveryCustomMessage customMsg) { if (customMsg instanceof StopRoutineDiscoveryMessage)