diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8179647986a..d3a609556c8 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -6712,3 +6712,125 @@ func TestJetStreamClusterInvalidR1Config(t *testing.T) { } } } + +func TestJetStreamClusterMultiLeaderR3Config(t *testing.T) { + conf := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + store_dir: '%s', + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + server_tags: ["test"] + system_account: sys + no_auth_user: js + accounts { + sys { users = [ { user: sys, pass: sys } ] } + js { + jetstream = enabled + users = [ { user: js, pass: js } ] + } + }` + c := createJetStreamClusterWithTemplate(t, conf, "R3TEST", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + nc2, js2 := jsClientConnect(t, c.servers[2]) + defer nc2.Close() + + createStreams := func(t *testing.T, js nats.JetStreamContext, n, replicas int) { + for i := 0; i < n; i++ { + sname := fmt.Sprintf("S:%d", i) + js.AddStream(&nats.StreamConfig{ + Name: sname, + MaxMsgsPerSubject: 5, + Replicas: replicas, + Subjects: []string{fmt.Sprintf("A.%d.>", i)}, + }) + time.Sleep(10 * time.Millisecond) + js.AddConsumer(sname, &nats.ConsumerConfig{ + Name: sname, + Durable: sname, + FilterSubject: ">", + }) + js.Publish(fmt.Sprintf("A.%d.foo", i), []byte("one")) + } + } + + // Create streams in parallel with different configs, then + // check whether one of them is in an undefined state. + totalStreams := 5 + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + createStreams(t, js, totalStreams, 3) + }() + + wg.Add(1) + go func() { + defer wg.Done() + createStreams(t, js2, totalStreams, 1) + }() + wg.Wait() + + checkMultiLeader := func(accountName, streamName string) error { + leaders := make(map[string]bool) + for _, srv := range c.servers { + jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) + if err != nil { + return err + } + for _, acc := range jsz.AccountDetails { + if acc.Name == accountName { + for _, stream := range acc.Streams { + if stream.Name == streamName { + leaders[stream.Cluster.Leader] = true + } + } + } + } + } + if len(leaders) > 1 { + return fmt.Errorf("There are multiple leaders on %s stream: %+v", streamName, leaders) + } + return nil + } + + var invalidStream string + for i := 0; i < totalStreams; i++ { + ci, err := js.StreamInfo(fmt.Sprintf("S:%d", i)) + require_NoError(t, err) + if ci.Config.Replicas == 1 { + if ci.Cluster != nil { + peers := ci.Cluster.Replicas + if len(peers) > 1 { + invalidStream = ci.Config.Name + t.Errorf("Unexpected stream config drift, 1 replica expected but found %v peers", len(peers)) + } + } + } + } + if len(invalidStream) > 0 { + _, err := js.StreamInfo(invalidStream) + require_NoError(t, err) + + // Restart server where first client is connected and almost all R1 replicas landed. + srv := c.servers[0] + srv.Shutdown() + srv.WaitForShutdown() + time.Sleep(2 * time.Second) + c.restartServer(srv) + c.waitOnClusterReady() + checkFor(t, 30*time.Second, 200*time.Millisecond, func() error { + return checkMultiLeader("js", invalidStream) + }) + } +}