Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
Expand All @@ -41,6 +42,7 @@
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
Expand All @@ -59,12 +61,14 @@
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
Expand Down Expand Up @@ -175,6 +179,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter
*/
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();

/** */
private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock();

/** */
private boolean stopped;

/**
* Lock to access/update data and component's state.
*/
Expand Down Expand Up @@ -287,7 +297,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) {
finally {
lock.writeLock().unlock();

cancelUpdateFutures();
cancelUpdateFutures(nodeStoppingException(), true);
}
}

Expand Down Expand Up @@ -914,7 +924,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData

ver = INITIAL_VERSION;

cancelUpdateFutures();
cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false);
}
finally {
lock.writeLock().unlock();
Expand All @@ -924,13 +934,28 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData
/**
* Cancel all waiting futures and clear the map.
*/
private void cancelUpdateFutures() {
for (GridFutureAdapter<Boolean> fut : updateFuts.values())
fut.onDone(new IgniteCheckedException("Client was disconnected during the operation."));
private void cancelUpdateFutures(Exception e, boolean stop) {
updateFutsStopLock.writeLock().lock();

try {
stopped = stop;

for (GridFutureAdapter<Boolean> fut : updateFuts.values())
fut.onDone(e);

updateFuts.clear();
updateFuts.clear();
}
finally {
updateFutsStopLock.writeLock().unlock();
}
}

/** */
private static NodeStoppingException nodeStoppingException() {
return new NodeStoppingException("Node is stopping.");
}


/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
assert isClient;
Expand Down Expand Up @@ -1033,14 +1058,29 @@ else if (!isClient && ver.id() > 0) {
* @throws IgniteCheckedException If there was an error while sending discovery message.
*/
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
if (!isSupported(ctx))
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
GridFutureAdapter<?> validationRes = validateBeforeWrite(key);

if (validationRes != null)
return validationRes;

UUID reqId = UUID.randomUUID();

GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

updateFuts.put(reqId, fut);
updateFutsStopLock.readLock().lock();

try {
if (stopped) {
fut.onDone(nodeStoppingException());

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to move this into 'validateBeforeWrite' ?
'validateBeforeWrite' can be executed under read-lock as in normal case it always return null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does, thank you!

return fut;
}

updateFuts.put(reqId, fut);
}
finally {
updateFutsStopLock.readLock().unlock();
}

DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);

Expand All @@ -1054,14 +1094,29 @@ private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws Igni
*/
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
throws IgniteCheckedException {
if (!isSupported(ctx))
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);
GridFutureAdapter<Boolean> validationRes = validateBeforeWrite(key);

if (validationRes != null)
return validationRes;

UUID reqId = UUID.randomUUID();

GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

updateFuts.put(reqId, fut);
updateFutsStopLock.readLock().lock();

try {
if (stopped) {
fut.onDone(nodeStoppingException());

return fut;
}

updateFuts.put(reqId, fut);
}
finally {
updateFutsStopLock.readLock().unlock();
}

DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);

Expand All @@ -1070,6 +1125,31 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
return fut;
}

/** */
private GridFutureAdapter<Boolean> validateBeforeWrite(String key) throws IgniteCheckedException {
boolean supported;

try {
supported = isSupported(ctx);
Copy link
Member

@AMashenkov AMashenkov Dec 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really necessary to validate all the nodes on every operation?
Do we allow old nodes to connect to new grid? If no, then may be we can cache 'supported' flag and skip this check if once found all the nodes support the feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every component does this, changing the approach is out of the scope. And yes, older node can enter the cluster sometimes.

}
catch (Exception e) {
if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();

fut.onDone(nodeStoppingException());

return fut;
}

throw e;
}

if (!supported)
throw new IgniteCheckedException(NOT_SUPPORTED_MSG);

return null;
}

/**
* Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
* current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
Expand Down Expand Up @@ -1119,7 +1199,16 @@ private void onAckMessage(
ClusterNode node,
DistributedMetaStorageUpdateAckMessage msg
) {
GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.requestId());
GridFutureAdapter<Boolean> fut;

updateFutsStopLock.readLock().lock();

try {
fut = updateFuts.remove(msg.requestId());
}
finally {
updateFutsStopLock.readLock().unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readlock looks useless here.
Anyway, you'll finish the future in few lines below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, did that automatically.

}

if (fut != null) {
String errorMsg = msg.errorMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,11 @@ else if (m instanceof HistogramMetric)
opsFut.markInitialized();
opsFut.get();
}
catch (NodeStoppingException ignored) {
// No-op.
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
log.error("Failed to remove metrics configuration.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand All @@ -32,7 +34,9 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
Expand Down Expand Up @@ -133,6 +137,19 @@ public void testSingleNode() throws Exception {
metastorage.remove("key");

assertNull(metastorage.read("key"));

stopGrid(0);

try {
metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS);

fail("Exception is expected");
}
catch (Exception e) {
assertTrue(X.hasCause(e, IgniteCheckedException.class) || X.hasCause(e, IgniteSpiException.class));

assertTrue(e.getMessage().contains("Node is stopping.") || e.getMessage().contains("Node stopped."));
}
}

/**
Expand Down