diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 414eb803e70..a3ab344477a 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -468,6 +468,7 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error {
 		return errors.New("stream not found")
 	}
 
+	msetNode := mset.raftNode()
 	switch {
 	case mset.cfg.Replicas <= 1:
 		return nil // No further checks for R=1 streams
@@ -475,7 +476,11 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error {
 	case node == nil:
 		return errors.New("group node missing")
 
-	case node != mset.raftNode():
+	case msetNode == nil:
+		// Can happen when the stream's node is not yet initialized.
+		return errors.New("stream node missing")
+
+	case node != msetNode:
 		s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
 		node.Delete()
 		mset.resetClusteredState(nil)
@@ -521,6 +526,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 		return errors.New("consumer not found")
 	}
 
+	oNode := o.raftNode()
 	rc, _ := o.replica()
 	switch {
 	case rc <= 1:
@@ -529,7 +535,11 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 	case node == nil:
 		return errors.New("group node missing")
 
-	case node != o.raftNode():
+	case oNode == nil:
+		// Can happen when the consumer's node is not yet initialized.
+		return errors.New("consumer node missing")
+
+	case node != oNode:
 		mset.mu.RLock()
 		accName, streamName := mset.acc.GetName(), mset.cfg.Name
 		mset.mu.RUnlock()
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 340970dfef3..59a292fb73e 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -7313,6 +7313,86 @@ func TestJetStreamClusterStreamHealthCheckMustNotRecreate(t *testing.T) {
 	checkNodeIsClosed(sa)
 }
 
+// TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly verifies that the stream
+// health check reports an error, instead of deleting the assignment's Raft node,
+// while the stream's own Raft node is still unset.
+func TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Waits until every server in the cluster has the stream assignment.
+	waitForStreamAssignments := func() {
+		t.Helper()
+		checkFor(t, 5*time.Second, time.Second, func() error {
+			for _, s := range c.servers {
+				js := s.getJetStream()
+				js.mu.RLock()
+				sa := js.streamAssignment(globalAccountName, "TEST")
+				js.mu.RUnlock()
+				if sa == nil {
+					return fmt.Errorf("stream assignment not found on %s", s.Name())
+				}
+			}
+			return nil
+		})
+	}
+	// Returns the server's JetStream state, account, stream assignment and stream.
+	getStreamAssignment := func(rs *Server) (*jetStream, *Account, *streamAssignment, *stream) {
+		acc, err := rs.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+
+		// Write lock, not read lock: the assignment's Created field is reset below.
+		sjs := rs.getJetStream()
+		sjs.mu.Lock()
+		defer sjs.mu.Unlock()
+
+		sas := sjs.cluster.streams[globalAccountName]
+		require_True(t, sas != nil)
+		sa := sas["TEST"]
+		require_True(t, sa != nil)
+		sa.Created = time.Time{}
+		return sjs, acc, sa, mset
+	}
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	waitForStreamAssignments()
+
+	// We manually clear the node on the stream.
+	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+	sjs, acc, sa, mset := getStreamAssignment(rs)
+	mset.mu.Lock()
+	mset.node = nil
+	mset.mu.Unlock()
+	sjs.mu.Lock()
+	group := sa.Group
+	if group == nil {
+		sjs.mu.Unlock()
+		t.Fatal("sa.Group not initialized")
+	}
+	node := group.node
+	if node == nil {
+		sjs.mu.Unlock()
+		t.Fatal("sa.Group.node not initialized")
+	}
+	sjs.mu.Unlock()
+
+	// The health check gets the Raft node of the assignment and checks it against the
+	// Raft node of the stream. We simulate a race condition where the stream's Raft node
+	// is not yet initialized. The check MUST report an error and MUST NOT delete the node.
+	require_True(t, sjs.isStreamHealthy(acc, sa) != nil)
+	require_Equal(t, node.State(), Follower)
+}
+
 func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
@@ -7432,6 +7512,93 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
 	checkNodeIsClosed(ca)
 }
 
+// TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly verifies that the consumer
+// health check reports an error, instead of deleting the assignment's Raft node,
+// while the consumer's own Raft node is still unset.
+func TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Waits until every server in the cluster has the consumer assignment.
+	waitForConsumerAssignments := func() {
+		t.Helper()
+		checkFor(t, 5*time.Second, time.Second, func() error {
+			for _, s := range c.servers {
+				js := s.getJetStream()
+				js.mu.RLock()
+				ca := js.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
+				js.mu.RUnlock()
+				if ca == nil {
+					return fmt.Errorf("consumer assignment not found on %s", s.Name())
+				}
+			}
+			return nil
+		})
+	}
+	// Returns the server's JetStream state, consumer assignment, stream and consumer.
+	getConsumerAssignment := func(rs *Server) (*jetStream, *consumerAssignment, *stream, *consumer) {
+		acc, err := rs.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		o := mset.lookupConsumer("CONSUMER")
+		require_True(t, o != nil)
+
+		// Write lock, not read lock: the assignment's Created field is reset below.
+		sjs := rs.getJetStream()
+		sjs.mu.Lock()
+		defer sjs.mu.Unlock()
+
+		sas := sjs.cluster.streams[globalAccountName]
+		require_True(t, sas != nil)
+		sa := sas["TEST"]
+		require_True(t, sa != nil)
+		ca := sa.consumers["CONSUMER"]
+		require_True(t, ca != nil)
+		ca.Created = time.Time{}
+		return sjs, ca, mset, o
+	}
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy, // Replicated consumers by default
+	})
+	require_NoError(t, err)
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
+	require_NoError(t, err)
+	waitForConsumerAssignments()
+
+	// We manually clear the node on the consumer.
+	rs := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
+	sjs, ca, mset, o := getConsumerAssignment(rs)
+	o.mu.Lock()
+	o.node = nil
+	o.mu.Unlock()
+	sjs.mu.Lock()
+	group := ca.Group
+	if group == nil {
+		sjs.mu.Unlock()
+		t.Fatal("ca.Group not initialized")
+	}
+	node := group.node
+	if node == nil {
+		sjs.mu.Unlock()
+		t.Fatal("ca.Group.node not initialized")
+	}
+	sjs.mu.Unlock()
+
+	// The health check gets the Raft node of the assignment and checks it against the
+	// Raft node of the consumer. We simulate a race condition where the consumer's Raft node
+	// is not yet initialized. The check MUST report an error and MUST NOT delete the node.
+	require_True(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca) != nil)
+	require_Equal(t, node.State(), Follower)
+}
+
 func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()