Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6712,3 +6712,125 @@ func TestJetStreamClusterInvalidR1Config(t *testing.T) {
}
}
}

func TestJetStreamClusterMultiLeaderR3Config(t *testing.T) {
conf := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {
store_dir: '%s',
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
server_tags: ["test"]
system_account: sys
no_auth_user: js
accounts {
sys { users = [ { user: sys, pass: sys } ] }
js {
jetstream = enabled
users = [ { user: js, pass: js } ]
}
}`
c := createJetStreamClusterWithTemplate(t, conf, "R3TEST", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.servers[0])
defer nc.Close()

nc2, js2 := jsClientConnect(t, c.servers[2])
defer nc2.Close()

createStreams := func(t *testing.T, js nats.JetStreamContext, n, replicas int) {
for i := 0; i < n; i++ {
sname := fmt.Sprintf("S:%d", i)
js.AddStream(&nats.StreamConfig{
Name: sname,
MaxMsgsPerSubject: 5,
Replicas: replicas,
Subjects: []string{fmt.Sprintf("A.%d.>", i)},
})
time.Sleep(10 * time.Millisecond)
js.AddConsumer(sname, &nats.ConsumerConfig{
Name: sname,
Durable: sname,
FilterSubject: ">",
})
js.Publish(fmt.Sprintf("A.%d.foo", i), []byte("one"))
}
}

// Create streams in parallel with different configs, then
// check whether one of them is in an undefined state.
totalStreams := 5

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
createStreams(t, js, totalStreams, 3)
}()

wg.Add(1)
go func() {
defer wg.Done()
createStreams(t, js2, totalStreams, 1)
}()
wg.Wait()

checkMultiLeader := func(accountName, streamName string) error {
leaders := make(map[string]bool)
for _, srv := range c.servers {
jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
if err != nil {
return err
}
for _, acc := range jsz.AccountDetails {
if acc.Name == accountName {
for _, stream := range acc.Streams {
if stream.Name == streamName {
leaders[stream.Cluster.Leader] = true
}
}
}
}
}
if len(leaders) > 1 {
return fmt.Errorf("There are multiple leaders on %s stream: %+v", streamName, leaders)
}
return nil
}

var invalidStream string
for i := 0; i < totalStreams; i++ {
ci, err := js.StreamInfo(fmt.Sprintf("S:%d", i))
require_NoError(t, err)
if ci.Config.Replicas == 1 {
if ci.Cluster != nil {
peers := ci.Cluster.Replicas
if len(peers) > 1 {
invalidStream = ci.Config.Name
t.Errorf("Unexpected stream config drift, 1 replica expected but found %v peers", len(peers))
}
}
}
}
if len(invalidStream) > 0 {
_, err := js.StreamInfo(invalidStream)
require_NoError(t, err)

// Restart server where first client is connected and almost all R1 replicas landed.
srv := c.servers[0]
srv.Shutdown()
srv.WaitForShutdown()
time.Sleep(2 * time.Second)
c.restartServer(srv)
c.waitOnClusterReady()
checkFor(t, 30*time.Second, 200*time.Millisecond, func() error {
return checkMultiLeader("js", invalidStream)
})
}
}
Loading