Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 298 additions & 0 deletions server/jetstream_cluster_long_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package server
import (
"fmt"
"math/rand"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -183,3 +184,300 @@ func TestLongKVPutWithServerRestarts(t *testing.T) {
)
}
}

// TestLongNRGChainOfBlocks is a RaftChainOfBlocks test that randomly starts and stops nodes
// to exercise recovery and snapshots.
//
// Until the Duration timer fires, each loop iteration picks one operation at random from a
// weighted list (stop or restart nodes, snapshot, propose a block, pause, or check) and applies
// it to the raft group. The Check operation restarts every stopped node and then requires all
// replicas to report the same block count and hash as the first replica of the group, and that
// this count is not lower than the highest count observed by any earlier check.
func TestLongNRGChainOfBlocks(t *testing.T) {
	const (
		ClusterSize        = 3
		GroupSize          = 3
		ConvergenceTimeout = 30 * time.Second
		Duration           = 10 * time.Minute
		PrintStateInterval = 3 * time.Second
	)

	// Create cluster
	c := createJetStreamClusterExplicit(t, "Test", ClusterSize)
	defer c.shutdown()

	rg := c.createRaftGroup("ChainOfBlocks", GroupSize, newRaftChainStateMachine)
	rg.waitOnLeader()

	// Available operations. Every constant is explicitly typed so that none of them
	// silently degrades to an untyped string constant.
	type TestOperation string
	const (
		StopOne       TestOperation = "Stop one active node"
		StopAll       TestOperation = "Stop all active nodes"
		RestartOne    TestOperation = "Restart one stopped node"
		RestartAll    TestOperation = "Restart all stopped nodes"
		Snapshot      TestOperation = "Snapshot one active node"
		Propose       TestOperation = "Propose a value via one active node"
		ProposeLeader TestOperation = "Propose a value via leader"
		Pause         TestOperation = "Let things run undisturbed for a while"
		Check         TestOperation = "Wait for nodes to converge"
	)

	// Weighted distribution of operations, one is randomly chosen from this vector in each iteration
	opsWeighted := []TestOperation{
		StopOne,
		StopOne,
		StopOne,
		StopAll,
		StopAll,
		RestartOne,
		RestartOne,
		RestartOne,
		RestartAll,
		RestartAll,
		RestartAll,
		Snapshot,
		Snapshot,
		Propose,
		Propose,
		Propose,
		Propose,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		ProposeLeader,
		Pause,
		Pause,
		Pause,
		Pause,
		Check,
		Check,
		Check,
		Check,
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Choose a node from the list (and remove it).
	// The input slice is modified in place: the last element is moved into the slot of the
	// picked node, so callers must replace their slice with the returned one.
	pickRandomNode := func(nodes []stateMachine) ([]stateMachine, stateMachine) {
		if len(nodes) == 0 {
			// Input list is empty
			return nodes, nil
		}
		// Pick random node
		i := rng.Intn(len(nodes))
		node := nodes[i]
		// Move last element in its place
		nodes[i] = nodes[len(nodes)-1]
		// Return slice excluding last element
		return nodes[:len(nodes)-1], node
	}

	// Create summary status string for all replicas
	chainStatusString := func() string {
		var b strings.Builder
		for _, sm := range rg {
			csm := sm.(*RCOBStateMachine)
			running, blocksCount, blockHash := csm.getCurrentHash()
			if running {
				// Format straight into the builder, no intermediate string needed
				fmt.Fprintf(
					&b,
					" [%s (%s): %d blocks, hash=%s],",
					csm.server().Name(),
					csm.node().ID(),
					blocksCount,
					blockHash,
				)
			} else {
				fmt.Fprintf(
					&b,
					" [%s (%s): STOPPED],",
					csm.server().Name(),
					csm.node().ID(),
				)
			}
		}
		return b.String()
	}

	// Track the highest number of blocks applied by any of the replicas
	highestBlocksCount := uint64(0)

	// Track active and stopped nodes
	activeNodes := make([]stateMachine, 0, GroupSize)
	stoppedNodes := make([]stateMachine, 0, GroupSize)

	// Initially all nodes are active
	activeNodes = append(activeNodes, rg...)

	// Restart every stopped node (if any) and move all of them back to the active list.
	// Shared by the RestartAll and Check operations.
	restartAllStopped := func() {
		for _, node := range stoppedNodes {
			node.restart()
		}
		activeNodes = append(activeNodes, stoppedNodes...)
		stoppedNodes = make([]stateMachine, 0, GroupSize)
	}

	defer func() {
		t.Logf("Final state: %s", chainStatusString())
	}()

	// Release the ticker and the timer when the test returns
	printStateTicker := time.NewTicker(PrintStateInterval)
	defer printStateTicker.Stop()
	testTimer := time.NewTimer(Duration)
	defer testTimer.Stop()
	start := time.Now()
	iteration := 0

	for {

		iteration++
		// Non-blocking poll: print state if the ticker fired, exit if the test timer
		// expired, otherwise go straight to the next operation.
		select {
		case <-printStateTicker.C:
			t.Logf(
				"[%s] State: %s",
				time.Since(start).Round(time.Second),
				chainStatusString(),
			)
		case <-testTimer.C:
			// Test completed
			return
		default:
			// Continue
		}

		// Choose a random operation to perform in this iteration
		nextOperation := opsWeighted[rng.Intn(len(opsWeighted))]
		if RCOBOptions.verbose {
			t.Logf("Iteration %d: %s", iteration, nextOperation)
		}

		switch nextOperation {

		case StopOne:
			// Stop an active node (if any are left active)
			var n stateMachine
			activeNodes, n = pickRandomNode(activeNodes)
			if n != nil {
				n.stop()
				stoppedNodes = append(stoppedNodes, n)
			}

		case StopAll:
			// Stop all active nodes (if any are active)
			for _, node := range activeNodes {
				node.stop()
			}
			stoppedNodes = append(stoppedNodes, activeNodes...)
			activeNodes = make([]stateMachine, 0, GroupSize)

		case RestartOne:
			// Restart a stopped node (if any are stopped)
			var n stateMachine
			stoppedNodes, n = pickRandomNode(stoppedNodes)
			if n != nil {
				n.restart()
				activeNodes = append(activeNodes, n)
			}

		case RestartAll:
			// Restart all stopped nodes (if any)
			restartAllStopped()

		case Snapshot:
			// Choose a random active node and tell it to create a snapshot
			if len(activeNodes) > 0 {
				n := activeNodes[rng.Intn(len(activeNodes))]
				n.(*RCOBStateMachine).createSnapshot()
			}

		case Propose:
			// Make an active node propose the next block (if any nodes are active)
			if len(activeNodes) > 0 {
				n := activeNodes[rng.Intn(len(activeNodes))]
				n.(*RCOBStateMachine).proposeBlock()
			}

		case ProposeLeader:
			// Make the leader propose the next block (if a leader is active)
			leader := rg.leader()
			if leader != nil {
				leader.(*RCOBStateMachine).proposeBlock()
			}

		case Pause:
			// Noop, let things run undisturbed for a little bit
			time.Sleep(time.Duration(rng.Intn(250)) * time.Millisecond)

		case Check:
			// Restart any stopped node
			restartAllStopped()

			// Ensure all nodes (eventually) converge
			checkFor(t, ConvergenceTimeout, 250*time.Millisecond,
				func() error {
					referenceNode := rg[0]
					// Save block count and hash of first node as reference
					_, referenceBlocksCount, referenceHash := referenceNode.(*RCOBStateMachine).getCurrentHash()

					// Compare each node against reference
					for _, n := range rg {
						sm := n.(*RCOBStateMachine)
						running, blocksCount, blockHash := sm.getCurrentHash()
						if !running {
							return fmt.Errorf(
								"node not running: %s (%s)",
								sm.server().Name(),
								sm.node().ID(),
							)
						}

						// Track the highest block delivered by any node
						if blocksCount > highestBlocksCount {
							if RCOBOptions.verbose {
								t.Logf(
									"New highest blocks count: %d (%s (%s))",
									blocksCount,
									sm.server().Name(),
									sm.node().ID(),
								)
							}
							highestBlocksCount = blocksCount
						}

						// Each replica must match the reference node
						if blocksCount != referenceBlocksCount {
							return fmt.Errorf(
								"different number of blocks %d (%s (%s) vs. %d (%s (%s))",
								blocksCount,
								sm.server().Name(),
								sm.node().ID(),
								referenceBlocksCount,
								referenceNode.server().Name(),
								referenceNode.node().ID(),
							)
						} else if blockHash != referenceHash {
							return fmt.Errorf(
								"different hash after %d blocks %s (%s (%s) vs. %s (%s (%s))",
								blocksCount,
								blockHash,
								sm.server().Name(),
								sm.node().ID(),
								referenceHash,
								referenceNode.server().Name(),
								referenceNode.node().ID(),
							)
						}
					}

					// Verify consistency check was against the highest block known
					if referenceBlocksCount < highestBlocksCount {
						return fmt.Errorf(
							"nodes converged below highest known block count: %d: %s",
							highestBlocksCount,
							chainStatusString(),
						)
					}

					// All nodes reached the same state, check passed
					return nil
				},
			)
		}
	}
}
Loading