Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,12 @@ private void processAck(UUID from, LatchAckMessage message) {
if (log.isDebugEnabled())
log.debug("Process final ack [latch=" + latchUid + ", from=" + from + "]");

assert serverLatches.containsKey(latchUid) || clientLatches.containsKey(latchUid);
if (!serverLatches.containsKey(latchUid) && !clientLatches.containsKey(latchUid)) {
log.warning("Latch for this acknowledge is completed or never have existed " +
"[latch=" + latchUid + ", from=" + from + "]");

return;
}

if (clientLatches.containsKey(latchUid)) {
ClientLatch latch = clientLatches.get(latchUid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;

import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand All @@ -32,6 +42,102 @@ public class ExchangeLatchManagerTest extends GridCommonAbstractTest {
/** */
private static final String LATCH_NAME = "test";

/** Message are meaning that node getting a stale acknowledge message. */
private static final String STALE_ACK_LOG_MSG = "Latch for this acknowledge is completed or never have existed";

/** Message happens when assertion was broken. */
public static final Pattern ERROR_MSG = Pattern.compile("An error occurred processing the message.*"
+ LatchAckMessage.class.getSimpleName());

/** Grid logger. */
public ListeningTestLogger gridLogger;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(gridLogger)
.setCommunicationSpi(new TestRecordingCommunicationSpi());
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

stopAllGrids();
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids();
}

/**
* Checks reaction of latch on stale acknowledge from new coordinator.
*
* @throws Exception If failed.
*/
@Test
public void testExcessAcknowledgeForNewCoordinator() throws Exception {
gridLogger = new ListeningTestLogger(log);

LogListener staleMessageLsnr = LogListener.matches(STALE_ACK_LOG_MSG).build();
LogListener errorLsnr = LogListener.matches(ERROR_MSG).build();

IgniteEx ignite0 = startGrids(3);

awaitPartitionMapExchange();

TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(ignite0);

spi0.blockMessages((node, msg) ->
msg instanceof LatchAckMessage && node.order() == 2);

spi0.record((node, msg) ->
msg instanceof LatchAckMessage && node.order() == 3);

Ignite ignite1 = G.allGrids().stream().filter(node -> node.cluster().localNode().order() == 2).findAny().get();

assertNotNull("Could not find node with second order.", ignite1);

TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite1);

spi1.blockMessages((node, msg) -> {
if (msg instanceof LatchAckMessage && node.order() == 3) {
LatchAckMessage ack = (LatchAckMessage)msg;

return ack.topVer().topologyVersion() == 3;
}

return false;
});

IgniteInternalFuture exchangeDoingFut = GridTestUtils.runAsync(() ->
ignite0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
);

spi0.waitForBlocked();
spi0.waitForRecorded();

ignite0.close();

spi1.waitForBlocked();

awaitPartitionMapExchange();

assertTrue(exchangeDoingFut.isDone());

gridLogger.registerAllListeners(errorLsnr, staleMessageLsnr);

spi1.stopBlock();

assertTrue(GridTestUtils.waitForCondition(() ->
staleMessageLsnr.check(), 10_000));

assertFalse(errorLsnr.check());
}

/**
* @throws Exception If failed.
*/
Expand Down