From 3b668562077ada39800f8010bf28bc084218b462 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 4 Apr 2020 23:18:11 +0300 Subject: [PATCH 01/26] IGNITE-13021 : First impl. --- .../platform/utils/PlatformUtils.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 90 ++++++++----------- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +- .../platform/PlatformDeployServiceTask.java | 48 +++++++++- .../Services/ServicesTest.cs | 47 ++++++++++ .../Impl/Services/ServiceProxySerializer.cs | 13 +++ 6 files changed, 142 insertions(+), 60 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index a2f7636961278..2c1d0b29fdecf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.utils; +import java.sql.Timestamp; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -66,7 +67,6 @@ import javax.cache.event.EventType; import java.lang.reflect.Field; import java.math.BigDecimal; -import java.security.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2886478c10405..e604676510a5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -199,8 +199,8 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); - /** When this interval pass connection check will be performed. */ - private static final int CON_CHECK_INTERVAL = 500; + /** Interval of checking connection to next node in the ring. */ + private long connCheckInterval; /** */ private IgniteThreadPoolExecutor utilityPool; @@ -275,6 +275,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Last time received message from ring. */ private volatile long lastRingMsgReceivedTime; + /** Time of last sent and answered message. */ + private volatile long lastRingMsgSentTime; + /** */ private volatile boolean nodeCompactRepresentationSupported = true; //assume that local node supports this feature @@ -356,8 +359,8 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override public long connectionCheckInterval() { - return CON_CHECK_INTERVAL; + @Override long connectionCheckInterval() { + return connCheckInterval; } /** {@inheritDoc} */ @@ -368,6 +371,10 @@ class ServerImpl extends TcpDiscoveryImpl { lastRingMsgReceivedTime = 0; + lastRingMsgSentTime = 0; + + initConnectionCheckInterval(); + utilityPool = new IgniteThreadPoolExecutor("disco-pool", spi.ignite().name(), 0, @@ -439,6 +446,14 @@ class ServerImpl extends TcpDiscoveryImpl { spi.printStartInfo(); } + /** Determines interval of connection checking to next node in the ring. */ + private void initConnectionCheckInterval(){ + long sendAndReadTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + spi.getSocketTimeout() + spi.getAckTimeout(); + + connCheckInterval = sendAndReadTimeout / 2; + } + /** {@inheritDoc} */ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { spiCtx.registerPort(tcpSrvr.port, TCP); @@ -2823,15 +2838,6 @@ private class RingMessageWorker extends MessageWorker { updateHeartbeat(); @@ -3002,19 +3006,6 @@ private void nullifyDiscoData() { joiningNodesDiscoDataList = null; } - /** - * Initializes connection check frequency. Used only when failure detection timeout is enabled. - */ - private void initConnectionCheckThreshold() { - if (spi.failureDetectionTimeoutEnabled()) - connCheckThreshold = spi.failureDetectionTimeout(); - else - connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); - - if (log.isInfoEnabled()) - log.info("Connection check threshold is calculated: " + connCheckThreshold); - } - /** * */ @@ -3123,9 +3114,6 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { // Received a message from remote node. onMessageExchanged(); - - // Reset the failure flag. - failureThresholdReached = false; } if (next != null && sock != null) { @@ -3446,6 +3434,8 @@ else if (log.isTraceEnabled()) } } + updateLastSentMessageTime(); + if (log.isDebugEnabled()) log.debug("Initialized connection with next node: " + next.id()); @@ -3551,6 +3541,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + updateLastSentMessageTime(); + spi.stats.onMessageSent(pendingMsg, U.nanosToMillis(tsNanos0 - tsNanos)); if (log.isDebugEnabled()) @@ -3598,6 +3590,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + updateLastSentMessageTime(); + if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been acked: " + msg.id()); @@ -6212,28 +6206,12 @@ private void checkMetricsReceiving() { } /** - * Check connection aliveness status. + * Check connection to next node in the ring. */ private void checkConnection() { Boolean hasRemoteSrvNodes = null; - if (spi.failureDetectionTimeoutEnabled() && !failureThresholdReached && - U.millisSinceNanos(locNode.lastExchangeTimeNanos()) >= connCheckThreshold && - spiStateCopy() == CONNECTED && - (hasRemoteSrvNodes = ring.hasRemoteServerNodes())) { - - if (log.isInfoEnabled()) - log.info("Local node seems to be disconnected from topology (failure detection timeout " + - "is reached) [failureDetectionTimeout=" + spi.failureDetectionTimeout() + - ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); - - failureThresholdReached = true; - - // Reset sent time deliberately to force sending connection check message. - lastTimeConnCheckMsgSent = 0; - } - - long elapsed = (lastTimeConnCheckMsgSent + CON_CHECK_INTERVAL) - U.currentTimeMillis(); + long elapsed = (lastRingMsgSentTime + connCheckInterval) - U.currentTimeMillis(); if (elapsed > 0) return; @@ -6241,11 +6219,8 @@ private void checkConnection() { if (hasRemoteSrvNodes == null) hasRemoteSrvNodes = ring.hasRemoteServerNodes(); - if (hasRemoteSrvNodes) { + if (hasRemoteSrvNodes) sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); - - lastTimeConnCheckMsgSent = U.currentTimeMillis(); - } } /** {@inheritDoc} */ @@ -6254,6 +6229,11 @@ private void checkConnection() { } } + /** Fixates time of last sent message. */ + private void updateLastSentMessageTime() { + lastRingMsgSentTime = U.currentTimeMillis(); + } + /** Thread that executes {@link TcpServer}'s code. */ private class TcpServerThread extends IgniteSpiThread { /** */ @@ -6589,7 +6569,7 @@ else if (req.changeTopology()) { long now = U.currentTimeMillis(); // We got message from previous in less than double connection check interval. - boolean ok = rcvdTime + CON_CHECK_INTERVAL * 2 >= now; + boolean ok = rcvdTime + connCheckInterval * 2 >= now; TcpDiscoveryNode previous = null; if (ok) { @@ -6638,7 +6618,7 @@ else if (req.changeTopology()) { ", checkPreviousNodeId=" + req.checkPreviousNodeId() + ", actualPreviousNode=" + previous + ", lastMessageReceivedTime=" + rcvdTime + ", now=" + now + - ", connCheckInterval=" + CON_CHECK_INTERVAL + ']'); + ", connCheckInterval=" + connCheckInterval + ']'); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index bbd78fe4f5d55..e9fb02ed04281 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -286,7 +286,7 @@ public int boundPort() throws IgniteSpiException { /** * @return connection check interval. */ - public long connectionCheckInterval() { + long connectionCheckInterval() { return 0; } diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java index dda44c493d0ac..ca9e0bd2e8da3 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java @@ -17,6 +17,8 @@ package org.apache.ignite.platform; +import java.sql.Timestamp; +import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; @@ -38,6 +40,8 @@ import java.util.List; import java.util.Map; +import static java.util.Calendar.JANUARY; + /** * Task that deploys a Java service. */ @@ -176,6 +180,21 @@ public String test(String arg) { return arg == null ? null : arg + "!"; } + /** */ + public Timestamp test(Timestamp input) { + Timestamp exp = new Timestamp(92, JANUARY, 1, 0, 0, 0, 0); + + if (!exp.equals(input)) + throw new RuntimeException("Expected \"" + exp + "\" but got \"" + input + "\""); + + return input; + } + + /** */ + public UUID test(UUID input) { + return input; + } + /** */ public Byte testWrapper(Byte arg) { return arg == null ? null : (byte) (arg + 1); @@ -203,17 +222,17 @@ public Float testWrapper(Float arg) { /** */ public Double testWrapper(Double arg) { - return arg == null ? null : arg + 2.5; + return arg == null ? null : arg + 2.5; } /** */ public Boolean testWrapper(Boolean arg) { - return arg == null ? null : !arg; + return arg == null ? null : !arg; } /** */ public Character testWrapper(Character arg) { - return arg == null ? null : (char) (arg + 1); + return arg == null ? null : (char) (arg + 1); } /** */ @@ -297,11 +316,34 @@ public boolean[] testArray(boolean[] arg) { return arg; } + /** */ + public Timestamp[] testArray(Timestamp[] arg) { + if (arg == null || arg.length != 1) + throw new RuntimeException("Expected array of length 1"); + + return new Timestamp[] {test(arg[0])}; + } + + /** */ + public UUID[] testArray(UUID[] arg) { + return arg; + } + /** */ public Integer testNull(Integer arg) { return arg == null ? null : arg + 1; } + /** */ + public UUID testNullUUID(UUID arg) { + return arg; + } + + /** */ + public Timestamp testNullTimestamp(Timestamp arg) { + return arg; + } + /** */ public int testParams(Object... args) { return args.length; diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs index 32a7f73dcd168..7038c84f9d93a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs @@ -870,6 +870,20 @@ public void TestCallJavaService() binSvc.testBinaryObject( Grid1.GetBinary().ToBinary(new PlatformComputeBinarizable {Field = 6})) .GetField("Field")); + + DateTime dt = new DateTime(1992, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); + + Assert.AreEqual(dt, svc.test(dt)); + Assert.AreEqual(dt, svc.testNullTimestamp(dt)); + Assert.IsNull(svc.testNullTimestamp(null)); + Assert.AreEqual(dt, svc.testArray(new DateTime?[] {dt})[0]); + + Guid guid = Guid.NewGuid(); + + Assert.AreEqual(guid, svc.test(guid)); + Assert.AreEqual(guid, svc.testNullUUID(guid)); + Assert.IsNull(svc.testNullUUID(null)); + Assert.AreEqual(guid, svc.testArray(new Guid?[] {guid})[0]); Services.Cancel(javaSvcName); } @@ -940,6 +954,20 @@ public void TestCallJavaServiceDynamicProxy() binSvc.testBinaryObject( Grid1.GetBinary().ToBinary(new PlatformComputeBinarizable { Field = 6 })) .GetField("Field")); + + DateTime dt = new DateTime(1992, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); + + Assert.AreEqual(dt, svc.test(dt)); + Assert.AreEqual(dt, svc.testNullTimestamp(dt)); + Assert.IsNull(svc.testNullTimestamp(null)); + Assert.AreEqual(dt, svc.testArray(new DateTime?[] { dt })[0]); + + Guid guid = Guid.NewGuid(); + + Assert.AreEqual(guid, svc.test(guid)); + Assert.AreEqual(guid, svc.testNullUUID(guid)); + Assert.IsNull(svc.testNullUUID(null)); + Assert.AreEqual(guid, svc.testArray(new Guid?[] { guid })[0]); } /// @@ -1373,6 +1401,12 @@ public interface IJavaService /** */ bool test(bool x); + /** */ + DateTime test(DateTime x); + + /** */ + Guid test(Guid x); + /** */ byte? testWrapper(byte? x); @@ -1424,14 +1458,27 @@ public interface IJavaService /** */ bool[] testArray(bool[] x); + /** */ + DateTime?[] testArray(DateTime?[] x); + + /** */ + Guid?[] testArray(Guid?[] x); + /** */ int test(int x, string y); + /** */ int test(string x, int y); /** */ int? testNull(int? x); + /** */ + DateTime? testNullTimestamp(DateTime? x); + + /** */ + Guid? testNullUUID(Guid? x); + /** */ int testParams(params object[] args); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs index 42638bc92e38d..481758201492c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs @@ -247,6 +247,16 @@ private static Action GetPlatformArgWriter(Type paramType, if (type.IsPrimitive) return null; + if (type.IsArray) + { + Type elemType = type.GetElementType(); + + if (elemType == typeof(Guid?)) + return (writer, o) => writer.WriteGuidArray((Guid?[]) o); + else if (elemType == typeof(DateTime?)) + return (writer, o) => writer.WriteTimestampArray((DateTime?[]) o); + } + var handler = BinarySystemHandlers.GetWriteHandler(type); if (handler != null) @@ -258,6 +268,9 @@ private static Action GetPlatformArgWriter(Type paramType, if (arg is ICollection) return (writer, o) => writer.WriteCollection((ICollection) o); + if (arg is DateTime) + return (writer, o) => writer.WriteTimestamp((DateTime) o); + return null; } } From bf93ac157526169db0d1cac1bdb19905d472fdab Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 25 May 2020 14:29:29 +0300 Subject: [PATCH 02/26] IGNITE-13012 : halt timeouts on the ping. --- .../apache/ignite/spi/discovery/tcp/ServerImpl.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index cfe059936ca9e..31c3315df69a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -373,6 +373,7 @@ class ServerImpl extends TcpDiscoveryImpl { lastRingMsgSentTime = 0; + // Node ping interval is a half of actual failure detection timeout. Timeout on this ping is the second half. connCheckInterval = effectiveExchangeTimeout() / 2; utilityPool = new IgniteThreadPoolExecutor("disco-pool", @@ -3601,15 +3602,21 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been written to socket: " + msg.id()); + boolean ping = msg instanceof TcpDiscoveryConnectionCheckMessage; + + // For the ping we take half of actual failure detection. Another half is the interval. spi.writeToSocket(newNextNode ? newNext : next, sock, out, msg, - timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + ping ? timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()) / 2 : + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()) + ); long tsNanos0 = System.nanoTime(); - int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, ping ? timeoutHelper.nextTimeoutChunk(ackTimeout0) / 2 : + timeoutHelper.nextTimeoutChunk(ackTimeout0)); updateLastSentMessageTime(); From 47a9f7dfaa08052a64a4a229743eef0c4628fad8 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 25 May 2020 19:00:43 +0300 Subject: [PATCH 03/26] IGNITE-13012 : +test. --- .../discovery/tcp/ConnectionCheckingTest.java | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java new file mode 100644 index 0000000000000..ad7592f596313 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.Exchanger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Checks pinging next node in the ring relies on configured timeouts. + */ +public class ConnectionCheckingTest extends GridCommonAbstractTest { + /** + * Maximal additional delay before sending the ping message. 10 ms is the granulation in {@link + * IgniteUtils#currentTimeMillis()} and other 10ms is for code. + */ + private static final int ACCEPTABLE_CODE_DELAYS = 10 + 10; + + /** Number of the ping messages to watch to ensure node pinging works well. */ + private static final int PING_MESSAGES_CNT_TO_ENSURE = 10; + + /** Checks connection to next node is checked depending on configured failure detection timeout. */ + @Test + public void testWithFailureDetectionTimeout() throws Exception { + for (long failureDetectionTimeout = 200; failureDetectionTimeout <= 600; failureDetectionTimeout += 100) { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); + + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + launchTest(cfg); + } + } + + /** Checks connection to next node is checked depending on configured socket and acknowledgement timeouts. */ + @Test + public void testWithSocketAndAckTimeouts() throws Exception { + for (long sockTimeout = 200; sockTimeout <= 600; sockTimeout += 200) { + for (long ackTimeout = 200; ackTimeout <= 600; ackTimeout += 200) { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); + + cfg.setFailureDetectionTimeout(sockTimeout); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(sockTimeout); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(sockTimeout); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setReconnectCount(1); + + launchTest(cfg); + } + } + } + + /** */ + private void launchTest(IgniteConfiguration cfg) throws Exception { + startGrid(0); + + Exchanger errHolder = new Exchanger<>(); + + TcpDiscoverySpi prevSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + TcpDiscoverySpi spi = tcpDiscoverySpi(errHolder); + + spi.setIpFinder(LOCAL_IP_FINDER); + + cfg.setDiscoverySpi(spi); + + if (!prevSpi.failureDetectionTimeoutEnabled()) { + spi.setReconnectCount(prevSpi.getReconnectCount()); + + spi.setSocketTimeout(prevSpi.getSocketTimeout()); + + spi.setAckTimeout(prevSpi.getAckTimeout()); + } + + startGrid(cfg); + + String errMsg = errHolder.exchange(null); + + assertNull(errMsg, errMsg); + + stopAllGrids(true); + } + + /** + * @return Testing tcp discovery which monitors message traffic. + */ + private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder) { + return new TcpDiscoverySpi() { + /** Last sent message. */ + private final AtomicReference lastMsg = new AtomicReference<>(); + + /** Time of the last sent message. */ + private long lastSentMsgTime; + + /** Cycles counter. */ + private long cycles; + + /** Stop flag. */ + private boolean stop; + + /** */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + + TcpDiscoveryAbstractMessage prevMsg = lastMsg.getAndSet(msg); + + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + synchronized (lastMsg) { + if (!stop && prevMsg instanceof TcpDiscoveryConnectionCheckMessage) { + long period = System.currentTimeMillis() - lastSentMsgTime; + + lastSentMsgTime = System.currentTimeMillis(); + + long properTimeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : + getSocketTimeout(); + + if (period > properTimeout / 2 + ACCEPTABLE_CODE_DELAYS + || period < properTimeout / 2 - 10) { + stop("Invalid node ping interval: " + period + ". Expected value is half of actual " + + "message exchange timeout which is: " + properTimeout); + } + else if (failureDetectionTimeoutEnabled() && timeout > properTimeout / 2) { + stop("Invalid timeout on writting TcpDiscoveryConnectionCheckMessage: " + timeout + + ". Expected value is half of IgniteConfiguration.getFailureDetectionTimeout()" + + " which is: " + properTimeout); + } + else if (!failureDetectionTimeoutEnabled() && timeout > properTimeout / 2) { + stop("Invalid timeout on writting TcpDiscoveryConnectionCheckMessage: " + timeout + + ". Expected value is half of TcpDiscoverySpi.getSocketTimeout() which is: " + + properTimeout); + } + else if (++cycles == PING_MESSAGES_CNT_TO_ENSURE) + stop(null); + } + } + } + } + + /** */ + @Override protected int readReceipt(Socket sock, long timeout) throws IOException { + int res = super.readReceipt(sock, timeout); + + synchronized (lastMsg) { + lastSentMsgTime = System.currentTimeMillis(); + + if (lastMsg.get() instanceof TcpDiscoveryConnectionCheckMessage) { + if (failureDetectionTimeoutEnabled() && timeout > failureDetectionTimeout() / 2) { + stop("Invalid timeout on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + + timeout + ". Expected value is half of IgniteConfiguration.failureDetectionTimeout " + + "which is: " + failureDetectionTimeout()); + } + else if (!failureDetectionTimeoutEnabled() && timeout > getAckTimeout() / 2) { + stop("Invalid timeout on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + + timeout + ". Expected value is half of TcpDiscoverySpi.ackTimeout " + + "which is: " + getAckTimeout()); + } + } + } + + return res; + } + + /** Stops watching messages and notifies the exchanger. */ + private void stop(String errMsg) { + stop = true; + + try { + errHolder.exchange(errMsg); + } + catch (InterruptedException e) { + // No-op. + } + } + }; + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSystemWorkerBlockedTimeout(20_000); + + //Prevent other discovery messages. + cfg.setMetricsUpdateFrequency(24 * 60 * 60_000); + + cfg.setClientFailureDetectionTimeout(cfg.getMetricsUpdateFrequency()); + + return cfg; + } + + /** */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(true); + } +} From 2c929faf829c3243b4b511e0fb13ce867a83642e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 25 May 2020 19:31:33 +0300 Subject: [PATCH 04/26] IGNITE-13012 : redeem of the timeouts. Fixed test. --- .../ignite/spi/discovery/tcp/ServerImpl.java | 15 ++-- .../discovery/tcp/ConnectionCheckingTest.java | 79 ++++++++++--------- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 31c3315df69a4..11d4cde08ab2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -373,7 +373,7 @@ class ServerImpl extends TcpDiscoveryImpl { lastRingMsgSentTime = 0; - // Node ping interval is a half of actual failure detection timeout. Timeout on this ping is the second half. + // Node ping interval is a half of actual failure detection timeout. connCheckInterval = effectiveExchangeTimeout() / 2; utilityPool = new IgniteThreadPoolExecutor("disco-pool", @@ -1924,7 +1924,7 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { return threads; } - /** @return Total timeout of single complete exchange operation in network on established connection. */ + /** @return Total timeout on complete message exchange in network over established connection. */ protected long effectiveExchangeTimeout() { return spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout() + spi.getAckTimeout(); @@ -3604,19 +3604,22 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof boolean ping = msg instanceof TcpDiscoveryConnectionCheckMessage; + long timeout = timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()); + // For the ping we take half of actual failure detection. Another half is the interval. spi.writeToSocket(newNextNode ? newNext : next, sock, out, msg, - ping ? timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()) / 2 : - timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()) + ping && spi.failureDetectionTimeoutEnabled() ? timeout / 2 : timeout ); + timeout = timeoutHelper.nextTimeoutChunk(ackTimeout0); + long tsNanos0 = System.nanoTime(); - int res = spi.readReceipt(sock, ping ? timeoutHelper.nextTimeoutChunk(ackTimeout0) / 2 : - timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int res = spi.readReceipt(sock, ping && spi.failureDetectionTimeoutEnabled() ? + timeout / 2 : timeout); updateLastSentMessageTime(); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java index ad7592f596313..6433e80ef28e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java @@ -30,19 +30,23 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static com.sun.tools.javac.util.Assert.check; + /** * Checks pinging next node in the ring relies on configured timeouts. */ public class ConnectionCheckingTest extends GridCommonAbstractTest { - /** - * Maximal additional delay before sending the ping message. 10 ms is the granulation in {@link - * IgniteUtils#currentTimeMillis()} and other 10ms is for code. - */ - private static final int ACCEPTABLE_CODE_DELAYS = 10 + 10; - /** Number of the ping messages to watch to ensure node pinging works well. */ private static final int PING_MESSAGES_CNT_TO_ENSURE = 10; + /** Timer granulation in milliseconds. See {@link IgniteUtils#currentTimeMillis()}. */ + private static final int TIMER_GRANULATION = 10; + + /** + * Maximal additional delay before sending the ping message including timer granulation in and other 10ms in code. + */ + private static final int ACCEPTABLE_ADDITIONAL_DELAY = TIMER_GRANULATION + 10; + /** Checks connection to next node is checked depending on configured failure detection timeout. */ @Test public void testWithFailureDetectionTimeout() throws Exception { @@ -58,8 +62,8 @@ public void testWithFailureDetectionTimeout() throws Exception { /** Checks connection to next node is checked depending on configured socket and acknowledgement timeouts. */ @Test public void testWithSocketAndAckTimeouts() throws Exception { - for (long sockTimeout = 200; sockTimeout <= 600; sockTimeout += 200) { - for (long ackTimeout = 200; ackTimeout <= 600; ackTimeout += 200) { + for (long sockTimeout = 200; sockTimeout <= 400; sockTimeout += 100) { + for (long ackTimeout = 200; ackTimeout <= 400; ackTimeout += 100) { IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); cfg.setFailureDetectionTimeout(sockTimeout); @@ -101,7 +105,7 @@ private void launchTest(IgniteConfiguration cfg) throws Exception { String errMsg = errHolder.exchange(null); - assertNull(errMsg, errMsg); + check(errMsg == null, errMsg); stopAllGrids(true); } @@ -123,7 +127,7 @@ private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder) { /** Stop flag. */ private boolean stop; - /** */ + /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { super.writeToSocket(sock, out, msg, timeout); @@ -137,23 +141,20 @@ private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder) { lastSentMsgTime = System.currentTimeMillis(); - long properTimeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : - getSocketTimeout(); + long msgExchangeTimeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : + getSocketTimeout() + getAckTimeout(); - if (period > properTimeout / 2 + ACCEPTABLE_CODE_DELAYS - || period < properTimeout / 2 - 10) { - stop("Invalid node ping interval: " + period + ". Expected value is half of actual " + - "message exchange timeout which is: " + properTimeout); + if (period > msgExchangeTimeout / 2 + ACCEPTABLE_ADDITIONAL_DELAY || + period < msgExchangeTimeout / 2 - TIMER_GRANULATION) { + stop("Invalid interval of sending TcpDiscoveryConnectionCheckMessage: " + period + + "ms. Expected value is near " + msgExchangeTimeout / 2 + "ms, half of message " + + "exchange timeout (" + msgExchangeTimeout + "ms)."); } - else if (failureDetectionTimeoutEnabled() && timeout > properTimeout / 2) { - stop("Invalid timeout on writting TcpDiscoveryConnectionCheckMessage: " + timeout + - ". Expected value is half of IgniteConfiguration.getFailureDetectionTimeout()" + - " which is: " + properTimeout); - } - else if (!failureDetectionTimeoutEnabled() && timeout > properTimeout / 2) { - stop("Invalid timeout on writting TcpDiscoveryConnectionCheckMessage: " + timeout + - ". Expected value is half of TcpDiscoverySpi.getSocketTimeout() which is: " + - properTimeout); + else if (failureDetectionTimeoutEnabled() && + timeout > msgExchangeTimeout / 2 + TIMER_GRANULATION) { + stop("Invalid timeout on sending TcpDiscoveryConnectionCheckMessage: " + timeout + + "ms. Expected value is near " + failureDetectionTimeout() / 2 + "ms, half of " + + "IgniteConfiguration.failureDetectionTimeout (" + msgExchangeTimeout + "ms)."); } else if (++cycles == PING_MESSAGES_CNT_TO_ENSURE) stop(null); @@ -162,24 +163,19 @@ else if (++cycles == PING_MESSAGES_CNT_TO_ENSURE) } } - /** */ + /** {@inheritDoc} */ @Override protected int readReceipt(Socket sock, long timeout) throws IOException { int res = super.readReceipt(sock, timeout); synchronized (lastMsg) { lastSentMsgTime = System.currentTimeMillis(); - if (lastMsg.get() instanceof TcpDiscoveryConnectionCheckMessage) { - if (failureDetectionTimeoutEnabled() && timeout > failureDetectionTimeout() / 2) { - stop("Invalid timeout on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + - timeout + ". Expected value is half of IgniteConfiguration.failureDetectionTimeout " + - "which is: " + failureDetectionTimeout()); - } - else if (!failureDetectionTimeoutEnabled() && timeout > getAckTimeout() / 2) { - stop("Invalid timeout on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + - timeout + ". Expected value is half of TcpDiscoverySpi.ackTimeout " + - "which is: " + getAckTimeout()); - } + if ((lastMsg.get() instanceof TcpDiscoveryConnectionCheckMessage) && + failureDetectionTimeoutEnabled() && + timeout > failureDetectionTimeout() / 2 + TIMER_GRANULATION) { + stop("Invalid timeout set on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + + timeout + "ms. Expected value is up to " + failureDetectionTimeout() / 2 + "ms, half of " + + "IgniteConfiguration.failureDetectionTimeout (" + failureDetectionTimeout() + "ms)."); } } @@ -200,7 +196,7 @@ private void stop(String errMsg) { }; } - /** */ + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -214,10 +210,15 @@ private void stop(String errMsg) { return cfg; } - /** */ + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); stopAllGrids(true); } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } } From 245943a154e0e8cd083ad90a552ce69cf4b989f6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 25 May 2020 20:46:11 +0300 Subject: [PATCH 05/26] IGNITE-13012 : redeem of the timeouts. Fixed test. --- .../apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java index 6433e80ef28e3..f980c2703b1e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java @@ -50,7 +50,7 @@ public class ConnectionCheckingTest extends GridCommonAbstractTest { /** Checks connection to next node is checked depending on configured failure detection timeout. */ @Test public void testWithFailureDetectionTimeout() throws Exception { - for (long failureDetectionTimeout = 200; failureDetectionTimeout <= 600; failureDetectionTimeout += 100) { + for (long failureDetectionTimeout = 200; failureDetectionTimeout <= 800; failureDetectionTimeout += 100) { IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); cfg.setFailureDetectionTimeout(failureDetectionTimeout); From 8f4dabf5a0bdab538fc9aaa37383880772b80cac Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 27 May 2020 13:25:23 +0300 Subject: [PATCH 06/26] IGNITE-13012 : Fixed tests. + a test. --- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 +- ...kingTest.java => ConnectionCheckTest.java} | 134 ++++++++++++++---- ...TcpDiscoverySpiFailureTimeoutSelfTest.java | 49 ------- 3 files changed, 111 insertions(+), 74 deletions(-) rename modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/{ConnectionCheckingTest.java => ConnectionCheckTest.java} (63%) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 11d4cde08ab2d..b86e71bec58d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -275,7 +275,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** Last time received message from ring. */ private volatile long lastRingMsgReceivedTime; - /** Time of last sent and answered message. */ + /** Time of last sent and acknowledged message. */ private volatile long lastRingMsgSentTime; /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java similarity index 63% rename from modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java rename to modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java index f980c2703b1e4..69cc55da4c5f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java @@ -21,12 +21,14 @@ import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.Exchanger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.GridTestClockTimer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -35,59 +37,129 @@ /** * Checks pinging next node in the ring relies on configured timeouts. */ -public class ConnectionCheckingTest extends GridCommonAbstractTest { - /** Number of the ping messages to watch to ensure node pinging works well. */ - private static final int PING_MESSAGES_CNT_TO_ENSURE = 10; +public class ConnectionCheckTest extends GridCommonAbstractTest { + /** Number of the ping messages to ensure node pinging works well. */ + private static final int PING_MESSAGES_CNT_TO_ENSURE = 15; - /** Timer granulation in milliseconds. See {@link IgniteUtils#currentTimeMillis()}. */ + /** Timer granulation in milliseconds. See {@link GridTestClockTimer}. */ private static final int TIMER_GRANULATION = 10; /** - * Maximal additional delay before sending the ping message including timer granulation in and other 10ms in code. + * Maximal additional delay before sending the ping message including timer granulation in and other delays + * like code delays and/or GC. */ - private static final int ACCEPTABLE_ADDITIONAL_DELAY = TIMER_GRANULATION + 10; + private static final int ACCEPTABLE_ADDITIONAL_DELAY = TIMER_GRANULATION + 50; - /** Checks connection to next node is checked depending on configured failure detection timeout. */ + /** Metric message period. Quite long by default to prevent other but ping discovery messages. */ + private long metricsUpdateFreq = 60 * 60 * 1000; + + /** */ + private long failureDetectionTimeout = 500; + + /** Checks TcpDiscoveryConnectionCheckMessage is send depending on failure detection timeout. */ @Test public void testWithFailureDetectionTimeout() throws Exception { - for (long failureDetectionTimeout = 200; failureDetectionTimeout <= 800; failureDetectionTimeout += 100) { - IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); + for (long failureDetectionTimeout = 300; failureDetectionTimeout <= 600; failureDetectionTimeout += 100) { + this.failureDetectionTimeout = failureDetectionTimeout; - cfg.setFailureDetectionTimeout(failureDetectionTimeout); + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); - launchTest(cfg); + launchConnectionCheckingTest(cfg); } } - /** Checks connection to next node is checked depending on configured socket and acknowledgement timeouts. */ + /** Checks TcpDiscoveryConnectionCheckMessage is send depending on socket and acknowledgement timeouts. */ @Test public void testWithSocketAndAckTimeouts() throws Exception { - for (long sockTimeout = 200; sockTimeout <= 400; sockTimeout += 100) { - for (long ackTimeout = 200; ackTimeout <= 400; ackTimeout += 100) { + for (long sockTimeout = 300; sockTimeout <= 500; sockTimeout += 100) { + for (long ackTimeout = 300; ackTimeout <= 500; ackTimeout += 100) { IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); - cfg.setFailureDetectionTimeout(sockTimeout); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(sockTimeout); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(sockTimeout); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setReconnectCount(1); - launchTest(cfg); + launchConnectionCheckingTest(cfg); } } } + /** Checks other than TcpDiscoveryConnectionCheckMessage message detects node failure. */ + @Test + public void testNodeFailureWithoutPing() throws Exception { + // Set metrics frequency more often than failure detection timeout. + failureDetectionTimeout = 5_000; + + metricsUpdateFreq = 300; + + Exchanger errHolder = new Exchanger<>(); + + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)); + + cfg.setDiscoverySpi(new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + + // Connection check message must not appear because of frequent metrics update message. + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { + try { + errHolder.exchange(msg.getClass().getSimpleName() + " must not appear in the message traffic."); + } + catch (InterruptedException e) { + // No-op. + } + } + + // Stop test once failed node detected. + if (msg instanceof TcpDiscoveryNodeFailedMessage) { + TcpDiscoveryNodeFailedMessage nodeFailedMsg = (TcpDiscoveryNodeFailedMessage)msg; + + try { + // We simulate failure of node 2. + errHolder.exchange(nodeFailedMsg.internalOrder() == 2 ? null : + "Wrong order of failed node: " + nodeFailedMsg.internalOrder() + ". Expected: 2"); + } + catch (InterruptedException e) { + // No-op. + } + } + } + }); + + startGrid(cfg); + + startGrid(1); + + startGrid(2); + + // Let cluster breathe. + Thread.sleep(2000); + + // Simulate failure of the second node. + TcpDiscoverySpi disco1 = (TcpDiscoverySpi)grid(1).configuration().getDiscoverySpi(); + + disco1.simulateNodeFailure(); + + String err = errHolder.exchange(null); + + check(err == null, err); + } + /** */ - private void launchTest(IgniteConfiguration cfg) throws Exception { + private void launchConnectionCheckingTest(IgniteConfiguration cfg) throws Exception { startGrid(0); Exchanger errHolder = new Exchanger<>(); + AtomicBoolean beginFlag = new AtomicBoolean(); + TcpDiscoverySpi prevSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - TcpDiscoverySpi spi = tcpDiscoverySpi(errHolder); + TcpDiscoverySpi spi = tcpDiscoverySpi(errHolder, beginFlag); spi.setIpFinder(LOCAL_IP_FINDER); @@ -103,6 +175,11 @@ private void launchTest(IgniteConfiguration cfg) throws Exception { startGrid(cfg); + // Let cluster breathe. + Thread.sleep(2000); + + beginFlag.set(true); + String errMsg = errHolder.exchange(null); check(errMsg == null, errMsg); @@ -111,9 +188,9 @@ private void launchTest(IgniteConfiguration cfg) throws Exception { } /** - * @return Testing tcp discovery which monitors message traffic. + * @return Testing TCP discovery monitoring the message traffic. */ - private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder) { + private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder, AtomicBoolean beginFlag) { return new TcpDiscoverySpi() { /** Last sent message. */ private final AtomicReference lastMsg = new AtomicReference<>(); @@ -132,6 +209,9 @@ private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder) { long timeout) throws IOException, IgniteCheckedException { super.writeToSocket(sock, out, msg, timeout); + if (!beginFlag.get()) + return; + TcpDiscoveryAbstractMessage prevMsg = lastMsg.getAndSet(msg); if (msg instanceof TcpDiscoveryConnectionCheckMessage) { @@ -167,6 +247,9 @@ else if (++cycles == PING_MESSAGES_CNT_TO_ENSURE) @Override protected int readReceipt(Socket sock, long timeout) throws IOException { int res = super.readReceipt(sock, timeout); + if (!beginFlag.get()) + return res; + synchronized (lastMsg) { lastSentMsgTime = System.currentTimeMillis(); @@ -202,11 +285,14 @@ private void stop(String errMsg) { cfg.setSystemWorkerBlockedTimeout(20_000); - //Prevent other discovery messages. - cfg.setMetricsUpdateFrequency(24 * 60 * 60_000); + cfg.setMetricsUpdateFrequency(metricsUpdateFreq); + + cfg.setFailureDetectionTimeout(failureDetectionTimeout); cfg.setClientFailureDetectionTimeout(cfg.getMetricsUpdateFrequency()); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(LOCAL_IP_FINDER); + return cfg; } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 3b4fbc7b758fe..84180d1fb4fc6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.AbstractDiscoverySelfTest; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -163,54 +162,6 @@ public void testFailureDetectionOnSocketWrite() throws Exception { } } - /** - * @throws Exception In case of error. - */ - @Test - public void testConnectionCheckMessage() throws Exception { - TestTcpDiscoverySpi nextSpi = null; - - try { - assert firstSpi().connCheckStatusMsgCntSent == 0; - - TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); - - assertNotNull(nextNode); - - nextSpi = null; - - for (int i = 1; i < spis.size(); i++) - if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { - nextSpi = (TestTcpDiscoverySpi)spis.get(i); - break; - } - - assertNotNull(nextSpi); - - assert nextSpi.connCheckStatusMsgCntReceived == 0; - - firstSpi().cntConnCheckMsg = true; - nextSpi.cntConnCheckMsg = true; - - Thread.sleep(firstSpi().failureDetectionTimeout()); - - firstSpi().cntConnCheckMsg = false; - nextSpi.cntConnCheckMsg = false; - - int sent = firstSpi().connCheckStatusMsgCntSent; - int received = nextSpi.connCheckStatusMsgCntReceived; - - assert sent >= 15 && sent < 25 : "messages sent: " + sent; - assert received >= 15 && received < 25 : "messages received: " + received; - } - finally { - firstSpi().resetState(); - - if (nextSpi != null) - nextSpi.resetState(); - } - } - /** * Returns the first spi with failure detection timeout enabled. * From 62f5d6a2b67ff97a94d08f19c9d4ff4dfff4a3ed Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 27 May 2020 17:13:48 +0300 Subject: [PATCH 07/26] IGNITE-13012 : fix of coordinator failure test. --- .../apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 16fd75115aa0d..c4bc8aa0b2b85 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -1976,7 +1976,8 @@ private void checkFailedCoordinatorNode(SegmentationPolicy segPlc) throws Except ignite1.configuration().getDiscoverySpi().failNode(coordId, null); - assertTrue(failedLatch.await(2000, MILLISECONDS)); + // Wait for the configured timeout + other possible code delays. + assertTrue(failedLatch.await(ignite1.configuration().getFailureDetectionTimeout() + 50, MILLISECONDS)); assertTrue(coordSegmented.get()); From dc2375627a678008cef6256546af8b333135c941 Mon Sep 17 00:00:00 2001 From: Vladimir Date: Thu, 28 May 2020 10:51:03 +0300 Subject: [PATCH 08/26] IGNITE-13012 : test fix --- .../internal/GridFailFastNodeFailureDetectionSelfTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java index 73cb5f7011687..00d508fdf9dc9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java @@ -107,7 +107,7 @@ public void testFailFast() throws Exception { failNode(ignite1); - assert failLatch.await(1500, MILLISECONDS); + assert failLatch.await(ignite1.configuration().getFailureDetectionTimeout() + 50, MILLISECONDS); } /** From 708934305864f86b40d35a54d71d59c43a248cc5 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 2 Jun 2020 12:46:43 +0300 Subject: [PATCH 09/26] IGNITE-13012 : Reverted tests. Failure detection timeout is shared with previous message. --- .../spi/IgniteSpiOperationTimeoutHelper.java | 16 + .../ignite/spi/discovery/tcp/ServerImpl.java | 45 ++- ...dFailFastNodeFailureDetectionSelfTest.java | 2 +- .../discovery/tcp/ConnectionCheckTest.java | 310 ------------------ .../discovery/tcp/TcpDiscoverySelfTest.java | 3 +- ...TcpDiscoverySpiFailureTimeoutSelfTest.java | 49 +++ 6 files changed, 87 insertions(+), 338 deletions(-) delete mode 100644 modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index 2e590ce593608..48161baf9f535 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -57,6 +57,22 @@ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) adapter.clientFailureDetectionTimeout(); } + /** + * Creates timeout helper based on time of last related operation. + * + * @param adapter SPI adapter. + * @param srvOp {@code True} if communicates with server node. + * @param lastOperStartNanos Time of last related operation in nanos. + */ + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp, long lastOperStartNanos) { + this(adapter, srvOp); + + this.lastOperStartNanos = lastOperStartNanos; + + if (lastOperStartNanos > 0) + timeout = failureDetectionTimeout; + } + /** * Returns a timeout value to use for the next network operation. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b86e71bec58d5..f4416ea9d76c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -199,6 +199,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); + /** Minimal interval checking connection ot next node in the ring. */ + private static final long MIN_CON_CHECK_INTERVAL = 500; + /** Interval of checking connection to next node in the ring. */ private long connCheckInterval; @@ -373,8 +376,10 @@ class ServerImpl extends TcpDiscoveryImpl { lastRingMsgSentTime = 0; - // Node ping interval is a half of actual failure detection timeout. - connCheckInterval = effectiveExchangeTimeout() / 2; + long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + spi.getSocketTimeout() + spi.getAckTimeout(); + + connCheckInterval = Math.min(msgExchangeTimeout / 4, MIN_CON_CHECK_INTERVAL); utilityPool = new IgniteThreadPoolExecutor("disco-pool", spi.ignite().name(), @@ -1924,12 +1929,6 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { return threads; } - /** @return Total timeout on complete message exchange in network over established connection. */ - protected long effectiveExchangeTimeout() { - return spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : - spi.getSocketTimeout() + spi.getAckTimeout(); - } - /** {@inheritDoc} */ @Override public void updateMetrics(UUID nodeId, ClusterMetrics metrics, @@ -3335,7 +3334,7 @@ else if (log.isTraceEnabled()) while (true) { if (sock == null) { if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); boolean success = false; @@ -3548,12 +3547,17 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof addFailedNodes(pendingMsg, failedNodes); - if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + if (timeoutHelper == null) { + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, + lastRingMsgSentTime); + } try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); + + spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.getSocketTimeout())); } finally { clearNodeAddedMessage(pendingMsg); @@ -3593,7 +3597,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof long tsNanos = System.nanoTime(); if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); addFailedNodes(msg, failedNodes); @@ -3602,24 +3606,15 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been written to socket: " + msg.id()); - boolean ping = msg instanceof TcpDiscoveryConnectionCheckMessage; - - long timeout = timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()); - - // For the ping we take half of actual failure detection. Another half is the interval. spi.writeToSocket(newNextNode ? newNext : next, sock, out, msg, - ping && spi.failureDetectionTimeoutEnabled() ? timeout / 2 : timeout - ); - - timeout = timeoutHelper.nextTimeoutChunk(ackTimeout0); + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); long tsNanos0 = System.nanoTime(); - int res = spi.readReceipt(sock, ping && spi.failureDetectionTimeoutEnabled() ? - timeout / 2 : timeout); + int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); updateLastSentMessageTime(); @@ -6199,7 +6194,7 @@ private void checkMetricsReceiving() { private void checkConnection() { Boolean hasRemoteSrvNodes = null; - long elapsed = (lastRingMsgSentTime + connCheckInterval) - U.currentTimeMillis(); + long elapsed = (lastRingMsgSentTime + U.millisToNanos(connCheckInterval)) - System.nanoTime(); if (elapsed > 0) return; @@ -6219,7 +6214,7 @@ private void checkConnection() { /** Fixates time of last sent message. */ private void updateLastSentMessageTime() { - lastRingMsgSentTime = U.currentTimeMillis(); + lastRingMsgSentTime = System.nanoTime(); } /** Thread that executes {@link TcpServer}'s code. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java index 00d508fdf9dc9..73cb5f7011687 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java @@ -107,7 +107,7 @@ public void testFailFast() throws Exception { failNode(ignite1); - assert failLatch.await(ignite1.configuration().getFailureDetectionTimeout() + 50, MILLISECONDS); + assert failLatch.await(1500, MILLISECONDS); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java deleted file mode 100644 index 69cc55da4c5f6..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ConnectionCheckTest.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; -import java.util.concurrent.Exchanger; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.GridTestClockTimer; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Test; - -import static com.sun.tools.javac.util.Assert.check; - -/** - * Checks pinging next node in the ring relies on configured timeouts. - */ -public class ConnectionCheckTest extends GridCommonAbstractTest { - /** Number of the ping messages to ensure node pinging works well. */ - private static final int PING_MESSAGES_CNT_TO_ENSURE = 15; - - /** Timer granulation in milliseconds. See {@link GridTestClockTimer}. */ - private static final int TIMER_GRANULATION = 10; - - /** - * Maximal additional delay before sending the ping message including timer granulation in and other delays - * like code delays and/or GC. - */ - private static final int ACCEPTABLE_ADDITIONAL_DELAY = TIMER_GRANULATION + 50; - - /** Metric message period. Quite long by default to prevent other but ping discovery messages. */ - private long metricsUpdateFreq = 60 * 60 * 1000; - - /** */ - private long failureDetectionTimeout = 500; - - /** Checks TcpDiscoveryConnectionCheckMessage is send depending on failure detection timeout. */ - @Test - public void testWithFailureDetectionTimeout() throws Exception { - for (long failureDetectionTimeout = 300; failureDetectionTimeout <= 600; failureDetectionTimeout += 100) { - this.failureDetectionTimeout = failureDetectionTimeout; - - IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); - - launchConnectionCheckingTest(cfg); - } - } - - /** Checks TcpDiscoveryConnectionCheckMessage is send depending on socket and acknowledgement timeouts. */ - @Test - public void testWithSocketAndAckTimeouts() throws Exception { - for (long sockTimeout = 300; sockTimeout <= 500; sockTimeout += 100) { - for (long ackTimeout = 300; ackTimeout <= 500; ackTimeout += 100) { - IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(1)); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(sockTimeout); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(sockTimeout); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setReconnectCount(1); - - launchConnectionCheckingTest(cfg); - } - } - } - - /** Checks other than TcpDiscoveryConnectionCheckMessage message detects node failure. */ - @Test - public void testNodeFailureWithoutPing() throws Exception { - // Set metrics frequency more often than failure detection timeout. - failureDetectionTimeout = 5_000; - - metricsUpdateFreq = 300; - - Exchanger errHolder = new Exchanger<>(); - - IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)); - - cfg.setDiscoverySpi(new TcpDiscoverySpi() { - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - long timeout) throws IOException, IgniteCheckedException { - super.writeToSocket(sock, out, msg, timeout); - - // Connection check message must not appear because of frequent metrics update message. - if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - try { - errHolder.exchange(msg.getClass().getSimpleName() + " must not appear in the message traffic."); - } - catch (InterruptedException e) { - // No-op. - } - } - - // Stop test once failed node detected. - if (msg instanceof TcpDiscoveryNodeFailedMessage) { - TcpDiscoveryNodeFailedMessage nodeFailedMsg = (TcpDiscoveryNodeFailedMessage)msg; - - try { - // We simulate failure of node 2. - errHolder.exchange(nodeFailedMsg.internalOrder() == 2 ? null : - "Wrong order of failed node: " + nodeFailedMsg.internalOrder() + ". Expected: 2"); - } - catch (InterruptedException e) { - // No-op. - } - } - } - }); - - startGrid(cfg); - - startGrid(1); - - startGrid(2); - - // Let cluster breathe. - Thread.sleep(2000); - - // Simulate failure of the second node. - TcpDiscoverySpi disco1 = (TcpDiscoverySpi)grid(1).configuration().getDiscoverySpi(); - - disco1.simulateNodeFailure(); - - String err = errHolder.exchange(null); - - check(err == null, err); - } - - /** */ - private void launchConnectionCheckingTest(IgniteConfiguration cfg) throws Exception { - startGrid(0); - - Exchanger errHolder = new Exchanger<>(); - - AtomicBoolean beginFlag = new AtomicBoolean(); - - TcpDiscoverySpi prevSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - - TcpDiscoverySpi spi = tcpDiscoverySpi(errHolder, beginFlag); - - spi.setIpFinder(LOCAL_IP_FINDER); - - cfg.setDiscoverySpi(spi); - - if (!prevSpi.failureDetectionTimeoutEnabled()) { - spi.setReconnectCount(prevSpi.getReconnectCount()); - - spi.setSocketTimeout(prevSpi.getSocketTimeout()); - - spi.setAckTimeout(prevSpi.getAckTimeout()); - } - - startGrid(cfg); - - // Let cluster breathe. - Thread.sleep(2000); - - beginFlag.set(true); - - String errMsg = errHolder.exchange(null); - - check(errMsg == null, errMsg); - - stopAllGrids(true); - } - - /** - * @return Testing TCP discovery monitoring the message traffic. - */ - private TcpDiscoverySpi tcpDiscoverySpi(Exchanger errHolder, AtomicBoolean beginFlag) { - return new TcpDiscoverySpi() { - /** Last sent message. */ - private final AtomicReference lastMsg = new AtomicReference<>(); - - /** Time of the last sent message. */ - private long lastSentMsgTime; - - /** Cycles counter. */ - private long cycles; - - /** Stop flag. */ - private boolean stop; - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - long timeout) throws IOException, IgniteCheckedException { - super.writeToSocket(sock, out, msg, timeout); - - if (!beginFlag.get()) - return; - - TcpDiscoveryAbstractMessage prevMsg = lastMsg.getAndSet(msg); - - if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - synchronized (lastMsg) { - if (!stop && prevMsg instanceof TcpDiscoveryConnectionCheckMessage) { - long period = System.currentTimeMillis() - lastSentMsgTime; - - lastSentMsgTime = System.currentTimeMillis(); - - long msgExchangeTimeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : - getSocketTimeout() + getAckTimeout(); - - if (period > msgExchangeTimeout / 2 + ACCEPTABLE_ADDITIONAL_DELAY || - period < msgExchangeTimeout / 2 - TIMER_GRANULATION) { - stop("Invalid interval of sending TcpDiscoveryConnectionCheckMessage: " + period + - "ms. Expected value is near " + msgExchangeTimeout / 2 + "ms, half of message " + - "exchange timeout (" + msgExchangeTimeout + "ms)."); - } - else if (failureDetectionTimeoutEnabled() && - timeout > msgExchangeTimeout / 2 + TIMER_GRANULATION) { - stop("Invalid timeout on sending TcpDiscoveryConnectionCheckMessage: " + timeout + - "ms. Expected value is near " + failureDetectionTimeout() / 2 + "ms, half of " + - "IgniteConfiguration.failureDetectionTimeout (" + msgExchangeTimeout + "ms)."); - } - else if (++cycles == PING_MESSAGES_CNT_TO_ENSURE) - stop(null); - } - } - } - } - - /** {@inheritDoc} */ - @Override protected int readReceipt(Socket sock, long timeout) throws IOException { - int res = super.readReceipt(sock, timeout); - - if (!beginFlag.get()) - return res; - - synchronized (lastMsg) { - lastSentMsgTime = System.currentTimeMillis(); - - if ((lastMsg.get() instanceof TcpDiscoveryConnectionCheckMessage) && - failureDetectionTimeoutEnabled() && - timeout > failureDetectionTimeout() / 2 + TIMER_GRANULATION) { - stop("Invalid timeout set on reading acknowledgement for TcpDiscoveryConnectionCheckMessage: " + - timeout + "ms. Expected value is up to " + failureDetectionTimeout() / 2 + "ms, half of " + - "IgniteConfiguration.failureDetectionTimeout (" + failureDetectionTimeout() + "ms)."); - } - } - - return res; - } - - /** Stops watching messages and notifies the exchanger. */ - private void stop(String errMsg) { - stop = true; - - try { - errHolder.exchange(errMsg); - } - catch (InterruptedException e) { - // No-op. - } - } - }; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setSystemWorkerBlockedTimeout(20_000); - - cfg.setMetricsUpdateFrequency(metricsUpdateFreq); - - cfg.setFailureDetectionTimeout(failureDetectionTimeout); - - cfg.setClientFailureDetectionTimeout(cfg.getMetricsUpdateFrequency()); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(LOCAL_IP_FINDER); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 3 * 60 * 1000; - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index c4bc8aa0b2b85..16fd75115aa0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -1976,8 +1976,7 @@ private void checkFailedCoordinatorNode(SegmentationPolicy segPlc) throws Except ignite1.configuration().getDiscoverySpi().failNode(coordId, null); - // Wait for the configured timeout + other possible code delays. - assertTrue(failedLatch.await(ignite1.configuration().getFailureDetectionTimeout() + 50, MILLISECONDS)); + assertTrue(failedLatch.await(2000, MILLISECONDS)); assertTrue(coordSegmented.get()); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 84180d1fb4fc6..3b4fbc7b758fe 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.AbstractDiscoverySelfTest; import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -162,6 +163,54 @@ public void testFailureDetectionOnSocketWrite() throws Exception { } } + /** + * @throws Exception In case of error. + */ + @Test + public void testConnectionCheckMessage() throws Exception { + TestTcpDiscoverySpi nextSpi = null; + + try { + assert firstSpi().connCheckStatusMsgCntSent == 0; + + TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode(); + + assertNotNull(nextNode); + + nextSpi = null; + + for (int i = 1; i < spis.size(); i++) + if (spis.get(i).getLocalNode().id().equals(nextNode.id())) { + nextSpi = (TestTcpDiscoverySpi)spis.get(i); + break; + } + + assertNotNull(nextSpi); + + assert nextSpi.connCheckStatusMsgCntReceived == 0; + + firstSpi().cntConnCheckMsg = true; + nextSpi.cntConnCheckMsg = true; + + Thread.sleep(firstSpi().failureDetectionTimeout()); + + firstSpi().cntConnCheckMsg = false; + nextSpi.cntConnCheckMsg = false; + + int sent = firstSpi().connCheckStatusMsgCntSent; + int received = nextSpi.connCheckStatusMsgCntReceived; + + assert sent >= 15 && sent < 25 : "messages sent: " + sent; + assert received >= 15 && received < 25 : "messages received: " + received; + } + finally { + firstSpi().resetState(); + + if (nextSpi != null) + nextSpi.resetState(); + } + } + /** * Returns the first spi with failure detection timeout enabled. * From 3515f40a973c3fb542af32154a626b1d61e92d8e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 2 Jun 2020 13:18:25 +0300 Subject: [PATCH 10/26] IGNITE-13012 : fix. --- .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f4416ea9d76c2..2733b33affa28 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3555,9 +3555,6 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); - - spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( - spi.getSocketTimeout())); } finally { clearNodeAddedMessage(pendingMsg); From 9dca4f19e4c0a06972fd249d22fb15953d1cb21c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 3 Jun 2020 10:31:46 +0300 Subject: [PATCH 11/26] IGNITE-13012 : + test. --- .../ignite/spi/discovery/tcp/ServerImpl.java | 6 +- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 136 +++++++++++++++++- 2 files changed, 139 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2733b33affa28..070f16221d3c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -199,7 +199,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); - /** Minimal interval checking connection ot next node in the ring. */ + /** Minimal interval of connection check to next node in the ring. */ private static final long MIN_CON_CHECK_INTERVAL = 500; /** Interval of checking connection to next node in the ring. */ @@ -379,6 +379,8 @@ class ServerImpl extends TcpDiscoveryImpl { long msgExchangeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout() + spi.getAckTimeout(); + // Since we take in account time of last sent message, the interval should be quite short to give enough piece + // of failure detection timeout as send-and-acknowledge timeout of the message to send. connCheckInterval = Math.min(msgExchangeTimeout / 4, MIN_CON_CHECK_INTERVAL); utilityPool = new IgniteThreadPoolExecutor("disco-pool", @@ -3334,7 +3336,7 @@ else if (log.isTraceEnabled()) while (true) { if (sock == null) { if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); boolean success = false; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index dfc15e01687d8..372a9cb63358d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -22,16 +22,22 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -88,11 +94,137 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { /** */ private int connectionRecoveryTimeout = -1; + /** */ + private int failureDetectionTimeout = 2_000; + + /** */ + private int metricsUpdateFreq = 1_000; + /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); } + /** Checks node failure is detected within failure detection timeout. */ + @Test + public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { + // We won't try recovering connection. We'll remove node from the grid asap. + connectionRecoveryTimeout = 0; + + // Makes test faster. + failureDetectionTimeout = 1000; + + // A message traffic. + metricsUpdateFreq = 750; + + // Running several times to be sure. + for (int i = 0; i < 15; ++i) { + // Holder of falure detection delay. Also is test start and end regulator. + final AtomicLong timer = new AtomicLong(); + + // SPI of node 0 detects the failure. + specialSpi = new TcpDiscoverySpi() { + /** */ + private AtomicBoolean detected = new AtomicBoolean(); + + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + + if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { + synchronized (timer) { + timer.set(System.nanoTime() - timer.get()); + + // Failure detected. Stop the test. + timer.notifyAll(); + } + } + + super.writeToSocket(sock, out, msg, timeout); + } + }; + + IgniteEx grid0 = startGrid(0); + + assert ((TcpDiscoverySpi)grid0.configuration().getDiscoverySpi()).failureDetectionTimeoutEnabled() : + "Failure detection timeout is not active."; + + long nodeDelay = failureDetectionTimeout * 2; + + // SPI of node 1 simulates node failure. + specialSpi = new TcpDiscoverySpi() { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + simulateUnacceptableDelay(); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + + simulateUnacceptableDelay(); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + + simulateUnacceptableDelay(); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** Simulates node delay like GC or unknown network issues. */ + private void simulateUnacceptableDelay() { + if (timer.get() > 0) { + try { + Thread.sleep(nodeDelay); + } + catch (InterruptedException e) { + // No-op. + } + } + } + }; + + startGrid(1); + + specialSpi = null; + + // Other node to send TcpDiscoveryNodeFailedMessage to. + startGrid(2); + + // Wait for exchanging various frequent messages like TcpDiscoveryCustomEventMessage. + awaitPartitionMapExchange(); + + // Randimizes failure time since cluster start. + Thread.sleep(new Random().nextInt(2000)); + + synchronized (timer) { + // Failure simulated. + timer.set(System.nanoTime()); + + // Wait until failure is detected. + timer.wait(getTestTimeout()); + } + + long failureDetectionDelay = U.nanosToMillis(timer.get()); + + if (log.isDebugEnabled()) + log.debug("Failure detection delay: " + failureDetectionDelay); + + assertTrue("Too long failure detection delay: " + failureDetectionDelay + + "ms. Failure detection timeout is: " + failureDetectionTimeout + "ms", + failureDetectionDelay <= failureDetectionTimeout); + + stopAllGrids(true); + } + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -107,7 +239,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { if (connectionRecoveryTimeout >= 0) spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout); - cfg.setFailureDetectionTimeout(2_000); + cfg.setFailureDetectionTimeout(failureDetectionTimeout); + + cfg.setMetricsUpdateFrequency(metricsUpdateFreq); cfg.setDiscoverySpi(spi); From bd00c208bb41836b0d5ef08fdcb6565e517f94eb Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 3 Jun 2020 17:49:48 +0300 Subject: [PATCH 12/26] IGNITE-13012 : + test fix. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 372a9cb63358d..3cd5a691f2af6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -117,8 +117,12 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { // A message traffic. metricsUpdateFreq = 750; + int runsCnt = 20; + + int sucessfullRunsCnt = 0; + // Running several times to be sure. - for (int i = 0; i < 15; ++i) { + for (int i = 0; i < runsCnt; ++i) { // Holder of falure detection delay. Also is test start and end regulator. final AtomicLong timer = new AtomicLong(); @@ -217,12 +221,17 @@ private void simulateUnacceptableDelay() { if (log.isDebugEnabled()) log.debug("Failure detection delay: " + failureDetectionDelay); - assertTrue("Too long failure detection delay: " + failureDetectionDelay + - "ms. Failure detection timeout is: " + failureDetectionTimeout + "ms", - failureDetectionDelay <= failureDetectionTimeout); + // Sometimes delays like GC pauses, timer granunalion (10ms) hinders the detection be within the timeout. + if (failureDetectionDelay <= failureDetectionTimeout) + ++sucessfullRunsCnt; stopAllGrids(true); } + + if (sucessfullRunsCnt < 0.8 * runsCnt) { + fail("Few sucessfull runs: " + sucessfullRunsCnt + " of " + runsCnt + ". Expected: 80% (" + + (0.8 * runsCnt) + ")."); + } } /** {@inheritDoc} */ From c4647253ac8e76f9f2f9c39f51f150b101c05917 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 4 Jun 2020 11:50:02 +0300 Subject: [PATCH 13/26] IGNITE-13012 : +10ms as the timer granulation. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 3cd5a691f2af6..4207bea88deba 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -218,13 +218,16 @@ private void simulateUnacceptableDelay() { long failureDetectionDelay = U.nanosToMillis(timer.get()); - if (log.isDebugEnabled()) - log.debug("Failure detection delay: " + failureDetectionDelay); - - // Sometimes delays like GC pauses, timer granunalion (10ms) hinders the detection be within the timeout. - if (failureDetectionDelay <= failureDetectionTimeout) + // 10ms is the timer granulation in IgniteUtils. + if (failureDetectionDelay <= failureDetectionTimeout + 10) { ++sucessfullRunsCnt; + if (log.isDebugEnabled()) + log.debug("Failure detection delay: " + failureDetectionDelay); + } + else + log.warning("Long failure detection delay: " + failureDetectionDelay); + stopAllGrids(true); } From 5370831f07938f4b54cd8ec8dcd012d9d58242d5 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 4 Jun 2020 13:34:41 +0300 Subject: [PATCH 14/26] IGNITE-13012 : test fixes. --- .../CacheContinuousQueryFilterDeploymentFailedTest.java | 3 +++ .../spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 1 + 2 files changed, 4 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java index f29b05f78a66a..2c0aa013fdce3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterDeploymentFailedTest.java @@ -54,6 +54,9 @@ public class CacheContinuousQueryFilterDeploymentFailedTest extends GridCommonAb @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + // Failure detection timeout > P2P class loading timeout which is set as network timeout. + cfg.setFailureDetectionTimeout(cfg.getNetworkTimeout() * 2); + ((TestTcpDiscoverySpi)cfg.getDiscoverySpi()).discoveryHook(new DiscoveryHook() { @Override public void afterDiscovery(DiscoveryCustomMessage customMsg) { if (customMsg instanceof StopRoutineDiscoveryMessage) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 4207bea88deba..d0928ff9be27e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -231,6 +231,7 @@ private void simulateUnacceptableDelay() { stopAllGrids(true); } + // Let's consider 80% of sucessfull runs. Other 20% we can spare to GC delays. if (sucessfullRunsCnt < 0.8 * runsCnt) { fail("Few sucessfull runs: " + sucessfullRunsCnt + " of " + runsCnt + ". Expected: 80% (" + (0.8 * runsCnt) + ")."); From a9ad35e89369cc452f2955e19377d4c250523637 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 4 Jun 2020 18:15:22 +0300 Subject: [PATCH 15/26] IGNITE-13012 : + 10ms as acceptable code delay. --- .../discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index d0928ff9be27e..300fed1ffb0e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -218,8 +218,9 @@ private void simulateUnacceptableDelay() { long failureDetectionDelay = U.nanosToMillis(timer.get()); - // 10ms is the timer granulation in IgniteUtils. - if (failureDetectionDelay <= failureDetectionTimeout + 10) { + // 20ms is the timer granulation in IgniteUtils (10ms). Considered time lag between 2 nodes. + // So, the worst case is 2 * 10ms. + if (failureDetectionDelay <= failureDetectionTimeout + 20) { ++sucessfullRunsCnt; if (log.isDebugEnabled()) @@ -231,10 +232,10 @@ private void simulateUnacceptableDelay() { stopAllGrids(true); } - // Let's consider 80% of sucessfull runs. Other 20% we can spare to GC delays. + // Let's consider 80% of sucessfull runs. Other 20% we can spare to GC/platform delays. if (sucessfullRunsCnt < 0.8 * runsCnt) { - fail("Few sucessfull runs: " + sucessfullRunsCnt + " of " + runsCnt + ". Expected: 80% (" - + (0.8 * runsCnt) + ")."); + fail("Few sucessfull runs: " + sucessfullRunsCnt + '/' + runsCnt + ". Expected: " + + (0.8 * runsCnt) + " (80%)."); } } From a8fad436597bfc36d538f8e6897dc6f5ec0a037a Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 5 Jun 2020 10:17:08 +0300 Subject: [PATCH 16/26] IGNITE-13012 : test redeemed. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 29 +++++-------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 300fed1ffb0e9..ba452a98d5e56 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -111,18 +111,14 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { // We won't try recovering connection. We'll remove node from the grid asap. connectionRecoveryTimeout = 0; - // Makes test faster. + // Makes test faster. Also the value is closer to previous fixed ping rate 500ms. failureDetectionTimeout = 1000; // A message traffic. metricsUpdateFreq = 750; - int runsCnt = 20; - - int sucessfullRunsCnt = 0; - // Running several times to be sure. - for (int i = 0; i < runsCnt; ++i) { + for (int i = 0; i < 20; ++i) { // Holder of falure detection delay. Also is test start and end regulator. final AtomicLong timer = new AtomicLong(); @@ -218,24 +214,13 @@ private void simulateUnacceptableDelay() { long failureDetectionDelay = U.nanosToMillis(timer.get()); - // 20ms is the timer granulation in IgniteUtils (10ms). Considered time lag between 2 nodes. - // So, the worst case is 2 * 10ms. - if (failureDetectionDelay <= failureDetectionTimeout + 20) { - ++sucessfullRunsCnt; - - if (log.isDebugEnabled()) - log.debug("Failure detection delay: " + failureDetectionDelay); - } - else - log.warning("Long failure detection delay: " + failureDetectionDelay); - stopAllGrids(true); - } - // Let's consider 80% of sucessfull runs. Other 20% we can spare to GC/platform delays. - if (sucessfullRunsCnt < 0.8 * runsCnt) { - fail("Few sucessfull runs: " + sucessfullRunsCnt + '/' + runsCnt + ". Expected: " - + (0.8 * runsCnt) + " (80%)."); + // Previous delay is up to 'failure detection timeout + 500ms'. Where 500ms is fixed ping rate. + // To avoid flaky test, we give anoter 100ms to work with GC pauses, platform delays and the timer + // granulation in IgniteUtils.currentTimeMillis(). + assertTrue("Long failure detection delay: " + failureDetectionDelay, + failureDetectionDelay <= failureDetectionTimeout + 100); } } From 45c426fcd0e63144924c819eadf830ad90392595 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 5 Jun 2020 15:07:33 +0300 Subject: [PATCH 17/26] IGNITE-13016 : faster test. --- .../spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index ba452a98d5e56..09f020457d45b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -118,7 +118,7 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { metricsUpdateFreq = 750; // Running several times to be sure. - for (int i = 0; i < 20; ++i) { + for (int i = 0; i < 10; ++i) { // Holder of falure detection delay. Also is test start and end regulator. final AtomicLong timer = new AtomicLong(); @@ -202,7 +202,7 @@ private void simulateUnacceptableDelay() { awaitPartitionMapExchange(); // Randimizes failure time since cluster start. - Thread.sleep(new Random().nextInt(2000)); + Thread.sleep(new Random().nextInt(1000)); synchronized (timer) { // Failure simulated. From 0d58fe44f892978025bedd323d1394e4ee826f8c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 9 Jun 2020 01:20:21 +0300 Subject: [PATCH 18/26] Revert "IGNITE-13134 : test duration fix. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 09f020457d45b..0d0b581208aef 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -100,6 +100,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { /** */ private int metricsUpdateFreq = 1_000; + /** */ + private Long systemWorkerBlockedTimeout; + /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); @@ -112,13 +115,16 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { connectionRecoveryTimeout = 0; // Makes test faster. Also the value is closer to previous fixed ping rate 500ms. - failureDetectionTimeout = 1000; + failureDetectionTimeout = 800; // A message traffic. - metricsUpdateFreq = 750; + metricsUpdateFreq = 700; - // Running several times to be sure. - for (int i = 0; i < 10; ++i) { + // Avoid useless arns. We do block threadf specually. + systemWorkerBlockedTimeout = 5000L; + + // Running several times to be sure. Let's keep it within 1min. + for (int i = 0; i < 7; ++i) { // Holder of falure detection delay. Also is test start and end regulator. final AtomicLong timer = new AtomicLong(); @@ -202,7 +208,7 @@ private void simulateUnacceptableDelay() { awaitPartitionMapExchange(); // Randimizes failure time since cluster start. - Thread.sleep(new Random().nextInt(1000)); + Thread.sleep(new Random().nextInt(500)); synchronized (timer) { // Failure simulated. @@ -242,6 +248,8 @@ private void simulateUnacceptableDelay() { cfg.setMetricsUpdateFrequency(metricsUpdateFreq); + cfg.setSystemWorkerBlockedTimeout(systemWorkerBlockedTimeout); + cfg.setDiscoverySpi(spi); return cfg; From 7f7a6082f889c34a588f6d3f07a20612dd723dcc Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 9 Jun 2020 10:07:25 +0300 Subject: [PATCH 19/26] IGNITE-13012 : faster test. --- .../spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 0d0b581208aef..f1e20a1f8deb4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -115,10 +115,10 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { connectionRecoveryTimeout = 0; // Makes test faster. Also the value is closer to previous fixed ping rate 500ms. - failureDetectionTimeout = 800; + failureDetectionTimeout = 700; // A message traffic. - metricsUpdateFreq = 700; + metricsUpdateFreq = failureDetectionTimeout; // Avoid useless arns. We do block threadf specually. systemWorkerBlockedTimeout = 5000L; From dde7e7cafb15365b133c133415b54e8a24ae039b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 9 Jun 2020 10:10:13 +0300 Subject: [PATCH 20/26] IGNITE-13012 : minority. --- .../ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index f1e20a1f8deb4..e54a187fcc083 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -118,7 +118,7 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { failureDetectionTimeout = 700; // A message traffic. - metricsUpdateFreq = failureDetectionTimeout; + metricsUpdateFreq = 400; // Avoid useless arns. We do block threadf specually. systemWorkerBlockedTimeout = 5000L; From e1b9735e692e1ff9ff88b2b5c983f9383317726d Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Jun 2020 16:51:49 +0300 Subject: [PATCH 21/26] IGNITE-13012 : spelling fix. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index e54a187fcc083..81012bb529f17 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -101,7 +102,7 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { private int metricsUpdateFreq = 1_000; /** */ - private Long systemWorkerBlockedTimeout; + private Long sysWorkerBlockedTimeout; /** {@inheritDoc} */ @Override protected void afterTest() { @@ -120,8 +121,8 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { // A message traffic. metricsUpdateFreq = 400; - // Avoid useless arns. We do block threadf specually. - systemWorkerBlockedTimeout = 5000L; + // Avoid useless warnings. We do block threads specially. + sysWorkerBlockedTimeout = 5000L; // Running several times to be sure. Let's keep it within 1min. for (int i = 0; i < 7; ++i) { @@ -151,7 +152,7 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { IgniteEx grid0 = startGrid(0); - assert ((TcpDiscoverySpi)grid0.configuration().getDiscoverySpi()).failureDetectionTimeoutEnabled() : + assert ((IgniteSpiAdapter)grid0.configuration().getDiscoverySpi()).failureDetectionTimeoutEnabled() : "Failure detection timeout is not active."; long nodeDelay = failureDetectionTimeout * 2; @@ -190,7 +191,7 @@ private void simulateUnacceptableDelay() { try { Thread.sleep(nodeDelay); } - catch (InterruptedException e) { + catch (InterruptedException ignored) { // No-op. } } @@ -248,7 +249,7 @@ private void simulateUnacceptableDelay() { cfg.setMetricsUpdateFrequency(metricsUpdateFreq); - cfg.setSystemWorkerBlockedTimeout(systemWorkerBlockedTimeout); + cfg.setSystemWorkerBlockedTimeout(sysWorkerBlockedTimeout); cfg.setDiscoverySpi(spi); From 1b07dd53e3c313e407f892b2b3fe7681f44a2ca4 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Jun 2020 16:54:04 +0300 Subject: [PATCH 22/26] IGNITE-13012 : empty lines. --- .../ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 81012bb529f17..1656bdaee84d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -170,7 +170,6 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { /** {@inheritDoc} */ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout) throws IOException { - simulateUnacceptableDelay(); super.writeToSocket(msg, sock, res, timeout); @@ -179,7 +178,6 @@ public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException { - simulateUnacceptableDelay(); super.writeToSocket(sock, msg, data, timeout); From a4be00090b35127a537b1959dee4cca2f0ba5732 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Jun 2020 17:37:54 +0300 Subject: [PATCH 23/26] IGNITE-13012 :renaming. --- .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 070f16221d3c5..78f3a5dcde850 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -199,8 +199,8 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private static final TcpDiscoveryAbstractMessage WAKEUP = new TcpDiscoveryDummyWakeupMessage(); - /** Minimal interval of connection check to next node in the ring. */ - private static final long MIN_CON_CHECK_INTERVAL = 500; + /** Maximal interval of connection check to next node in the ring. */ + private static final long MAX_CON_CHECK_INTERVAL = 500; /** Interval of checking connection to next node in the ring. */ private long connCheckInterval; @@ -381,7 +381,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Since we take in account time of last sent message, the interval should be quite short to give enough piece // of failure detection timeout as send-and-acknowledge timeout of the message to send. - connCheckInterval = Math.min(msgExchangeTimeout / 4, MIN_CON_CHECK_INTERVAL); + connCheckInterval = Math.min(msgExchangeTimeout / 4, MAX_CON_CHECK_INTERVAL); utilityPool = new IgniteThreadPoolExecutor("disco-pool", spi.ignite().name(), From d9c3108fd15e34c0d9ccc59eedfe41f1b3d256b4 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Jun 2020 17:38:57 +0300 Subject: [PATCH 24/26] IGNITE-13012 :renamings. Removes test. --- .../tcp/TcpDiscoveryNetworkIssuesTest.java | 142 +----------------- 1 file changed, 1 insertion(+), 141 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 1656bdaee84d3..dfc15e01687d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -22,23 +22,16 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Map; -import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -95,140 +88,11 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest { /** */ private int connectionRecoveryTimeout = -1; - /** */ - private int failureDetectionTimeout = 2_000; - - /** */ - private int metricsUpdateFreq = 1_000; - - /** */ - private Long sysWorkerBlockedTimeout; - /** {@inheritDoc} */ @Override protected void afterTest() { stopAllGrids(); } - /** Checks node failure is detected within failure detection timeout. */ - @Test - public void testNodeFailureDetectedWithinConfiguredTimeout() throws Exception { - // We won't try recovering connection. We'll remove node from the grid asap. - connectionRecoveryTimeout = 0; - - // Makes test faster. Also the value is closer to previous fixed ping rate 500ms. - failureDetectionTimeout = 700; - - // A message traffic. - metricsUpdateFreq = 400; - - // Avoid useless warnings. We do block threads specially. - sysWorkerBlockedTimeout = 5000L; - - // Running several times to be sure. Let's keep it within 1min. - for (int i = 0; i < 7; ++i) { - // Holder of falure detection delay. Also is test start and end regulator. - final AtomicLong timer = new AtomicLong(); - - // SPI of node 0 detects the failure. - specialSpi = new TcpDiscoverySpi() { - /** */ - private AtomicBoolean detected = new AtomicBoolean(); - - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - long timeout) throws IOException, IgniteCheckedException { - - if (msg instanceof TcpDiscoveryNodeFailedMessage && detected.compareAndSet(false, true)) { - synchronized (timer) { - timer.set(System.nanoTime() - timer.get()); - - // Failure detected. Stop the test. - timer.notifyAll(); - } - } - - super.writeToSocket(sock, out, msg, timeout); - } - }; - - IgniteEx grid0 = startGrid(0); - - assert ((IgniteSpiAdapter)grid0.configuration().getDiscoverySpi()).failureDetectionTimeoutEnabled() : - "Failure detection timeout is not active."; - - long nodeDelay = failureDetectionTimeout * 2; - - // SPI of node 1 simulates node failure. - specialSpi = new TcpDiscoverySpi() { - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - long timeout) throws IOException, IgniteCheckedException { - simulateUnacceptableDelay(); - - super.writeToSocket(sock, out, msg, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, - long timeout) throws IOException { - simulateUnacceptableDelay(); - - super.writeToSocket(msg, sock, res, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, - long timeout) throws IOException { - simulateUnacceptableDelay(); - - super.writeToSocket(sock, msg, data, timeout); - } - - /** Simulates node delay like GC or unknown network issues. */ - private void simulateUnacceptableDelay() { - if (timer.get() > 0) { - try { - Thread.sleep(nodeDelay); - } - catch (InterruptedException ignored) { - // No-op. - } - } - } - }; - - startGrid(1); - - specialSpi = null; - - // Other node to send TcpDiscoveryNodeFailedMessage to. - startGrid(2); - - // Wait for exchanging various frequent messages like TcpDiscoveryCustomEventMessage. - awaitPartitionMapExchange(); - - // Randimizes failure time since cluster start. - Thread.sleep(new Random().nextInt(500)); - - synchronized (timer) { - // Failure simulated. - timer.set(System.nanoTime()); - - // Wait until failure is detected. - timer.wait(getTestTimeout()); - } - - long failureDetectionDelay = U.nanosToMillis(timer.get()); - - stopAllGrids(true); - - // Previous delay is up to 'failure detection timeout + 500ms'. Where 500ms is fixed ping rate. - // To avoid flaky test, we give anoter 100ms to work with GC pauses, platform delays and the timer - // granulation in IgniteUtils.currentTimeMillis(). - assertTrue("Long failure detection delay: " + failureDetectionDelay, - failureDetectionDelay <= failureDetectionTimeout + 100); - } - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -243,11 +107,7 @@ private void simulateUnacceptableDelay() { if (connectionRecoveryTimeout >= 0) spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout); - cfg.setFailureDetectionTimeout(failureDetectionTimeout); - - cfg.setMetricsUpdateFrequency(metricsUpdateFreq); - - cfg.setSystemWorkerBlockedTimeout(sysWorkerBlockedTimeout); + cfg.setFailureDetectionTimeout(2_000); cfg.setDiscoverySpi(spi); From 71435c2c9bebd50ba06a9acdf33a4a42ab064c1f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Jun 2020 15:04:20 +0300 Subject: [PATCH 25/26] reverted removal of 'public' --- .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 78f3a5dcde850..8c06eb74ce10b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -362,7 +362,7 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override long connectionCheckInterval() { + @Override public long connectionCheckInterval() { return connCheckInterval; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index b4a71c387f7e8..7f80610846f36 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -303,7 +303,7 @@ public int boundPort() throws IgniteSpiException { /** * @return connection check interval. */ - long connectionCheckInterval() { + public long connectionCheckInterval() { return 0; } From 322242a42206004bf73a8dfa9583329eba87b918 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Jun 2020 16:12:04 +0300 Subject: [PATCH 26/26] IGNITE-13012 : removed redundant hasRemoteSrvNodes --- .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8c06eb74ce10b..3fa8939690873 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -6191,17 +6191,12 @@ private void checkMetricsReceiving() { * Check connection to next node in the ring. */ private void checkConnection() { - Boolean hasRemoteSrvNodes = null; - long elapsed = (lastRingMsgSentTime + U.millisToNanos(connCheckInterval)) - System.nanoTime(); if (elapsed > 0) return; - if (hasRemoteSrvNodes == null) - hasRemoteSrvNodes = ring.hasRemoteServerNodes(); - - if (hasRemoteSrvNodes) + if (ring.hasRemoteServerNodes()) sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); }