diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9e7de392609..87ad2c60956 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -641,12 +641,12 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error { case !mset.isMonitorRunning(): return errors.New("monitor goroutine not running") - case !node.Healthy(): - return errors.New("group node unhealthy") - case mset.isCatchingUp(): return errors.New("stream catching up") + case !node.Healthy(): + return errors.New("group node unhealthy") + default: return nil } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 80257d0f163..795e968a366 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -7497,6 +7497,44 @@ func TestJetStreamClusterStreamHealthCheckOnlyReportsSkew(t *testing.T) { require_NotEqual(t, node.State(), Closed) } +func TestJetStreamClusterStreamHealthCheckStreamCatchup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + sjs := sl.getJetStream() + acc := sl.globalAccount() + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + sjs.mu.Lock() + sa := sjs.streamAssignment(globalAccountName, "TEST") + sjs.mu.Unlock() + + require_NoError(t, sjs.isStreamHealthy(acc, sa)) + + // Check we can report unhealthy. + n := mset.raftNode().(*raft) + n.Lock() + n.commit = 0 + n.Unlock() + require_Error(t, sjs.isStreamHealthy(acc, sa), errors.New("group node unhealthy")) + + // Catching up should have precedence. + mset.setCatchingUp() + require_True(t, mset.isCatchingUp()) + require_Error(t, sjs.isStreamHealthy(acc, sa), errors.New("stream catching up")) +} + func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()