diff --git a/server/jetstream_cluster_long_test.go b/server/jetstream_cluster_long_test.go index 3fb8930cbd8..869d57955e7 100644 --- a/server/jetstream_cluster_long_test.go +++ b/server/jetstream_cluster_long_test.go @@ -20,6 +20,7 @@ package server import ( "fmt" "math/rand" + "strings" "testing" "time" @@ -183,3 +184,300 @@ func TestLongKVPutWithServerRestarts(t *testing.T) { ) } } + +// This is a RaftChainOfBlocks test that randomly starts and stops nodes to exercise recovery and snapshots. +func TestLongNRGChainOfBlocks(t *testing.T) { + const ( + ClusterSize = 3 + GroupSize = 3 + ConvergenceTimeout = 30 * time.Second + Duration = 10 * time.Minute + PrintStateInterval = 3 * time.Second + ) + + // Create cluster + c := createJetStreamClusterExplicit(t, "Test", ClusterSize) + defer c.shutdown() + + rg := c.createRaftGroup("ChainOfBlocks", GroupSize, newRaftChainStateMachine) + rg.waitOnLeader() + + // Available operations + type TestOperation string + const ( + StopOne TestOperation = "Stop one active node" + StopAll = "Stop all active nodes" + RestartOne = "Restart one stopped node" + RestartAll = "Restart all stopped nodes" + Snapshot = "Snapshot one active node" + Propose = "Propose a value via one active node" + ProposeLeader = "Propose a value via leader" + Pause = "Let things run undisturbed for a while" + Check = "Wait for nodes to converge" + ) + + // Weighted distribution of operations, one is randomly chosen from this vector in each iteration + opsWeighted := []TestOperation{ + StopOne, + StopOne, + StopOne, + StopAll, + StopAll, + RestartOne, + RestartOne, + RestartOne, + RestartAll, + RestartAll, + RestartAll, + Snapshot, + Snapshot, + Propose, + Propose, + Propose, + Propose, + ProposeLeader, + ProposeLeader, + ProposeLeader, + ProposeLeader, + Pause, + Pause, + Pause, + Pause, + Check, + Check, + Check, + Check, + } + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + // Chose a node from the list (and remove it) + pickRandomNode := func(nodes []stateMachine) ([]stateMachine, stateMachine) { + if len(nodes) == 0 { + // Input list is empty + return nodes, nil + } + // Pick random node + i := rng.Intn(len(nodes)) + node := nodes[i] + // Move last element in its place + nodes[i] = nodes[len(nodes)-1] + // Return slice excluding last element + return nodes[:len(nodes)-1], node + } + + // Create summary status string for all replicas + chainStatusString := func() string { + b := strings.Builder{} + for _, sm := range rg { + csm := sm.(*RCOBStateMachine) + running, blocksCount, blockHash := csm.getCurrentHash() + if running { + b.WriteString( + fmt.Sprintf( + " [%s (%s): %d blocks, hash=%s],", + csm.server().Name(), + csm.node().ID(), + blocksCount, + blockHash, + ), + ) + } else { + b.WriteString( + fmt.Sprintf( + " [%s (%s): STOPPED],", + csm.server().Name(), + csm.node().ID(), + ), + ) + + } + } + return b.String() + } + + // Track the highest number of blocks applied by any of the replicas + highestBlocksCount := uint64(0) + + // Track active and stopped nodes + activeNodes := make([]stateMachine, 0, GroupSize) + stoppedNodes := make([]stateMachine, 0, GroupSize) + + // Initially all nodes are active + activeNodes = append(activeNodes, rg...) + + defer func() { + t.Logf("Final state: %s", chainStatusString()) + }() + + printStateTicker := time.NewTicker(PrintStateInterval) + testTimer := time.NewTimer(Duration) + start := time.Now() + iteration := 0 + + for { + + iteration++ + select { + case <-printStateTicker.C: + t.Logf( + "[%s] State: %s", + time.Since(start).Round(time.Second), + chainStatusString(), + ) + case <-testTimer.C: + // Test completed + return + default: + // Continue + } + + // Choose a random operation to perform in this iteration + nextOperation := opsWeighted[rng.Intn(len(opsWeighted))] + if RCOBOptions.verbose { + t.Logf("Iteration %d: %s", iteration, nextOperation) + } + + switch nextOperation { + + case StopOne: + // Stop an active node (if any are left active) + var n stateMachine + activeNodes, n = pickRandomNode(activeNodes) + if n != nil { + n.stop() + stoppedNodes = append(stoppedNodes, n) + } + + case StopAll: + // Stop all active nodes (if any are active) + for _, node := range activeNodes { + node.stop() + } + stoppedNodes = append(stoppedNodes, activeNodes...) + activeNodes = make([]stateMachine, 0, GroupSize) + + case RestartOne: + // Restart a stopped node (if any are stopped) + var n stateMachine + stoppedNodes, n = pickRandomNode(stoppedNodes) + if n != nil { + n.restart() + activeNodes = append(activeNodes, n) + } + + case RestartAll: + // Restart all stopped nodes (if any) + for _, node := range stoppedNodes { + node.restart() + } + activeNodes = append(activeNodes, stoppedNodes...) + stoppedNodes = make([]stateMachine, 0, GroupSize) + + case Snapshot: + // Choose a random active node and tell it to create a snapshot + if len(activeNodes) > 0 { + n := activeNodes[rng.Intn(len(activeNodes))] + n.(*RCOBStateMachine).createSnapshot() + } + + case Propose: + // Make an active node propose the next block (if any nodes are active) + if len(activeNodes) > 0 { + n := activeNodes[rng.Intn(len(activeNodes))] + n.(*RCOBStateMachine).proposeBlock() + } + + case ProposeLeader: + // Make the leader propose the next block (if a leader is active) + leader := rg.leader() + if leader != nil { + leader.(*RCOBStateMachine).proposeBlock() + } + + case Pause: + // Noop, let things run undisturbed for a little bit + time.Sleep(time.Duration(rng.Intn(250)) * time.Millisecond) + + case Check: + // Restart any stopped node + for _, node := range stoppedNodes { + node.restart() + } + activeNodes = append(activeNodes, stoppedNodes...) + stoppedNodes = make([]stateMachine, 0, GroupSize) + + // Ensure all nodes (eventually) converge + checkFor(t, ConvergenceTimeout, 250*time.Millisecond, + func() error { + referenceNode := rg[0] + // Save block count and hash of first node as reference + _, referenceBlocksCount, referenceHash := referenceNode.(*RCOBStateMachine).getCurrentHash() + + // Compare each node against reference + for _, n := range rg { + sm := n.(*RCOBStateMachine) + running, blocksCount, blockHash := sm.getCurrentHash() + if !running { + return fmt.Errorf( + "node not running: %s (%s)", + sm.server().Name(), + sm.node().ID(), + ) + } + + // Track the highest block delivered by any node + if blocksCount > highestBlocksCount { + if RCOBOptions.verbose { + t.Logf( + "New highest blocks count: %d (%s (%s))", + blocksCount, + sm.s.Name(), + sm.n.ID(), + ) + } + highestBlocksCount = blocksCount + } + + // Each replica must match the reference node + + if blocksCount != referenceBlocksCount { + return fmt.Errorf( + "different number of blocks %d (%s (%s) vs. %d (%s (%s))", + blocksCount, + sm.server().Name(), + sm.node().ID(), + referenceBlocksCount, + referenceNode.server().Name(), + referenceNode.node().ID(), + ) + } else if blockHash != referenceHash { + return fmt.Errorf( + "different hash after %d blocks %s (%s (%s) vs. %s (%s (%s))", + blocksCount, + blockHash, + sm.server().Name(), + sm.node().ID(), + referenceHash, + referenceNode.server().Name(), + referenceNode.node().ID(), + ) + } + } + + // Verify consistency check was against the highest block known + if referenceBlocksCount < highestBlocksCount { + return fmt.Errorf( + "nodes converged below highest known block count: %d: %s", + highestBlocksCount, + chainStatusString(), + ) + } + + // All nodes reached the same state, check passed + return nil + }, + ) + } + } +} diff --git a/server/raft_chain_of_blocks_helpers_test.go b/server/raft_chain_of_blocks_helpers_test.go new file mode 100644 index 00000000000..018a123352f --- /dev/null +++ b/server/raft_chain_of_blocks_helpers_test.go @@ -0,0 +1,369 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Raft Chain of Blocks (RCOB) is a replicated state machine on top of Raft used to test the latter. +// Each value ("block") proposed to raft is a simple array of bytes (stateless, can be small or large). +// Each replica state consists of a hash of all blocks delivered so far. +// This makes it easy to check consistency over time. +// If all replicas deliver the same blocks in the same order (the whole point of Raft!), then the N-th hash on each +// replica should be identical. This invariant is easy to verify in test settings, because if at any point a replica +// delivered (hashed) a different value, then all subsequent hashes will diverge from the rest. + +package server + +import ( + "encoding" + "encoding/json" + "fmt" + "hash" + "hash/crc32" + "math/rand" + "sync" +) + +// Static options (but may be useful to tweak when debugging) +var RCOBOptions = struct { + verbose bool + // Proposed block [data] size is random between 1 and maxBlockSize + maxBlockSize int + // If set to true, the state machine will not take snapshots while in recovery + // (corresponding to the 'ready' state machine member variable) + safeSnapshots bool +}{ + false, + 10240, + true, +} + +// Simple state machine on top of Raft. +// Each value delivered is hashed, N-th value hash should match on all replicas or something went wrong. +// If even a single value in the chain of hash differs on one replica, hashes will diverge. +type RCOBStateMachine struct { + sync.Mutex + s *Server + n RaftNode + cfg *RaftConfig + wg sync.WaitGroup + leader bool + proposalSequence uint64 + rng *rand.Rand + hash hash.Hash + blocksApplied uint64 + blocksAppliedSinceSnapshot uint64 + stopped bool + ready bool +} + +func (sm *RCOBStateMachine) waitGroup() *sync.WaitGroup { + sm.Lock() + defer sm.Unlock() + return &sm.wg +} + +// RCOBBlock is the structure of values replicated through Raft. +// Block data is a random array of bytes. +// Additional proposer metadata is present for debugging and for further ordering invariant checks (but not included +// in the hash). +type RCOBBlock struct { + Proposer string + ProposerSequence uint64 + Data []byte +} + +// logDebug is for fine-grained event logging, useful for debugging but default off +func (sm *RCOBStateMachine) logDebug(format string, args ...any) { + if RCOBOptions.verbose { + fmt.Printf("["+sm.s.Name()+" ("+sm.n.ID()+")] "+format+"\n", args...) + } +} + +func (sm *RCOBStateMachine) server() *Server { + sm.Lock() + defer sm.Unlock() + return sm.s +} + +func (sm *RCOBStateMachine) node() RaftNode { + sm.Lock() + defer sm.Unlock() + return sm.n +} + +func (sm *RCOBStateMachine) propose(data []byte) { + sm.Lock() + defer sm.Unlock() + if !sm.ready { + sm.logDebug("Refusing to propose during recovery") + } + err := sm.n.ForwardProposal(data) + if err != nil { + sm.logDebug("block proposal error: %s", err) + } +} + +func (sm *RCOBStateMachine) applyEntry(ce *CommittedEntry) { + sm.Lock() + defer sm.Unlock() + if ce == nil { + // A nil entry signals that the previous recovery backlog is over + sm.logDebug("Recovery complete") + sm.ready = true + return + } + sm.logDebug("Apply entries #%d (%d entries)", ce.Index, len(ce.Entries)) + for _, entry := range ce.Entries { + if entry.Type == EntryNormal { + sm.applyBlock(entry.Data) + } else if entry.Type == EntrySnapshot { + sm.loadSnapshot(entry.Data) + } else { + panic(fmt.Sprintf("[%s] unknown entry type: %s", sm.s.Name(), entry.Type)) + } + } + // Signal to the node that entries were applied + sm.n.Applied(ce.Index) +} + +func (sm *RCOBStateMachine) leaderChange(isLeader bool) { + if sm.leader && !isLeader { + sm.logDebug("Leader change: no longer leader") + } else if sm.leader && isLeader { + sm.logDebug("Elected leader while already leader") + } else if !sm.leader && isLeader { + sm.logDebug("Leader change: i am leader") + } else { + sm.logDebug("Leader change") + } + sm.leader = isLeader + if isLeader != sm.node().Leader() { + sm.logDebug("⚠️ Leader state out of sync with underlying node") + } +} + +func (sm *RCOBStateMachine) stop() { + sm.node().Stop() + sm.node().WaitForStop() + sm.waitGroup().Wait() + + // Clear state, on restart it will be recovered from snapshot or peers + sm.Lock() + defer sm.Unlock() + + sm.stopped = true + sm.blocksApplied = 0 + sm.hash.Reset() + sm.leader = false + sm.logDebug("Stopped") +} + +func (sm *RCOBStateMachine) restart() { + sm.Lock() + defer sm.Unlock() + + sm.logDebug("Restarting") + + sm.stopped = false + sm.ready = false + if sm.n.State() != Closed { + return + } + + // The filestore is stopped as well, so need to extract the parts to recreate it. + rn := sm.n.(*raft) + fs := rn.wal.(*fileStore) + + var err error + sm.cfg.Log, err = newFileStore(fs.fcfg, fs.cfg.StreamConfig) + if err != nil { + panic(err) + } + sm.n, err = sm.s.startRaftNode(globalAccountName, sm.cfg, pprofLabels{}) + if err != nil { + panic(err) + } + // Finally restart the driver. + go smLoop(sm) +} + +func (sm *RCOBStateMachine) proposeBlock() { + // Keep track how many blocks this replica proposed + sm.proposalSequence += 1 + + // Create a block + blockSize := 1 + sm.rng.Intn(RCOBOptions.maxBlockSize) + block := RCOBBlock{ + Proposer: sm.s.Name(), + ProposerSequence: sm.proposalSequence, + Data: make([]byte, blockSize), + } + + // Data is random bytes + sm.rng.Read(block.Data) + + // Serialize block to JSON + blockData, err := json.Marshal(block) + if err != nil { + panic(fmt.Sprintf("serialization error: %s", err)) + } + + sm.logDebug( + "Proposing block <%s, %d, [%dB]>", + block.Proposer, + block.ProposerSequence, + len(block.Data), + ) + + // Propose block (may fail, and that's ok) + sm.propose(blockData) +} + +func (sm *RCOBStateMachine) applyBlock(data []byte) { + // Deserialize block + var block RCOBBlock + err := json.Unmarshal(data, &block) + if err != nil { + panic(fmt.Sprintf("deserialization error: %s", err)) + } + + sm.logDebug("Applying block <%s, %d>", block.Proposer, block.ProposerSequence) + + // Hash the data on top of the existing running hash + n, err := sm.hash.Write(block.Data) + if n != len(block.Data) { + panic(fmt.Sprintf("unexpected checksum written %d data block size: %d", n, len(block.Data))) + } else if err != nil { + panic(fmt.Sprintf("checksum error: %s", err)) + } + + // Track numbers of blocks applied + sm.blocksApplied += 1 + sm.blocksAppliedSinceSnapshot += 1 + + sm.logDebug("Hash after %d blocks: %X ", sm.blocksApplied, sm.hash.Sum(nil)) +} + +func (sm *RCOBStateMachine) getCurrentHash() (bool, uint64, string) { + sm.Lock() + defer sm.Unlock() + + // Return running, the number of blocks applied and the current running hash + return !sm.stopped, sm.blocksApplied, fmt.Sprintf("%X", sm.hash.Sum(nil)) +} + +// RCOBSnapshot structure for RCOB snapshots +// Consists of the hash (32b) after BlocksCount blocks were hashed. +// TODO: Could start storing last N blocks to make snapshot larger and exercise more code paths. +// (a big chunk of random data would also do the trick) +type RCOBSnapshot struct { + SourceNode string + HashData []byte + BlocksCount uint64 +} + +// createSnapshot asks the state machine to create a snapshot. +// The request may be ignored if no blocks were applied since the last snapshot. +func (sm *RCOBStateMachine) createSnapshot() { + sm.Lock() + defer sm.Unlock() + + if sm.blocksAppliedSinceSnapshot == 0 { + sm.logDebug("Skip snapshot, no new entries") + return + } + + if RCOBOptions.safeSnapshots && !sm.ready { + sm.logDebug("Skip snapshot, still recovering") + return + } + + sm.logDebug( + "Snapshot (with %d blocks applied, %d since last snapshot)", + sm.blocksApplied, + sm.blocksAppliedSinceSnapshot, + ) + + // Serialize the internal state of the hash block + serializedHash, err := sm.hash.(encoding.BinaryMarshaler).MarshalBinary() + if err != nil { + panic(fmt.Sprintf("failed to marshal hash: %s", err)) + } + + // Create snapshot + snapshot := RCOBSnapshot{ + SourceNode: fmt.Sprintf("%s (%s)", sm.s.Name(), sm.n.ID()), + HashData: serializedHash, + BlocksCount: sm.blocksApplied, + } + + // Serialize snapshot as JSON + snapshotData, err := json.Marshal(snapshot) + if err != nil { + panic(fmt.Sprintf("failed to marshal snapshot: %s", err)) + } + + // InstallSnapshot is actually "save the snapshot", which is an operation delegated to the node + err = sm.n.InstallSnapshot(snapshotData) + if err != nil { + panic(fmt.Sprintf("failed to snapshot: %s", err)) + } + + // Reset counter since last snapshot + sm.blocksAppliedSinceSnapshot = 0 +} + +func (sm *RCOBStateMachine) loadSnapshot(data []byte) { + // Deserialize snapshot from JSON + var snapshot RCOBSnapshot + err := json.Unmarshal(data, &snapshot) + if err != nil { + panic(fmt.Sprintf("failed to unmarshal snapshot: %s", err)) + } + + sm.logDebug( + "Applying snapshot (created by %s) taken after %d blocks", + snapshot.SourceNode, + snapshot.BlocksCount, + ) + + // Load internal hash state + err = sm.hash.(encoding.BinaryUnmarshaler).UnmarshalBinary(snapshot.HashData) + if err != nil { + panic(fmt.Sprintf("failed to unmarshal hash data: %s", err)) + } + + // Load block counter + sm.blocksApplied = snapshot.BlocksCount + + // Reset number of blocks applied since snapshot + sm.blocksAppliedSinceSnapshot = 0 + + sm.logDebug("Hash after snapshot with %d blocks: %X ", sm.blocksApplied, sm.hash.Sum(nil)) +} + +// Factory function to create state machine on top of the given server/node +func newRaftChainStateMachine(s *Server, cfg *RaftConfig, n RaftNode) stateMachine { + // Create RNG seed based on server name and node id + var seed int64 + for _, c := range []byte(s.Name()) { + seed += int64(c) + } + for _, c := range []byte(n.ID()) { + seed += int64(c) + } + rng := rand.New(rand.NewSource(seed)) + + // Initialize empty hash block + hashBlock := crc32.NewIEEE() + + return &RCOBStateMachine{s: s, n: n, cfg: cfg, rng: rng, hash: hashBlock} +} diff --git a/server/raft_test.go b/server/raft_test.go index 2b6dcddf29c..c760664ce16 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1945,3 +1945,250 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { n.Applied(2) require_True(t, n.Healthy()) } + +// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before +// proposing the next one. +// The test may fail if: +// - Replicas hash diverge +// - One replica never applies the N-th block applied by the rest +// - The given number of blocks cannot be applied within some amount of time +func TestNRGChainOfBlocksRunInLockstep(t *testing.T) { + const iterations = 50 + const applyTimeout = 3 * time.Second + const testTimeout = iterations * time.Second + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newRaftChainStateMachine) + rg.waitOnLeader() + + testTimer := time.NewTimer(testTimeout) + + for iteration := uint64(1); iteration <= iterations; iteration++ { + select { + case <-testTimer.C: + t.Fatalf("Timeout, completed %d/%d iterations", iteration-1, iterations) + default: + // Continue + } + + // Propose the next block (this test assumes the proposal always goes through) + rg.randomMember().(*RCOBStateMachine).proposeBlock() + + var currentHash string + + // Wait on participants to converge + checkFor(t, applyTimeout, 500*time.Millisecond, func() error { + var previousNodeName string + var previousNodeHash string + for _, sm := range rg { + stateMachine := sm.(*RCOBStateMachine) + nodeName := fmt.Sprintf( + "%s/%s", + stateMachine.server().Name(), + stateMachine.node().ID(), + ) + + running, blocksCount, hash := stateMachine.getCurrentHash() + // All nodes always running + if !running { + return fmt.Errorf( + "node %s is not running", + nodeName, + ) + } + // Node is behind + if blocksCount != iteration { + return fmt.Errorf( + "node %s applied %d blocks out of %d expected", + nodeName, + blocksCount, + iteration, + ) + } + // Make sure hash is not empty + if hash == "" { + return fmt.Errorf( + "node %s has empty hash after applying %d blocks", + nodeName, + blocksCount, + ) + } + // Check against previous node hash, unless this is the first node and we don't have anyone to compare + if previousNodeHash != "" && previousNodeHash != hash { + return fmt.Errorf( + "hash mismatch after %d blocks: %s hash: %s != %s hash: %s", + iteration, + nodeName, + hash, + previousNodeName, + previousNodeHash, + ) + } + // Set node name and hash for next node to compare against + previousNodeName, previousNodeHash = nodeName, hash + + } + // All replicas applied the last block and their hashes match + currentHash = previousNodeHash + return nil + }) + if RCOBOptions.verbose { + t.Logf( + "Verified chain hash %s for %d/%d nodes after %d/%d iterations", + currentHash, + len(rg), + len(rg), + iteration, + iterations, + ) + } + } +} + +// This is a RaftChainOfBlocks test where one of the replicas is stopped before proposing a short burst of blocks. +// Upon resuming the replica, we check it is able to catch up to the rest. +// The test may fail if: +// - Replicas hash diverge +// - One replica never applies the N-th block applied by the rest +// - The given number of blocks cannot be applied within some amount of time +func TestNRGChainOfBlocksStopAndCatchUp(t *testing.T) { + const iterations = 50 + const blocksPerIteration = 3 + const applyTimeout = 3 * time.Second + const testTimeout = 2 * iterations * time.Second + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newRaftChainStateMachine) + rg.waitOnLeader() + + testTimer := time.NewTimer(testTimeout) + highestBlockSeen := uint64(0) + + for iteration := uint64(1); iteration <= iterations; iteration++ { + + select { + case <-testTimer.C: + t.Fatalf("Timeout, completed %d/%d iterations", iteration-1, iterations) + default: + // Continue + } + + // Stop a random node + stoppedNode := rg.randomMember() + leader := stoppedNode.node().Leader() + stoppedNode.stop() + + // Snapshot a random (non-stopped) node + snapshotNode := rg.randomMember() + for snapshotNode == stoppedNode { + snapshotNode = rg.randomMember() + } + snapshotNode.(*RCOBStateMachine).createSnapshot() + + if RCOBOptions.verbose { + t.Logf( + "Iteration %d/%d: stopping node: %s/%s (leader: %v)", + iteration, + iterations, + stoppedNode.server().Name(), + stoppedNode.node().ID(), + leader, + ) + } + + // Propose some new blocks + rg.waitOnLeader() + for i := 0; i < blocksPerIteration; i++ { + proposer := rg.randomMember() + // Pick again if we randomly chose the stopped node + for proposer == stoppedNode { + proposer = rg.randomMember() + } + + proposer.(*RCOBStateMachine).proposeBlock() + } + + // Restart the stopped node + stoppedNode.restart() + + // Wait on participants to converge + expectedBlocks := iteration * blocksPerIteration + var currentHash string + checkFor(t, applyTimeout, 250*time.Millisecond, func() error { + var previousNodeName string + var previousNodeHash string + for _, sm := range rg { + stateMachine := sm.(*RCOBStateMachine) + nodeName := fmt.Sprintf( + "%s/%s", + stateMachine.server().Name(), + stateMachine.node().ID(), + ) + running, blocksCount, currentHash := stateMachine.getCurrentHash() + // Track the highest block seen by any replica + if blocksCount > highestBlockSeen { + highestBlockSeen = blocksCount + // Must check all replicas again + return fmt.Errorf("updated highest block to %d (%s)", highestBlockSeen, nodeName) + } + // All nodes should be running + if !running { + return fmt.Errorf( + "node %s not running", + nodeName, + ) + } + // Node is behind + if blocksCount != expectedBlocks { + return fmt.Errorf( + "node %s applied %d blocks out of %d expected", + nodeName, + blocksCount, + expectedBlocks, + ) + } + // Make sure hash is not empty + if currentHash == "" { + return fmt.Errorf( + "node %s has empty hash after applying %d blocks", + nodeName, + blocksCount, + ) + } + // Check against previous node hash, unless this is the first node to be checked + if previousNodeHash != "" && previousNodeHash != currentHash { + return fmt.Errorf( + "hash mismatch after %d blocks: %s hash: %s != %s hash: %s", + expectedBlocks, + nodeName, + currentHash, + previousNodeName, + previousNodeHash, + ) + } + // Set node name and hash for next node to compare against + previousNodeName, previousNodeHash = nodeName, currentHash + } + // All is well + currentHash = previousNodeHash + return nil + }) + + if RCOBOptions.verbose { + t.Logf( + "Verified chain hash %s for %d/%d nodes after %d blocks, %d/%d iterations (%d lost proposals)", + currentHash, + len(rg), + len(rg), + highestBlockSeen, + iteration, + iterations, + expectedBlocks-highestBlockSeen, + ) + } + } +}