From b1266dffedaf541a8c55e74694431d6a26859c0e Mon Sep 17 00:00:00 2001 From: ibessonov Date: Tue, 8 Dec 2020 12:30:15 +0300 Subject: [PATCH 1/8] IGNITE-13101 Fixed uncompleted futures leak on node stop in distributed metastorage. --- .../DistributedMetaStorageImpl.java | 71 ++++++++++++++++--- .../DistributedMetaStorageTest.java | 15 ++++ 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 4ffd0caaa7396..1ec0476be3b50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -175,6 +176,12 @@ public class DistributedMetaStorageImpl extends GridProcessorAdapter */ private final ConcurrentMap> updateFuts = new ConcurrentHashMap<>(); + /** */ + private final ReadWriteLock updateFutsStopLock = new ReentrantReadWriteLock(); + + /** */ + private boolean stopped; + /** * Lock to access/update data and component's state. */ @@ -287,7 +294,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) { finally { lock.writeLock().unlock(); - cancelUpdateFutures(); + cancelUpdateFutures("Node is stopping.", true); } } @@ -914,7 +921,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData ver = INITIAL_VERSION; - cancelUpdateFutures(); + cancelUpdateFutures("Client was disconnected during the operation.", false); } finally { lock.writeLock().unlock(); @@ -923,12 +930,23 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData /** * Cancel all waiting futures and clear the map. + * + * @param msg Error message. */ - private void cancelUpdateFutures() { - for (GridFutureAdapter fut : updateFuts.values()) - fut.onDone(new IgniteCheckedException("Client was disconnected during the operation.")); + private void cancelUpdateFutures(String msg, boolean stop) { + updateFutsStopLock.writeLock().lock(); - updateFuts.clear(); + try { + stopped = stop; + + for (GridFutureAdapter fut : updateFuts.values()) + fut.onDone(new IgniteCheckedException(msg)); + + updateFuts.clear(); + } + finally { + updateFutsStopLock.writeLock().unlock(); + } } /** {@inheritDoc} */ @@ -1040,7 +1058,20 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni GridFutureAdapter fut = new GridFutureAdapter<>(); - updateFuts.put(reqId, fut); + updateFutsStopLock.readLock().lock(); + + try { + if (stopped) { + fut.onDone(new IgniteCheckedException("Node is stopped.")); + + return fut; + } + + updateFuts.put(reqId, fut); + } + finally { + updateFutsStopLock.readLock().unlock(); + } DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes); @@ -1061,7 +1092,20 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte GridFutureAdapter fut = new GridFutureAdapter<>(); - updateFuts.put(reqId, fut); + updateFutsStopLock.readLock().lock(); + + try { + if (stopped) { + fut.onDone(new IgniteCheckedException("Node is stopped.")); + + return fut; + } + + updateFuts.put(reqId, fut); + } + finally { + updateFutsStopLock.readLock().unlock(); + } DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes); @@ -1119,7 +1163,16 @@ private void onAckMessage( ClusterNode node, DistributedMetaStorageUpdateAckMessage msg ) { - GridFutureAdapter fut = updateFuts.remove(msg.requestId()); + GridFutureAdapter fut; + + updateFutsStopLock.readLock().lock(); + + try { + fut = updateFuts.remove(msg.requestId()); + } + finally { + updateFutsStopLock.readLock().unlock(); + } if (fut != null) { String errorMsg = msg.errorMessage(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 763a806c3a8e2..9e10288972136 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -22,8 +22,10 @@ import java.util.Comparator; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -133,6 +135,19 @@ public void testSingleNode() throws Exception { metastorage.remove("key"); assertNull(metastorage.read("key")); + + stopGrid(0); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> { + metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS); + + return null; + }, + IgniteCheckedException.class, + "Node is stopped." + ); } /** From e802099042f55671b12bb0c413652d6ea2e74a16 Mon Sep 17 00:00:00 2001 From: ibessonov Date: Wed, 9 Dec 2020 12:17:53 +0300 Subject: [PATCH 2/8] IGNITE-13101 Zookeeper tests failed. --- .../DistributedMetaStorageTest.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 9e10288972136..1948b9bacce7b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -34,7 +34,9 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -138,16 +140,16 @@ public void testSingleNode() throws Exception { stopGrid(0); - GridTestUtils.assertThrowsAnyCause( - log, - () -> { - metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS); + try { + metastorage.writeAsync("key", "value").get(10, TimeUnit.SECONDS); - return null; - }, - IgniteCheckedException.class, - "Node is stopped." - ); + fail("Exception is expected"); + } + catch (Exception e) { + assertTrue(X.hasCause(e, IgniteCheckedException.class) || X.hasCause(e, IgniteSpiException.class)); + + assertTrue(e.getMessage().contains("Node is stopped.") || e.getMessage().contains("Node stopped.")); + } } /** From 592c3ebcdcb08cc47ba2a383825a7f14df86407b Mon Sep 17 00:00:00 2001 From: ibessonov Date: Thu, 10 Dec 2020 11:17:08 +0300 Subject: [PATCH 3/8] IGNITE-13101 Cache stop won't throw exception when node is stopping. --- .../ignite/internal/processors/metric/GridMetricManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java index fddf7ff91fab2..c45723b58100e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java @@ -440,7 +440,7 @@ else if (m instanceof HistogramMetric) opsFut.get(); } catch (IgniteCheckedException e) { - throw new IgniteException(e); + log.error("Failed to remove metrics configuration.", e); } } From 07ffeebcc98a430920ede8434763260e74baea3c Mon Sep 17 00:00:00 2001 From: ibessonov Date: Thu, 10 Dec 2020 11:54:57 +0300 Subject: [PATCH 4/8] IGNITE-13101 Better fix? --- .../persistence/DistributedMetaStorageImpl.java | 15 +++++++-------- .../processors/metric/GridMetricManager.java | 3 +++ .../metastorage/DistributedMetaStorageTest.java | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 1ec0476be3b50..117728b7b531b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -294,7 +295,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) { finally { lock.writeLock().unlock(); - cancelUpdateFutures("Node is stopping.", true); + cancelUpdateFutures(new NodeStoppingException("Node is stopping."), true); } } @@ -921,7 +922,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData ver = INITIAL_VERSION; - cancelUpdateFutures("Client was disconnected during the operation.", false); + cancelUpdateFutures(new IgniteCheckedException("Client was disconnected during the operation."), false); } finally { lock.writeLock().unlock(); @@ -930,17 +931,15 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData /** * Cancel all waiting futures and clear the map. - * - * @param msg Error message. */ - private void cancelUpdateFutures(String msg, boolean stop) { + private void cancelUpdateFutures(Exception e, boolean stop) { updateFutsStopLock.writeLock().lock(); try { stopped = stop; for (GridFutureAdapter fut : updateFuts.values()) - fut.onDone(new IgniteCheckedException(msg)); + fut.onDone(e); updateFuts.clear(); } @@ -1062,7 +1061,7 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni try { if (stopped) { - fut.onDone(new IgniteCheckedException("Node is stopped.")); + fut.onDone(new NodeStoppingException("Node is stopping.")); return fut; } @@ -1096,7 +1095,7 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte try { if (stopped) { - fut.onDone(new IgniteCheckedException("Node is stopped.")); + fut.onDone(new NodeStoppingException("Node is stopping.")); return fut; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java index c45723b58100e..267b2a11ccab2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java @@ -439,6 +439,9 @@ else if (m instanceof HistogramMetric) opsFut.markInitialized(); opsFut.get(); } + catch (NodeStoppingException ignored) { + // No-op. + } catch (IgniteCheckedException e) { log.error("Failed to remove metrics configuration.", e); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 1948b9bacce7b..6e7d0f92f3b46 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -148,7 +148,7 @@ public void testSingleNode() throws Exception { catch (Exception e) { assertTrue(X.hasCause(e, IgniteCheckedException.class) || X.hasCause(e, IgniteSpiException.class)); - assertTrue(e.getMessage().contains("Node is stopped.") || e.getMessage().contains("Node stopped.")); + assertTrue(e.getMessage().contains("Node is stopping.") || e.getMessage().contains("Node stopped.")); } } From 8fb0221a4b1ed771c576e3d21c0c76196c61b394 Mon Sep 17 00:00:00 2001 From: ibessonov Date: Thu, 10 Dec 2020 11:56:30 +0300 Subject: [PATCH 5/8] IGNITE-13101 Refactoring + that stupid exception in Zookeeper discovery. --- .../DistributedMetaStorageImpl.java | 51 ++++++++++++++++--- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 117728b7b531b..94afc155dd63f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -61,12 +61,14 @@ import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.IgniteNodeValidationResult; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; @@ -295,7 +297,7 @@ public DistributedMetaStorageImpl(GridKernalContext ctx) { finally { lock.writeLock().unlock(); - cancelUpdateFutures(new NodeStoppingException("Node is stopping."), true); + cancelUpdateFutures(nodeStoppingException(), true); } } @@ -948,6 +950,12 @@ private void cancelUpdateFutures(Exception e, boolean stop) { } } + /** */ + private static NodeStoppingException nodeStoppingException() { + return new NodeStoppingException("Node is stopping."); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) { assert isClient; @@ -1050,8 +1058,10 @@ else if (!isClient && ver.id() > 0) { * @throws IgniteCheckedException If there was an error while sending discovery message. */ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws IgniteCheckedException { - if (!isSupported(ctx)) - throw new IgniteCheckedException(NOT_SUPPORTED_MSG); + GridFutureAdapter validationRes = validateBeforeWrite(key); + + if (validationRes != null) + return validationRes; UUID reqId = UUID.randomUUID(); @@ -1061,7 +1071,7 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni try { if (stopped) { - fut.onDone(new NodeStoppingException("Node is stopping.")); + fut.onDone(nodeStoppingException()); return fut; } @@ -1084,8 +1094,10 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni */ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException { - if (!isSupported(ctx)) - throw new IgniteCheckedException(NOT_SUPPORTED_MSG); + GridFutureAdapter validationRes = validateBeforeWrite(key); + + if (validationRes != null) + return validationRes; UUID reqId = UUID.randomUUID(); @@ -1095,7 +1107,7 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte try { if (stopped) { - fut.onDone(new NodeStoppingException("Node is stopping.")); + fut.onDone(nodeStoppingException()); return fut; } @@ -1113,6 +1125,31 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte return fut; } + /** */ + private GridFutureAdapter validateBeforeWrite(String key) throws IgniteCheckedException { + boolean supported; + + try { + supported = isSupported(ctx); + } + catch (Exception e) { + if (X.hasCause(e, IgniteSpiException.class) && e.getMessage() != null && e.getMessage().contains("Node stopped.")) { + GridFutureAdapter fut = new GridFutureAdapter<>(); + + fut.onDone(nodeStoppingException()); + + return fut; + } + + throw e; + } + + if (!supported) + throw new IgniteCheckedException(NOT_SUPPORTED_MSG); + + return null; + } + /** * Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on * current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason. From d61db3b8fbe3d1898752cb7e2f444287f2a1a07f Mon Sep 17 00:00:00 2001 From: ibessonov Date: Thu, 10 Dec 2020 15:03:49 +0300 Subject: [PATCH 6/8] IGNITE-13101 Fixes after review. --- .../persistence/DistributedMetaStorageImpl.java | 11 ++++++++++- .../metastorage/DistributedMetaStorageTest.java | 5 ++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 94afc155dd63f..f11a2ec20214a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -1125,7 +1125,16 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte return fut; } - /** */ + /** + * This method will perform some preliminary checks before starting write or cas operation. + * + * Tricky part is exception handling from "isSupported" method. It can be thrown by + * {@code ZookeeperDiscoveryImpl#checkState()} method, but we can't just leave it as is. + * There are components that rely on distributed metastorage throwing {@link NodeStoppingException}. + * + * @return Future that must be returned immediately or {@code null}. + * @throws IgniteCheckedException If cluster can't perform this update. + */ private GridFutureAdapter validateBeforeWrite(String key) throws IgniteCheckedException { boolean supported; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 6e7d0f92f3b46..8301b5ab50688 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -146,9 +145,9 @@ public void testSingleNode() throws Exception { fail("Exception is expected"); } catch (Exception e) { - assertTrue(X.hasCause(e, IgniteCheckedException.class) || X.hasCause(e, IgniteSpiException.class)); + assertTrue(X.hasCause(e, NodeStoppingException.class)); - assertTrue(e.getMessage().contains("Node is stopping.") || e.getMessage().contains("Node stopped.")); + assertTrue(e.getMessage().contains("Node is stopping.")); } } From fbceec60538f6d0db55e90b298522e3500dcad62 Mon Sep 17 00:00:00 2001 From: ibessonov Date: Thu, 10 Dec 2020 15:06:13 +0300 Subject: [PATCH 7/8] IGNITE-13101 Imports. --- .../processors/metastorage/DistributedMetaStorageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java index 8301b5ab50688..298e20213b9c2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -33,6 +32,7 @@ import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; From 5af0ace7649f29632d942e15a403ac04f3bfb3bb Mon Sep 17 00:00:00 2001 From: ibessonov Date: Fri, 11 Dec 2020 10:21:46 +0300 Subject: [PATCH 8/8] IGNITE-13101 Code refactored according to review. --- .../DistributedMetaStorageImpl.java | 79 +++++++------------ 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index f11a2ec20214a..880f6454eb610 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -1058,29 +1058,12 @@ else if (!isClient && ver.id() > 0) { * @throws IgniteCheckedException If there was an error while sending discovery message. */ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws IgniteCheckedException { - GridFutureAdapter validationRes = validateBeforeWrite(key); - - if (validationRes != null) - return validationRes; - UUID reqId = UUID.randomUUID(); - GridFutureAdapter fut = new GridFutureAdapter<>(); - - updateFutsStopLock.readLock().lock(); - - try { - if (stopped) { - fut.onDone(nodeStoppingException()); + GridFutureAdapter fut = prepareWriteFuture(key, reqId); - return fut; - } - - updateFuts.put(reqId, fut); - } - finally { - updateFutsStopLock.readLock().unlock(); - } + if (fut.isDone()) + return fut; DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes); @@ -1094,29 +1077,12 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni */ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException { - GridFutureAdapter validationRes = validateBeforeWrite(key); - - if (validationRes != null) - return validationRes; - UUID reqId = UUID.randomUUID(); - GridFutureAdapter fut = new GridFutureAdapter<>(); + GridFutureAdapter fut = prepareWriteFuture(key, reqId); - updateFutsStopLock.readLock().lock(); - - try { - if (stopped) { - fut.onDone(nodeStoppingException()); - - return fut; - } - - updateFuts.put(reqId, fut); - } - finally { - updateFutsStopLock.readLock().unlock(); - } + if (fut.isDone()) + return fut; DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes); @@ -1127,6 +1093,7 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte /** * This method will perform some preliminary checks before starting write or cas operation. + * It also updates {@link #updateFuts} in case if everything's ok. * * Tricky part is exception handling from "isSupported" method. It can be thrown by * {@code ZookeeperDiscoveryImpl#checkState()} method, but we can't just leave it as is. @@ -1135,7 +1102,7 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte * @return Future that must be returned immediately or {@code null}. * @throws IgniteCheckedException If cluster can't perform this update. */ - private GridFutureAdapter validateBeforeWrite(String key) throws IgniteCheckedException { + private GridFutureAdapter prepareWriteFuture(String key, UUID reqId) throws IgniteCheckedException { boolean supported; try { @@ -1156,7 +1123,24 @@ private GridFutureAdapter validateBeforeWrite(String key) throws Ignite if (!supported) throw new IgniteCheckedException(NOT_SUPPORTED_MSG); - return null; + GridFutureAdapter fut = new GridFutureAdapter<>(); + + updateFutsStopLock.readLock().lock(); + + try { + if (stopped) { + fut.onDone(nodeStoppingException()); + + return fut; + } + + updateFuts.put(reqId, fut); + } + finally { + updateFutsStopLock.readLock().unlock(); + } + + return fut; } /** @@ -1208,16 +1192,7 @@ private void onAckMessage( ClusterNode node, DistributedMetaStorageUpdateAckMessage msg ) { - GridFutureAdapter fut; - - updateFutsStopLock.readLock().lock(); - - try { - fut = updateFuts.remove(msg.requestId()); - } - finally { - updateFutsStopLock.readLock().unlock(); - } + GridFutureAdapter fut = updateFuts.remove(msg.requestId()); if (fut != null) { String errorMsg = msg.errorMessage();