Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10447,6 +10447,122 @@ func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) {
}
}

// TestJetStreamClusterNoInterestDesyncOnConsumerCreate reproduces a replica
// desync on interest-based streams: consumer creates travel over the meta
// layer while published messages travel over the stream layer, so a server
// that has not yet applied the consumer assignment can decide "no interest"
// on its own and skip storing a message that the other replicas do store.
// The test forces that situation by pausing meta apply on one server, then
// verifies the cluster converges again once interest state is re-checked.
func TestJetStreamClusterNoInterestDesyncOnConsumerCreate(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Interest-retention R3 stream: messages are only stored while some
	// consumer has interest in them.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Replicas:  3,
		Retention: nats.InterestPolicy,
	})
	require_NoError(t, err)

	// Pick a random server that will not know about the new consumer being created.
	// If servers determine "no interest" individually, these servers will desync.
	// Pausing meta apply means rs keeps receiving stream-layer traffic but will
	// not learn about the consumer assignment until ResumeApply below.
	rs := c.randomNonLeader()
	sjs := rs.getJetStream()
	meta := sjs.getMetaGroup()
	require_NoError(t, meta.PauseApply())

	// Create the durable while rs's meta layer is paused; only the other two
	// servers will apply the assignment.
	sub, err := js.PullSubscribe(_EMPTY_, "DURABLE", nats.BindStream("TEST"))
	require_NoError(t, err)
	defer sub.Drain()

	// checkConsumersAssigned polls until exactly `expected` servers report the
	// "TEST"/"DURABLE" consumer assignment as applied locally.
	checkConsumersAssigned := func(expected int) {
		t.Helper()
		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
			var count int
			for _, s := range c.servers {
				_, _, jsa := s.globalAccount().getJetStreamFromAccount()
				if jsa.consumerAssigned("TEST", "DURABLE") {
					count++
				}
			}
			if count != expected {
				return fmt.Errorf("expected %d, got %d", expected, count)
			}
			return nil
		})
	}
	// Confirm only two servers know about the consumer.
	checkConsumersAssigned(2)
	c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE")

	// Publish a single message. All servers will receive this, but only two will store it.
	_, err = js.Publish("foo", nil)
	require_NoError(t, err)
	// checkLastSeq polls until every server reports `lseq` as the stream's
	// last sequence. NOTE(review): with rs having no interest for msg 1,
	// presumably rs still advances lastSeq without storing the body — confirm
	// against stream-layer apply semantics.
	checkLastSeq := func(lseq uint64) {
		t.Helper()
		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
			for _, s := range c.servers {
				mset, err := s.globalAccount().lookupStream("TEST")
				if err != nil {
					return err
				}
				if seq := mset.lastSeq(); seq != lseq {
					return fmt.Errorf("expected %d, got %d", lseq, seq)
				}
			}
			return nil
		})
	}
	checkLastSeq(1)

	// Resume the meta layer such that the consumer gets created on the remaining server.
	meta.ResumeApply()
	checkConsumersAssigned(3)

	// All servers will now store another published message.
	_, err = js.Publish("foo", nil)
	require_NoError(t, err)
	checkLastSeq(2)

	// Make sure the consumer leader is on the same server that didn't store the first message.
	// If leadership landed elsewhere, step it down toward rs so the desynced
	// server is the one serving fetches below.
	cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE")
	if cl != rs {
		mset, err := cl.globalAccount().lookupStream("TEST")
		require_NoError(t, err)
		o := mset.lookupConsumer("DURABLE")
		require_NotNil(t, o)
		n := o.raftNode()
		require_NoError(t, n.StepDown(rs.NodeName()))
		c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE")
		cl = c.consumerLeader(globalAccountName, "TEST", "DURABLE")
		require_Equal(t, cl, rs)
	}

	// Since the consumer leader is the same as the server that didn't store the first message,
	// it can only receive and ack the second message.
	msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
	require_NoError(t, err)
	require_Len(t, len(msgs), 1)
	metadata, err := msgs[0].Metadata()
	require_NoError(t, err)
	// Stream sequence 2 (not 1) proves rs skipped the first message; nothing
	// should remain pending after it.
	require_Equal(t, metadata.Sequence.Stream, 2)
	require_Equal(t, metadata.NumPending, 0)
	require_NoError(t, msgs[0].AckSync())

	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
		// The servers will eventually be synced up again, but this relies on the interest state being checked.
		// Nudge the two servers that stored message 1 to re-evaluate interest
		// so they drop it and converge with rs.
		for _, s := range c.servers {
			if s == rs {
				continue
			}
			mset, err := s.globalAccount().lookupStream("TEST")
			if err != nil {
				return err
			}
			mset.checkInterestState()
		}
		return checkState(t, c, globalAccountName, "TEST")
	})
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
12 changes: 12 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7607,7 +7607,19 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) bool {
// Only propose message deletion to the stream if we're consumer leader, otherwise all followers would also propose.
// We must be the consumer leader, since we know for sure we've stored the message and don't register as pre-ack.
if o != nil && !o.IsLeader() {
// Currently, interest-based streams can race on "no interest" because consumer creates/updates go over
// the meta layer and published messages go over the stream layer. Some servers could then either store
// or not store some initial set of messages that gained new interest. To get the stream back in sync,
// we allow moving the first sequence up.
// TODO(mvv): later on only the stream leader should determine "no interest"
interestRaiseFirst := mset.cfg.Retention == InterestPolicy && seq == state.FirstSeq
mset.mu.Unlock()
if interestRaiseFirst {
if _, err := store.RemoveMsg(seq); err == ErrStoreEOF {
// This should not happen, but being pedantic.
mset.registerPreAckLock(o, seq)
}
}
// Must still mark as removal if follower. If we become leader later, we must be able to retry the proposal.
return true
}
Expand Down