Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,22 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
js.mu.RUnlock()
return errors.New("consumer assignment or group missing")
}
if ca.deleted {
js.mu.RUnlock()
return nil // No further checks, consumer was deleted in the meantime.
}
created := ca.Created
node := ca.Group.node
js.mu.RUnlock()

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
if time.Since(created) < 5*time.Second {
// No further checks, consumer is not available yet but should be soon.
// We'll start erroring once we're sure this consumer is actually broken.
return nil
}
return errors.New("consumer not found")
}

Expand Down
51 changes: 51 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7774,6 +7774,57 @@ func TestJetStreamClusterConsumerHealthCheckOnlyReportsSkew(t *testing.T) {
require_NotEqual(t, node.State(), Closed)
}

// https://github.com/nats-io/nats-server/issues/7149
func TestJetStreamClusterConsumerHealthCheckDeleted(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)

cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
require_NotNil(t, cl)
mset, err := cl.globalAccount().lookupStream("TEST")
require_NoError(t, err)

sjs := cl.getJetStream()
sjs.mu.Lock()
ca := sjs.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
if ca == nil {
sjs.mu.Unlock()
t.Fatal("ca not found")
}
// Reset created time, simulating the consumer existed already for a while.
ca.Created = time.Time{}
sjs.mu.Unlock()

// The health check gathers all assignments and does checking after.
// If the consumer was deleted in the meantime, it should not report an error.
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
require_NoError(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca))

// The health check could run earlier than we're able to create the consumer.
// In that case, wait before erroring.
sjs.mu.Lock()
if !ca.deleted {
sjs.mu.Unlock()
t.Fatal("ca.deleted not set")
}
ca.deleted = false
ca.Created = time.Now()
sjs.mu.Unlock()
require_NoError(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca))
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading