diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4321637476c..c8ecda0745d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3578,7 +3578,6 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { - mset.setLeader(false) js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ "type": "stream", "account": mset.accName(), @@ -3598,10 +3597,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss } else if numReplicas == 1 && alreadyRunning { // We downgraded to R1. Make sure we cleanup the raft node and the stream monitor. mset.removeNode() - // Make sure we are leader now that we are R1. - needsSetLeader = true // In case we need to shutdown the cluster specific subs, etc. - mset.setLeader(false) + mset.mu.Lock() + // Stop responding to sync requests. + mset.stopClusterSubs() + // Clear catchup state + mset.clearAllCatchupPeers() + mset.mu.Unlock() + // Remove from meta layer. js.mu.Lock() rg.node = nil js.mu.Unlock() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index ca77bff985e..063aa716d6a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -27,6 +27,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1616,3 +1617,503 @@ NextServer: t.Fatalf("Received %v redeliveries after a successful ack", extraRedeliveries) } } + +func TestJetStreamClusterBusyStreams(t *testing.T) { + t.Skip("Too long for CI at the moment") + type streamSetup struct { + config *nats.StreamConfig + consumers []*nats.ConsumerConfig + subjects []string + } + type job func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) + type testParams struct { + cluster string + streams []*streamSetup + producers int + consumers int + restartAny bool + restartWait time.Duration + ldmRestart bool + rolloutRestart bool + restarts int + checkHealthz bool + jobs []job + expect job + duration time.Duration + producerMsgs int + producerMsgSize int + } + test := func(t *testing.T, test *testParams) { + conf := ` + listen: 127.0.0.1:-1 + http: 127.0.0.1:-1 + server_name: %s + jetstream: { + domain: "cloud" + store_dir: '%s', + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + server_tags: ["test"] + system_account: sys + + no_auth_user: js + accounts { + sys { users = [ { user: sys, pass: sys } ] } + + js { jetstream = enabled + users = [ { user: js, pass: js } ] + } + }` + c := createJetStreamClusterWithTemplate(t, conf, test.cluster, 3) + defer c.shutdown() + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.LameDuckDuration = 15 * time.Second + s.opts.LameDuckGracePeriod = -15 * time.Second + s.optsMu.Unlock() + } + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + var wg sync.WaitGroup + for _, stream := range test.streams { + stream := stream + wg.Add(1) + go func() { + defer wg.Done() + _, err := js.AddStream(stream.config) + require_NoError(t, err) + + for _, consumer := range stream.consumers { + _, err := js.AddConsumer(stream.config.Name, consumer) + require_NoError(t, err) + } + }() + } + wg.Wait() + + ctx, cancel := context.WithTimeout(context.Background(), test.duration) + defer cancel() + for _, stream := range test.streams { + payload := []byte(strings.Repeat("A", test.producerMsgSize)) + stream := stream + subjects := stream.subjects + + // Create publishers on 
different connections that send messages
+			// to all the consumers' subjects.
+			var n atomic.Uint64
+			for i := 0; i < test.producers; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					nc, js := jsClientConnect(t, c.randomServer())
+					defer nc.Close()
+
+					for range time.NewTicker(1 * time.Millisecond).C {
+						select {
+						case <-ctx.Done():
+							return
+						default:
+						}
+
+						for _, subject := range subjects {
+							_, err := js.Publish(subject, payload, nats.AckWait(200*time.Millisecond))
+							if err == nil {
+								if nn := n.Add(1); int(nn) >= test.producerMsgs {
+									return
+								}
+							}
+						}
+					}
+				}()
+			}
+
+			// Create multiple parallel pull subscribers per consumer config.
+			for i := 0; i < test.consumers; i++ {
+				for _, consumer := range stream.consumers {
+					wg.Add(1)
+
+					consumer := consumer
+					go func() {
+						defer wg.Done()
+
+						// Wait for the consumer to be available before subscribing.
+						for attempts := 0; attempts < 60; attempts++ {
+							_, err := js.ConsumerInfo(stream.config.Name, consumer.Name)
+							if err == nil {
+								break
+							}
+							t.Logf("WRN: Consumer not ready yet: %v - %v - %v - %v",
+								consumer.FilterSubject, stream.config.Name, consumer.Name, err)
+							time.Sleep(100 * time.Millisecond)
+						}
+						sub, err := js.PullSubscribe(consumer.FilterSubject, "", nats.Bind(stream.config.Name, consumer.Name))
+						if err != nil {
+							t.Logf("WRN: Failed creating pull subscriber: %v - %v - %v - %v",
+								consumer.FilterSubject, stream.config.Name, consumer.Name, err)
+							return
+						}
+
+						for range time.NewTicker(100 * time.Millisecond).C {
+							select {
+							case <-ctx.Done():
+								return
+							default:
+							}
+
+							msgs, err := sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
+							if err != nil {
+								continue
+							}
+							for _, msg := range msgs {
+								msg.Ack()
+							}
+
+							msgs, err = sub.Fetch(100, nats.MaxWait(200*time.Millisecond))
+							if err != nil {
+								continue
+							}
+							for _, msg := range msgs {
+								msg.Ack()
+							}
+						}
+					}()
+				}
+			}
+		}
+
+		for _, job := range test.jobs {
+			go job(t, nc, js, c)
+		}
+		if test.restarts > 0 {
+			wg.Add(1)
+			time.AfterFunc(test.restartWait, func() {
+				defer wg.Done()
+				for i := 0; i < test.restarts; i++ {
+					switch {
+					case test.restartAny:
+						s := c.servers[rand.Intn(len(c.servers))]
+						if test.ldmRestart {
+							s.lameDuckMode()
+						} else {
+							s.Shutdown()
+						}
+						s.WaitForShutdown()
+						c.restartServer(s)
+					case test.rolloutRestart:
+						for _, s := range c.servers {
+							if test.ldmRestart {
+								s.lameDuckMode()
+							} else {
+								s.Shutdown()
+							}
+							s.WaitForShutdown()
+							s = c.restartServer(s)
+
+							if test.checkHealthz {
+								hctx, hcancel := context.WithTimeout(ctx, 15*time.Second)
+								defer hcancel()
+
+							Healthz:
+								for range time.NewTicker(2 * time.Second).C {
+									select {
+									case <-hctx.Done():
+										break Healthz
+									default:
+									}
+
+									status := s.healthz(nil)
+									if status.StatusCode == 200 {
+										break Healthz
+									}
+								}
+							}
+						}
+					}
+					c.waitOnClusterReady()
+				}
+			})
+		}
+		test.expect(t, nc, js, c)
+		cancel()
+		wg.Wait()
+	}
+	stepDown := func(nc *nats.Conn, streamName string) {
+		nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, streamName), nil, time.Second)
+	}
+	getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
+		t.Helper()
+		srv := c.streamLeader(accountName, streamName)
+		jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
+		require_NoError(t, err)
+		for _, acc := range jsz.AccountDetails {
+			if acc.Name == accountName {
+				for _, stream := range acc.Streams {
+					if stream.Name == streamName {
+						return &stream
+					}
+				}
+			}
+		}
+		t.Errorf("Could not find stream %q details in account %q", streamName, accountName)
+		return nil
+	}
+	checkMsgsEqual := func(t *testing.T, c *cluster, accountName, streamName string) {
+		t.Helper()
+		state := getStreamDetails(t, c, accountName, streamName).State
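+		// Compare the raw store contents across every server holding the stream.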
+		var msets []*stream
+		for _, s := range c.servers {
+			acc, err := s.LookupAccount(accountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream(streamName)
+			require_NoError(t, err)
+			msets = append(msets, mset)
+		}
+		for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
+			var msgId string
+			var smv StoreMsg
+			for _, mset := range msets {
+				mset.mu.RLock()
+				sm, err := mset.store.LoadMsg(seq, &smv)
+				mset.mu.RUnlock()
+				require_NoError(t, err)
+				if msgId == _EMPTY_ {
+					msgId = string(sm.hdr)
+				} else if msgId != string(sm.hdr) {
+					t.Fatalf("MsgIds do not match for seq %d: %q vs %q", seq, msgId, sm.hdr)
+				}
+			}
+		}
+	}
+	checkConsumer := func(t *testing.T, c *cluster, accountName, streamName, consumerName string) {
+		t.Helper()
+		var leader string
+		for _, s := range c.servers {
+			jsz, err := s.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
+			require_NoError(t, err)
+			for _, acc := range jsz.AccountDetails {
+				if acc.Name == accountName {
+					for _, stream := range acc.Streams {
+						if stream.Name == streamName {
+							for _, consumer := range stream.Consumer {
+								if consumer.Name != consumerName {
+									continue
+								}
+								if leader == _EMPTY_ {
+									leader = consumer.Cluster.Leader
+								} else if leader != consumer.Cluster.Leader {
+									t.Errorf("There are two leaders for %s/%s: %s vs %s",
+										stream.Name, consumer.Name, leader, consumer.Cluster.Leader)
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+
+	t.Run("R1F/rescale/R3F/sources:10/limits", func(t *testing.T) {
+		testDuration := 3 * time.Minute
+		totalStreams := 10
+		streams := make([]*streamSetup, totalStreams)
+		sources := make([]*nats.StreamSource, totalStreams)
+		for i := 0; i < totalStreams; i++ {
+			name := fmt.Sprintf("test:%d", i)
+			st := &streamSetup{
+				config: &nats.StreamConfig{
+					Name:      name,
+					Subjects:  []string{fmt.Sprintf("test.%d.*", i)},
+					Replicas:  1,
+					Retention: nats.LimitsPolicy,
+				},
+			}
+			st.subjects = append(st.subjects, fmt.Sprintf("test.%d.0", i))
+			sources[i] = &nats.StreamSource{Name: name}
+			streams[i] = st
+		}
+
+		// Create the stream that sources all of the above, along with a consumer.
+		sourceSetup := &streamSetup{
+			config: &nats.StreamConfig{
+				Name:      "source-test",
+				Replicas:  1,
+				Retention: nats.LimitsPolicy,
+				Sources:   sources,
+			},
+			consumers: make([]*nats.ConsumerConfig, 0),
+		}
+		cc := &nats.ConsumerConfig{
+			Name:          "A",
+			Durable:       "A",
+			FilterSubject: "test.>",
+			AckPolicy:     nats.AckExplicitPolicy,
+		}
+		sourceSetup.consumers = append(sourceSetup.consumers, cc)
+		streams = append(streams, sourceSetup)
+
+		scale := func(replicas int, wait time.Duration) job {
+			return func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) {
+				config := sourceSetup.config
+				time.AfterFunc(wait, func() {
+					config.Replicas = replicas
+					for i := 0; i < 10; i++ {
+						_, err := js.UpdateStream(config)
+						if err == nil {
+							return
+						}
+						time.Sleep(1 * time.Second)
+					}
+				})
+			}
+		}
+
+		expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) {
+			// The source stream should not be stuck, nor differ from the other streams.
+			time.Sleep(testDuration + 1*time.Minute)
+			accName := "js"
+			streamName := "source-test"
+
+			// Check a few times to make sure the number of messages is no longer changing.
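+			// The producers are done by this point, so movement here, or after the
+			// leader step down below, would mean a replica was elected with partial state.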
+			var changed bool
+			var prevMsgs uint64
+			// Establish a baseline count, retrying while the stream info is unavailable.
+			for i := 0; i < 10; i++ {
+				sinfo, err := js.StreamInfo(streamName)
+				if err != nil {
+					t.Logf("Error: %v", err)
+					time.Sleep(2 * time.Second)
+					continue
+				}
+				prevMsgs = sinfo.State.Msgs
+				break
+			}
+			for i := 0; i < 10; i++ {
+				sinfo, err := js.StreamInfo(streamName)
+				if err != nil {
+					t.Logf("Error: %v", err)
+					time.Sleep(2 * time.Second)
+					continue
+				}
+				if prevMsgs != sinfo.State.Msgs {
+					changed = true
+				}
+				prevMsgs = sinfo.State.Msgs
+				time.Sleep(2 * time.Second)
+			}
+			if !changed {
+				// Doing a leader step down should not cause the messages to change.
+				stepDown(nc, streamName)
+
+				for i := 0; i < 10; i++ {
+					sinfo, err := js.StreamInfo(streamName)
+					if err != nil {
+						t.Logf("Error: %v", err)
+						time.Sleep(2 * time.Second)
+						continue
+					}
+					if prevMsgs != sinfo.State.Msgs {
+						changed = true
+					}
+					prevMsgs = sinfo.State.Msgs
+					time.Sleep(2 * time.Second)
+				}
+				if changed {
+					t.Error("Stream msgs changed after the step down")
+				}
+			}
+
+			/////////////////////////////////////////////////////////////////////////////////////////
+			//                                                                                      //
+			// The number of messages sourced should match the count from all the other streams.   //
+			//                                                                                      //
+			/////////////////////////////////////////////////////////////////////////////////////////
+			var expectedMsgs uint64
+			for i := 0; i < totalStreams; i++ {
+				name := fmt.Sprintf("test:%d", i)
+				sinfo, err := js.StreamInfo(name)
+				require_NoError(t, err)
+				expectedMsgs += sinfo.State.Msgs
+			}
+			sinfo, err := js.StreamInfo(streamName)
+			require_NoError(t, err)
+
+			gotMsgs := sinfo.State.Msgs
+			if gotMsgs != expectedMsgs {
+				t.Errorf("Stream with sources has %v messages, but total sourced messages should be %v", gotMsgs, expectedMsgs)
+			}
+			checkConsumer(t, c, accName, streamName, "A")
+			checkMsgsEqual(t, c, accName, streamName)
+		}
+		test(t, &testParams{
+			cluster:        t.Name(),
+			streams:        streams,
+			producers:      10,
+			consumers:      10,
+			restarts:       1,
+			rolloutRestart: true,
+			ldmRestart:     true,
+			checkHealthz:   true,
+			// TODO(dlc) - If this overlaps with the scale jobs this test will fail.
+			// Leaders will be elected with partial state.
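+			// Start the restarts only after the last scale job (60s) has fired so
+			// they do not overlap with the replica changes.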
+			restartWait: 65 * time.Second,
+			jobs: []job{
+				scale(3, 15*time.Second),
+				scale(1, 30*time.Second),
+				scale(3, 60*time.Second),
+			},
+			expect:          expect,
+			duration:        testDuration,
+			producerMsgSize: 1024,
+			producerMsgs:    100_000,
+		})
+	})
+
+	t.Run("R3F/streams:30/limits", func(t *testing.T) {
+		testDuration := 3 * time.Minute
+		totalStreams := 30
+		consumersPerStream := 5
+		streams := make([]*streamSetup, totalStreams)
+		for i := 0; i < totalStreams; i++ {
+			name := fmt.Sprintf("test:%d", i)
+			st := &streamSetup{
+				config: &nats.StreamConfig{
+					Name:      name,
+					Subjects:  []string{fmt.Sprintf("test.%d.*", i)},
+					Replicas:  3,
+					Retention: nats.LimitsPolicy,
+				},
+				consumers: make([]*nats.ConsumerConfig, 0),
+			}
+			for j := 0; j < consumersPerStream; j++ {
+				subject := fmt.Sprintf("test.%d.%d", i, j)
+				name := fmt.Sprintf("A:%d:%d", i, j)
+				cc := &nats.ConsumerConfig{
+					Name:          name,
+					Durable:       name,
+					FilterSubject: subject,
+					AckPolicy:     nats.AckExplicitPolicy,
+				}
+				st.consumers = append(st.consumers, cc)
+				st.subjects = append(st.subjects, subject)
+			}
+			streams[i] = st
+		}
+		expect := func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext, c *cluster) {
+			time.Sleep(testDuration + 1*time.Minute)
+			accName := "js"
+			for i := 0; i < totalStreams; i++ {
+				streamName := fmt.Sprintf("test:%d", i)
+				checkMsgsEqual(t, c, accName, streamName)
+			}
+		}
+		test(t, &testParams{
+			cluster:         t.Name(),
+			streams:         streams,
+			producers:       10,
+			consumers:       10,
+			restarts:        1,
+			rolloutRestart:  true,
+			ldmRestart:      true,
+			checkHealthz:    true,
+			restartWait:     45 * time.Second,
+			expect:          expect,
+			duration:        testDuration,
+			producerMsgSize: 1024,
+			producerMsgs:    100_000,
+		})
+	})
+}
diff --git a/server/stream.go b/server/stream.go
index 180b5a535ff..6193a9ccf05 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -847,9 +847,10 @@ func (mset *stream) setLeader(isLeader bool) error {
 		// Make sure we are listening for sync requests.
 		// TODO(dlc) - Original design was that all in sync members of the group would do DQ.
 		mset.startClusterSubs()
-		// Setup subscriptions
+
+		// Setup subscriptions if we were not already the leader.
 		if err := mset.subscribeToStream(); err != nil {
-			if isLeader && mset.isClustered() {
+			if mset.isClustered() {
 				// Stepdown since we have an error.
 				mset.node.StepDown()
 			}
@@ -2797,6 +2798,11 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) {
 		si.msgs.drain()
 		si.msgs.unregister()
 	}
+	// If we have a setup schedule, go ahead and delete it.
+	if t := mset.sourceSetupSchedules[si.iname]; t != nil {
+		t.Stop()
+		delete(mset.sourceSetupSchedules, si.iname)
+	}
 }
 
 const sourceConsumerRetryThreshold = 2 * time.Second
@@ -3454,9 +3460,11 @@ func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) {
 	}
 }
 
-// lock should be held.
 // Resets the SourceInfo for all the sources
+// Lock should be held.
 func (mset *stream) resetSourceInfo() {
+	// Stop any existing source consumers before resetting.
+	mset.stopSourceConsumers()
 	mset.sources = make(map[string]*sourceInfo)
 
 	for _, ssi := range mset.cfg.Sources {
@@ -3622,7 +3630,7 @@ func (mset *stream) subscribeToStream() error {
 		mset.mirror.trs = trs
 		// delay the actual mirror consumer creation for after a delay
 		mset.scheduleSetupMirrorConsumerRetry()
-	} else if len(mset.cfg.Sources) > 0 {
+	} else if len(mset.cfg.Sources) > 0 && mset.sourcesConsumerSetup == nil {
 		// Setup the initial source infos for the sources
 		mset.resetSourceInfo()
 		// Delay the actual source consumer(s) creation(s) for after a delay