Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,19 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) error {
return errors.New("stream not found")
}

msetNode := mset.raftNode()
switch {
case mset.cfg.Replicas <= 1:
return nil // No further checks for R=1 streams

case node == nil:
return errors.New("group node missing")

case node != mset.raftNode():
case msetNode == nil:
// Can happen when the stream's node is not yet initialized.
return errors.New("stream node missing")

case node != msetNode:
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
Expand Down Expand Up @@ -521,6 +526,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
return errors.New("consumer not found")
}

oNode := o.raftNode()
rc, _ := o.replica()
switch {
case rc <= 1:
Expand All @@ -529,7 +535,11 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
case node == nil:
return errors.New("group node missing")

case node != o.raftNode():
case oNode == nil:
// Can happen when the consumer's node is not yet initialized.
return errors.New("consumer node missing")

case node != oNode:
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
Expand Down
150 changes: 150 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7313,6 +7313,80 @@ func TestJetStreamClusterStreamHealthCheckMustNotRecreate(t *testing.T) {
checkNodeIsClosed(sa)
}

// TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly checks that the stream
// health check leaves the assignment's Raft node alone when the stream itself
// does not have a Raft node set (yet).
func TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Waits until every server in the cluster has an assignment for stream "TEST".
	waitForStreamAssignments := func() {
		t.Helper()
		checkFor(t, 5*time.Second, time.Second, func() error {
			for _, s := range c.servers {
				// Named sjs so it does not shadow the client JetStream context above.
				sjs := s.getJetStream()
				sjs.mu.RLock()
				sa := sjs.streamAssignment(globalAccountName, "TEST")
				sjs.mu.RUnlock()
				if sa == nil {
					return fmt.Errorf("stream assignment not found on %s", s.Name())
				}
			}
			return nil
		})
	}
	// Returns the server's JetStream state, the global account, the assignment
	// for "TEST" and the stream itself, after zeroing the assignment's creation time.
	getStreamAssignment := func(rs *Server) (*jetStream, *Account, *streamAssignment, *stream) {
		acc, err := rs.lookupAccount(globalAccountName)
		require_NoError(t, err)
		// The lookup must succeed; mset is dereferenced by the caller.
		mset, err := acc.lookupStream("TEST")
		require_NoError(t, err)

		sjs := rs.getJetStream()
		// Write lock: sa.Created is modified below.
		sjs.mu.Lock()
		defer sjs.mu.Unlock()

		sas := sjs.cluster.streams[globalAccountName]
		require_True(t, sas != nil)
		sa := sas["TEST"]
		require_True(t, sa != nil)
		// NOTE(review): presumably resets the creation time so the health check
		// does not treat the assignment as freshly created — confirm against isStreamHealthy.
		sa.Created = time.Time{}
		return sjs, acc, sa, mset
	}

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)
	waitForStreamAssignments()

	// We manually clear the node on the stream.
	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
	sjs, acc, sa, mset := getStreamAssignment(rs)
	mset.mu.Lock()
	mset.node = nil
	mset.mu.Unlock()

	// Grab the assignment's Raft node so its state can be inspected afterwards.
	sjs.mu.Lock()
	group := sa.Group
	if group == nil {
		sjs.mu.Unlock()
		t.Fatal("sa.Group not initialized")
	}
	node := group.node
	if node == nil {
		sjs.mu.Unlock()
		t.Fatal("sa.Group.node not initialized")
	}
	sjs.mu.Unlock()

	// The health check gets the Raft node of the assignment and checks it against the
	// Raft node of the stream. We simulate a race condition where the stream's Raft node
	// is not yet initialized. The health check MUST NOT delete the node.
	sjs.isStreamHealthy(acc, sa)
	require_Equal(t, node.State(), Follower)
}

func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down Expand Up @@ -7432,6 +7506,82 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
checkNodeIsClosed(ca)
}

// TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly checks that the consumer
// health check leaves the assignment's Raft node alone when the consumer itself
// does not have a Raft node set (yet).
func TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Waits until every server in the cluster has an assignment for consumer
	// "CONSUMER" on stream "TEST".
	waitForConsumerAssignments := func() {
		t.Helper()
		checkFor(t, 5*time.Second, time.Second, func() error {
			for _, s := range c.servers {
				// Hold the JetStream read lock for the lookup, matching how the
				// stream variant of this test reads assignments.
				sjs := s.getJetStream()
				sjs.mu.RLock()
				ca := sjs.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
				sjs.mu.RUnlock()
				if ca == nil {
					return fmt.Errorf("consumer assignment not found on %s", s.Name())
				}
			}
			return nil
		})
	}
	// Returns the server's JetStream state, the consumer assignment, the stream and
	// the consumer, after zeroing the assignment's creation time.
	getConsumerAssignment := func(rs *Server) (*jetStream, *consumerAssignment, *stream, *consumer) {
		acc, err := rs.lookupAccount(globalAccountName)
		require_NoError(t, err)
		// The lookup must succeed; mset is dereferenced right below.
		mset, err := acc.lookupStream("TEST")
		require_NoError(t, err)
		// The caller locks o.mu, so fail here with a clear message instead of a nil dereference.
		o := mset.lookupConsumer("CONSUMER")
		require_True(t, o != nil)

		sjs := rs.getJetStream()
		// Write lock: ca.Created is modified below.
		sjs.mu.Lock()
		defer sjs.mu.Unlock()

		sas := sjs.cluster.streams[globalAccountName]
		require_True(t, sas != nil)
		sa := sas["TEST"]
		require_True(t, sa != nil)
		ca := sa.consumers["CONSUMER"]
		require_True(t, ca != nil)
		// NOTE(review): presumably resets the creation time so the health check
		// does not treat the assignment as freshly created — confirm against isConsumerHealthy.
		ca.Created = time.Time{}
		return sjs, ca, mset, o
	}

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Replicas:  3,
		Retention: nats.InterestPolicy, // Replicated consumers by default
	})
	require_NoError(t, err)
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
	require_NoError(t, err)
	waitForConsumerAssignments()

	// We manually clear the node on the consumer.
	rs := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
	sjs, ca, mset, o := getConsumerAssignment(rs)
	o.mu.Lock()
	o.node = nil
	o.mu.Unlock()

	// Grab the assignment's Raft node so its state can be inspected afterwards.
	sjs.mu.Lock()
	group := ca.Group
	if group == nil {
		sjs.mu.Unlock()
		t.Fatal("ca.Group not initialized")
	}
	node := group.node
	if node == nil {
		sjs.mu.Unlock()
		t.Fatal("ca.Group.node not initialized")
	}
	sjs.mu.Unlock()

	// The health check gets the Raft node of the assignment and checks it against the
	// Raft node of the consumer. We simulate a race condition where the consumer's Raft node
	// is not yet initialized. The health check MUST NOT delete the node.
	sjs.isConsumerHealthy(mset, "CONSUMER", ca)
	require_Equal(t, node.State(), Follower)
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading