diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
index 08a4acbb1cdf2..0af75e51d29dd 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java
@@ -49,7 +49,7 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContextImpl;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
+import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
@@ -336,7 +336,7 @@ protected void createTestSnapshot() throws Exception {
for (String snpName : Arrays.asList(SNAPSHOT_WITH_HOLES, SNAPSHOT_WITHOUT_HOLES)) {
snp(ignite).createSnapshot(snpName, null, false, onlyPrimary).get(TIMEOUT);
- IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(snpName, null)
+ IdleVerifyResult res = ignite.context().cache().context().snapshotMgr().checkSnapshot(snpName, null)
.get().idleVerifyResult();
StringBuilder b = new StringBuilder();
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckIncrementalSnapshotTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckIncrementalSnapshotTest.java
index cbee596138c41..21fe7d4091304 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckIncrementalSnapshotTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckIncrementalSnapshotTest.java
@@ -29,7 +29,7 @@
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
+import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
@@ -378,7 +378,7 @@ public void atomicCachesAreSkippedDuringTheCheck() throws Exception {
grid(0).snapshot().restoreSnapshot(atomicSnp, null, 1).get();
- IdleVerifyResultV2 idleVerRes = idleVerify(grid(0), CACHE, atomicCache);
+ IdleVerifyResult idleVerRes = idleVerify(grid(0), CACHE, atomicCache);
idleVerRes.print(System.out::println, true);
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index ae6d68b2bb866..024cb3f154e28 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -84,7 +84,7 @@
import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
import org.apache.ignite.internal.management.cache.FindAndDeleteGarbageInPersistenceTaskResult;
import org.apache.ignite.internal.management.cache.IdleVerifyDumpTask;
-import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
import org.apache.ignite.internal.management.tx.TxInfo;
import org.apache.ignite.internal.management.tx.TxTaskResult;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -743,7 +743,7 @@ public void testIdleVerifyOnInactiveClusterWithPersistence() throws Exception {
assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "idle_verify"));
- assertContains(log, testOut.toString(), VerifyBackupPartitionsTaskV2.IDLE_VERIFY_ON_INACTIVE_CLUSTER_ERROR_MESSAGE);
+ assertContains(log, testOut.toString(), VerifyBackupPartitionsTask.IDLE_VERIFY_ON_INACTIVE_CLUSTER_ERROR_MESSAGE);
assertContains(log, testOut.toString(), "Failed to perform operation");
srv.cluster().state(ACTIVE);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheFilterEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheFilterEnum.java
index 08ace4012f79d..db362918c65c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheFilterEnum.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheFilterEnum.java
@@ -22,7 +22,7 @@
/**
* Represents a type of cache(s) that can be used for comparing update counters and checksums between primary and backup partitions.
*
- * @see VerifyBackupPartitionsTaskV2
+ * @see VerifyBackupPartitionsTask
*/
public enum CacheFilterEnum {
/** Default - user only, or all caches specified by name. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheIdleVerifyCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheIdleVerifyCommand.java
index 867d37b1628fd..4dd5971729723 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheIdleVerifyCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/CacheIdleVerifyCommand.java
@@ -36,8 +36,8 @@
/** Checks consistency of primary and backup partitions assuming no concurrent updates are happening in the cluster. */
@CliSubcommandsWithPrefix
public class CacheIdleVerifyCommand
- extends CommandRegistryImpl
- implements ComputeCommand {
+ extends CommandRegistryImpl
+ implements ComputeCommand {
/** */
public static final String IDLE_VERIFY_FILE_PREFIX = "idle_verify-";
@@ -72,7 +72,7 @@ public CacheIdleVerifyCommand() {
}
/** {@inheritDoc} */
- @Override public void printResult(CacheIdleVerifyCommandArg arg, IdleVerifyResultV2 res, Consumer printer) {
+ @Override public void printResult(CacheIdleVerifyCommandArg arg, IdleVerifyResult res, Consumer printer) {
logParsedArgs(arg, printer);
StringBuilder sb = new StringBuilder();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyException.java
index 5f4b430b9a5de..e54fe3ad2a0b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyException.java
@@ -23,7 +23,7 @@
import org.apache.ignite.internal.util.typedef.F;
/**
- * This exception is used to collect exceptions occured in {@link VerifyBackupPartitionsTaskV2} execution.
+ * This exception is used to collect exceptions occured in {@link VerifyBackupPartitionsTask} execution.
*/
public class IdleVerifyException extends IgniteException {
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResult.java
similarity index 75%
rename from modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResultV2.java
rename to modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResult.java
index f6dfb7afb7a83..0451a6ab60d38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResultV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyResult.java
@@ -45,9 +45,9 @@
import static org.apache.ignite.internal.util.IgniteUtils.nl;
/**
- * Encapsulates result of {@link VerifyBackupPartitionsTaskV2}.
+ * Encapsulates result of {@link VerifyBackupPartitionsTask}.
*/
-public class IdleVerifyResultV2 extends VisorDataTransferObject {
+public class IdleVerifyResult extends VisorDataTransferObject {
/** */
private static final long serialVersionUID = 0L;
@@ -82,36 +82,24 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
/**
* Default constructor for Externalizable.
*/
- public IdleVerifyResultV2() {
+ public IdleVerifyResult() {
+ // No-op.
}
/**
- * @param exceptions Occurred exceptions.
+ * @see Builder
*/
- public IdleVerifyResultV2(Map exceptions) {
+ private IdleVerifyResult(Map exceptions) {
this.exceptions = exceptions;
}
/**
- * @param txHashConflicts Transaction hashes conflicts.
+ * @see Builder
*/
- public IdleVerifyResultV2(
+ private IdleVerifyResult(
Map> clusterHashes,
@Nullable List> txHashConflicts,
- @Nullable Map> partiallyCommittedTxs
- ) {
- this(clusterHashes, Collections.emptyMap());
-
- this.txHashConflicts = txHashConflicts;
- this.partiallyCommittedTxs = partiallyCommittedTxs;
- }
-
- /**
- * @param clusterHashes Map of cluster partition hashes.
- * @param exceptions Exceptions on each cluster node.
- */
- public IdleVerifyResultV2(
- Map> clusterHashes,
+ @Nullable Map> partiallyCommittedTxs,
Map exceptions
) {
for (Map.Entry> e : clusterHashes.entrySet()) {
@@ -151,6 +139,8 @@ public IdleVerifyResultV2(
}
this.exceptions = exceptions;
+ this.txHashConflicts = txHashConflicts;
+ this.partiallyCommittedTxs = partiallyCommittedTxs;
}
/** {@inheritDoc} */
@@ -390,7 +380,7 @@ private void printConflicts(Consumer printer) {
if (o == null || getClass() != o.getClass())
return false;
- IdleVerifyResultV2 v2 = (IdleVerifyResultV2)o;
+ IdleVerifyResult v2 = (IdleVerifyResult)o;
return Objects.equals(cntrConflicts, v2.cntrConflicts) && Objects.equals(hashConflicts, v2.hashConflicts) &&
Objects.equals(movingPartitions, v2.movingPartitions) && Objects.equals(lostPartitions, v2.lostPartitions) &&
@@ -412,6 +402,135 @@ private void printConflicts(Consumer printer) {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(IdleVerifyResultV2.class, this);
+ return S.toString(IdleVerifyResult.class, this);
+ }
+
+ /** @return A fresh result builder. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder of {@link IdleVerifyResult}. Is not thread-safe. */
+ public static final class Builder {
+ /** */
+ private @Nullable Map> partHashes;
+
+ /** */
+ private @Nullable List> txHashConflicts;
+
+ /** */
+ private @Nullable Map> partiallyCommittedTxs;
+
+ /** */
+ private Map exceptions;
+
+ /** */
+ private Builder() {
+ // No-op.
+ }
+
+ /** Build the final result. */
+ public IdleVerifyResult build() {
+ if (partHashes == null)
+ partHashes = Collections.emptyMap();
+
+ if (exceptions == null)
+ exceptions = Collections.emptyMap();
+
+ return new IdleVerifyResult(partHashes, txHashConflicts, partiallyCommittedTxs, exceptions);
+ }
+
+ /** Stores an exception if none is assigned for {@code node}. */
+ public Builder addException(ClusterNode node, Exception e) {
+ assert e != null;
+
+ if (exceptions == null)
+ exceptions = new HashMap<>();
+
+ exceptions.putIfAbsent(node, e);
+
+ return this;
+ }
+
+ /** Sets the exceptions. */
+ public Builder exceptions(Map exceptions) {
+ assert this.exceptions == null : "Exceptions are already set.";
+ assert exceptions != null;
+
+ this.exceptions = exceptions;
+
+ return this;
+ }
+
+ /** Stores a collection of partition hashes for partition key {@code key}. */
+ private Builder addPartitionHashes(PartitionKeyV2 key, Collection newHashes) {
+ if (partHashes == null)
+ partHashes = new HashMap<>();
+
+ partHashes.compute(key, (key0, hashes0) -> {
+ if (hashes0 == null)
+ hashes0 = new ArrayList<>();
+
+ hashes0.addAll(newHashes);
+
+ return hashes0;
+ });
+
+ return this;
+ }
+
+ /** Stores a partition hashes map. */
+ public void addPartitionHashes(Map newHashes) {
+ newHashes.forEach((key, hash) -> addPartitionHashes(key, Collections.singletonList(hash)));
+ }
+
+ /** Stores a single partition hash for partition key {@code key}. */
+ public Builder addPartitionHash(PartitionKeyV2 key, PartitionHashRecordV2 newHash) {
+ addPartitionHashes(key, Collections.singletonList(newHash));
+
+ return this;
+ }
+
+ /** Sets partition hashes. */
+ public Builder partitionHashes(Map> partHashes) {
+ assert F.isEmpty(this.partHashes) : "Partition hashes are already set.";
+ assert partHashes != null;
+
+ this.partHashes = partHashes;
+
+ return this;
+ }
+
+ /** Adds transaction conflicts. */
+ public Builder addTxConflicts(List newTxConflicts) {
+ if (txHashConflicts == null)
+ txHashConflicts = new ArrayList<>();
+
+ txHashConflicts.add(newTxConflicts);
+
+ return this;
+ }
+
+ /** Adds partially commited transactions. */
+ public Builder addPartiallyCommited(ClusterNode node, Collection newVerisons) {
+ if (partiallyCommittedTxs == null)
+ partiallyCommittedTxs = new HashMap<>();
+
+ partiallyCommittedTxs.compute(node, (node0, versions0) -> {
+ if (versions0 == null)
+ versions0 = new ArrayList<>();
+
+ versions0.addAll(newVerisons);
+
+ return versions0;
+ });
+
+ return this;
+ }
+
+ /** @return {@code True} if any error is stopre. {@code False} otherwise. */
+ public boolean hasErrors() {
+ return !F.isEmpty(exceptions);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyTaskV2.java
index 1e03a864e0ecf..250757df90ea8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyTaskV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyTaskV2.java
@@ -26,15 +26,15 @@
* Task to verify checksums of backup partitions.
*/
@GridInternal
-public class IdleVerifyTaskV2 extends VisorOneNodeTask {
+public class IdleVerifyTaskV2 extends VisorOneNodeTask {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected VisorJob job(CacheIdleVerifyCommandArg arg) {
+ @Override protected VisorJob job(CacheIdleVerifyCommandArg arg) {
if (!ignite.context().state().publicApiActiveState(true))
- throw new IgniteException(VerifyBackupPartitionsTaskV2.IDLE_VERIFY_ON_INACTIVE_CLUSTER_ERROR_MESSAGE);
+ throw new IgniteException(VerifyBackupPartitionsTask.IDLE_VERIFY_ON_INACTIVE_CLUSTER_ERROR_MESSAGE);
- return new IdleVerifyJob<>(arg, debug, VerifyBackupPartitionsTaskV2.class);
+ return new IdleVerifyJob<>(arg, debug, VerifyBackupPartitionsTask.class);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/NoMatchingCachesException.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/NoMatchingCachesException.java
index 323c75992f441..4aa9bf269d737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/NoMatchingCachesException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/NoMatchingCachesException.java
@@ -19,7 +19,7 @@
import org.apache.ignite.IgniteException;
/**
- * Runtime exception that can be thrown in {@link VerifyBackupPartitionsTaskV2} when no caches matching given
+ * Runtime exception that can be thrown in {@link VerifyBackupPartitionsTask} when no caches matching given
* filter options can be found.
*/
public class NoMatchingCachesException extends IgniteException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsDumpTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsDumpTask.java
index 7ac73ad9e88e8..8e4711116d2d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsDumpTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsDumpTask.java
@@ -64,7 +64,7 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter> partitions,
- IdleVerifyResultV2 conflictRes,
+ IdleVerifyResult conflictRes,
int skippedRecords
) throws IgniteException {
String wd = ignite.configuration().getWorkDirectory();
@@ -204,7 +204,7 @@ private String writeHashes(
/** */
private void writeResult(
Map> partitions,
- IdleVerifyResultV2 conflictRes,
+ IdleVerifyResult conflictRes,
int skippedRecords,
PrintWriter writer
) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTask.java
similarity index 96%
rename from modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTaskV2.java
rename to modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTask.java
index 048b39020b7a5..8777823695081 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTaskV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/VerifyBackupPartitionsTask.java
@@ -52,7 +52,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotChecker;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -76,13 +75,13 @@
*
* Argument: Set of cache names, 'null' will trigger verification for all caches.
*
- * Result: {@link IdleVerifyResultV2} with conflict partitions.
+ * Result: {@link IdleVerifyResult} with conflict partitions.
*
* Works properly only on idle cluster - there may be false positive conflict reports if data in cluster is being
* concurrently updated.
*/
@GridInternal
-public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter {
+public class VerifyBackupPartitionsTask extends ComputeTaskAdapter {
/** First version of Ignite that is capable of executing Idle Verify V2. */
public static final IgniteProductVersion V2_SINCE_VER = IgniteProductVersion.fromString("2.5.3");
@@ -111,7 +110,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter results) throws IgniteException {
+ @Nullable @Override public IdleVerifyResult reduce(List results) throws IgniteException {
return reduce0(results);
}
@@ -144,21 +143,20 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter results) {
- Map ex = new HashMap<>();
- Map> hashes = new HashMap<>();
+ public static IdleVerifyResult reduce0(List results) {
+ IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
for (ComputeJobResult res : results) {
if (res.getException() != null) {
- ex.put(res.getNode(), res.getException());
+ bldr.addException(res.getNode(), res.getException());
continue;
}
- hashes.put(res.getNode(), res.getData());
+ bldr.addPartitionHashes(res.getData());
}
- return SnapshotChecker.reduceHashesResults(hashes, ex);
+ return bldr.build();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index f83ce0e595021..0e5225029edbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -99,7 +99,7 @@
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DistributedConfigurationUtils;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
+import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
@@ -1840,7 +1840,7 @@ public IgniteFuture cancelLocalRestoreTask(String name) {
* @param name Snapshot name.
* @param snpPath Snapshot directory path.
* @return Future with the result of execution snapshot partitions verify task, which besides calculating partition
- * hashes of {@link IdleVerifyResultV2} also contains the snapshot metadata distribution across the cluster.
+ * hashes of {@link IdleVerifyResult} also contains the snapshot metadata distribution across the cluster.
*/
public IgniteInternalFuture checkSnapshot(String name, @Nullable String snpPath) {
return checkSnapshot(name, snpPath, -1);
@@ -1853,7 +1853,7 @@ public IgniteInternalFuture checkSnapshot(String
* @param snpPath Snapshot directory path.
* @param incIdx Incremental snapshot index.
* @return Future with the result of execution snapshot partitions verify task, which besides calculating partition
- * hashes of {@link IdleVerifyResultV2} also contains the snapshot metadata distribution across the cluster.
+ * hashes of {@link IdleVerifyResult} also contains the snapshot metadata distribution across the cluster.
*/
public IgniteInternalFuture checkSnapshot(String name, @Nullable String snpPath, int incIdx) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
@@ -1884,7 +1884,7 @@ public IgniteInternalFuture checkSnapshot(String
* @param incIdx Incremental snapshot index.
* @param check If {@code true} check snapshot integrity.
* @return Future with the result of execution snapshot partitions verify task, which besides calculating partition
- * hashes of {@link IdleVerifyResultV2} also contains the snapshot metadata distribution across the cluster.
+ * hashes of {@link IdleVerifyResult} also contains the snapshot metadata distribution across the cluster.
*/
public IgniteInternalFuture checkSnapshot(
String name,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index 130d21c79f21f..ca58e1126e350 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -18,16 +18,20 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -37,7 +41,7 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
+import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
@@ -146,50 +150,136 @@ private IgniteInternalFuture> reduceValidatePartsAndFinish(
if (clusterOpFut == null)
return new GridFinishedFuture<>();
- assert results.values().stream().noneMatch(res -> res != null && res.metas != null);
+ if (ctx.req.incrementalIndex() > 0)
+ reduceIncrementalResults(ctx.req.nodes(), ctx.clusterMetas, results, errors, clusterOpFut);
+ else if (ctx.req.allRestoreHandlers())
+ reduceCustomHandlersResults(ctx, results, errors, clusterOpFut);
+ else
+ reducePartitionsHashesResults(ctx.clusterMetas, results, errors, clusterOpFut);
+ return new GridFinishedFuture<>();
+ }
+
+ /** */
+ private void reduceIncrementalResults(
+ Set requiredNodes,
+ Map> clusterMetas,
+ Map results,
+ Map errors,
+ GridFutureAdapter fut
+ ) {
SnapshotChecker checker = kctx.cache().context().snapshotMgr().checker();
- if (ctx.req.incrementalIndex() > 0) {
- IdleVerifyResultV2 chkRes = checker.reduceIncrementalResults(
- mapResults(results, ctx.req.nodes(), SnapshotCheckResponse::incrementalResult),
- mapErrors(errors)
- );
+ Map> reduced = new HashMap<>();
- clusterOpFut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas, chkRes));
+ for (Map.Entry resEntry : results.entrySet()) {
+ UUID nodeId = resEntry.getKey();
+
+ SnapshotCheckResponse incResp = resEntry.getValue();
+
+ if (incResp == null || !requiredNodes.contains(nodeId))
+ continue;
+
+ ClusterNode node = kctx.cluster().get().node(nodeId);
+
+ Map incRes = incResp.result();
+
+ incRes.forEach((consId, res) -> reduced.computeIfAbsent(node, nid -> new ArrayList<>()).add(res));
+
+ if (F.isEmpty(incResp.exceptions()))
+ continue;
+
+ errors.putIfAbsent(nodeId, asException(F.firstValue(incResp.exceptions())));
}
- else if (ctx.req.allRestoreHandlers()) {
- try {
- if (!errors.isEmpty())
- throw F.firstValue(errors);
- Map>> cstRes = mapResults(results, ctx.req.nodes(),
- SnapshotCheckResponse::customHandlersResults);
+ IdleVerifyResult chkRes = checker.reduceIncrementalResults(reduced, mapErrors(errors));
+
+ fut.onDone(new SnapshotPartitionsVerifyResult(clusterMetas, chkRes));
+ }
+
+ /** */
+ private void reduceCustomHandlersResults(
+ SnapshotCheckContext ctx,
+ Map results,
+ Map errors,
+ GridFutureAdapter fut
+ ) {
+ try {
+ if (!errors.isEmpty())
+ throw F.firstValue(errors);
- checker.checkCustomHandlersResults(ctx.req.snapshotName(), cstRes);
+ SnapshotChecker snpChecker = kctx.cache().context().snapshotMgr().checker();
- clusterOpFut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas, null));
- }
- catch (Throwable err) {
- clusterOpFut.onDone(err);
+ // Check responses: checking node -> snapshot part's consistent id -> handler name -> handler result.
+ Map>>> reduced = new HashMap<>();
+
+ for (Map.Entry respEntry : results.entrySet()) {
+ SnapshotCheckResponse nodeResp = respEntry.getValue();
+
+ if (nodeResp == null)
+ continue;
+
+ if (!F.isEmpty(nodeResp.exceptions()))
+ throw F.firstValue(nodeResp.exceptions());
+
+ UUID nodeId = respEntry.getKey();
+
+ Map>> cstHndRes = nodeResp.result();
+
+ cstHndRes.forEach((consId, respPerConsIdMap) -> {
+ // Reduced map of the handlers results per snapshot part's consistent id for certain node.
+ Map