Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3086,17 +3086,20 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
// than the last stream sequence.
// than the last stream sequence. But could happen if we've just become
// consumer leader, and we are not up-to-date on the stream yet.
var ss StreamState
mset.store.FastState(&ss)
if sseq > ss.LastSeq {
o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
o.mu.Unlock()
return false
}
o.sseq = sseq + 1
// Even though another leader must have delivered a message with this sequence, we must not adjust
// the current pointer. This could otherwise result in a stuck consumer, where messages below this
// sequence can't be redelivered, and we'll have incorrect pending state and ack floors.
o.mu.Unlock()
return false
}

// Let the owning stream know if we are interest or workqueue retention based.
Expand Down
72 changes: 72 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7460,6 +7460,78 @@ func TestJetStreamClusterPeerRemoveStreamConsumerDesync(t *testing.T) {
})
}

// TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries verifies
// that an acknowledgement for a stream sequence the consumer leader has not
// delivered is ignored, so that the consumer keeps consistent delivered/ack-floor
// state and can still hand out the messages below that sequence.
func TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries(t *testing.T) {
	const (
		testStream   = "TEST"
		testConsumer = "CONSUMER"
		testSubject  = "foo"

		// ACK reply subjects sent by hand below. The first one references stream
		// sequence 3, which this test never fetches; the second references
		// sequence 1, which it does fetch.
		unknownDeliveryAck = "$JS.ACK.TEST.CONSUMER.1.3.3.0.0"
		knownDeliveryAck   = "$JS.ACK.TEST.CONSUMER.1.1.1.0.0"
	)

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:     testStream,
		Subjects: []string{testSubject},
		Replicas: 3,
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)

	// Put three messages into the stream.
	for remaining := 3; remaining > 0; remaining-- {
		_, err = js.Publish(testSubject, nil)
		require_NoError(t, err)
	}

	// Wait until checkState reports no error for the stream before continuing.
	streamInSync := func() error {
		return checkState(t, c, globalAccountName, testStream)
	}
	checkFor(t, 2*time.Second, 500*time.Millisecond, streamInSync)

	sub, err := js.PullSubscribe(testSubject, testConsumer)
	require_NoError(t, err)
	defer sub.Unsubscribe()

	// Only a single message is fetched because the real condition is hard to
	// trigger. This stands in for a client that fetched all three messages while
	// the consumer leader changed mid-delivery: deliveries do not wait for quorum,
	// so the new leader would not know about the last two.
	first, err := sub.Fetch(1)
	require_NoError(t, err)
	require_Len(t, len(first), 1)

	// An ACK for a delivery the leader knows nothing about must be ignored without
	// touching any state, so the request is expected to time out.
	_, err = nc.Request(unknownDeliveryAck, nil, time.Second)
	require_Error(t, err, nats.ErrTimeout)

	// An ACK for a delivery the leader does know about is still accepted.
	_, err = nc.Request(knownDeliveryAck, nil, time.Second)
	require_NoError(t, err)

	// Consumer state must reflect exactly one delivered and acknowledged message.
	info, err := js.ConsumerInfo(testStream, testConsumer)
	require_NoError(t, err)
	require_Equal(t, info.Delivered.Consumer, 1)
	require_Equal(t, info.Delivered.Stream, 1)
	require_Equal(t, info.AckFloor.Consumer, 1)
	require_Equal(t, info.AckFloor.Stream, 1)

	// The two remaining messages MUST still be deliverable. If they were not,
	// the consumer would be stuck.
	rest, err := sub.Fetch(2)
	require_NoError(t, err)
	require_Len(t, len(rest), 2)
	for i := range rest {
		require_NoError(t, rest[i].AckSync())
	}

	// After acknowledging everything, delivered and ack floor both sit at 3.
	info, err = js.ConsumerInfo(testStream, testConsumer)
	require_NoError(t, err)
	require_Equal(t, info.Delivered.Consumer, 3)
	require_Equal(t, info.Delivered.Stream, 3)
	require_Equal(t, info.AckFloor.Consumer, 3)
	require_Equal(t, info.AckFloor.Stream, 3)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down