
[eval] Improve consumer pending count tracking during stream contention #2

Open

Uzay-G wants to merge 1 commit into eval/base-pr-6297 from eval/upstream-pr-6297

Conversation


@Uzay-G Uzay-G commented Feb 21, 2026

Mirror of nats-io#6297 (MERGED) for Orpheus review evaluation.

Upstream: nats-io#6297


Original PR description:

Some tests would occasionally fail because the consumer pending count drifted. This was caused by a race condition described in the comment on checkNumPending:

// Does some sanity checks to see if we should re-calculate.
// Since there is a race when decrementing when there is contention at the beginning of the stream.
// The race is a getNextMsg skips a deleted msg, and then the decStreamPending call fires.
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) checkNumPending() uint64 {

This PR doesn't fix the race condition itself, but it improves the tracking, which improves test reliability. If the race does happen, we can still check whether the deleted message lies between o.asflr and the o.sseq that skipped ahead; in that case we can still decrement the pending count (o.npc), provided the message is not pending/delivered. This keeps the pending count accurate as long as the ack floor hasn't moved up yet.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@orpheus-by-fulcrum-dev-uzay-g

Review started.


@orpheus-by-fulcrum-dev-uzay-g (bot) left a comment

All checks passed.

Expected behavior

  • When a stream message is deleted before the consumer delivers it, num_pending decrements correctly
  • When getNextMsg skips past deleted messages (advancing sseq), and decStreamPending fires afterward, the pending count still decrements correctly (the race condition scenario)
  • Messages already in the consumer's pending map (delivered but not yet acked) are not double-decremented when deleted from the stream
  • Messages below the ack floor are not decremented (already accounted for)
  • Filtered consumers only decrement for matching subjects
  • AckNone consumers with fast-advancing ack floor still track deletions above the floor

What happens

  • ✅ Happy path works: publishing, deleting, and consuming messages produces accurate num_pending at every step (3→2→1→0 after sequential deletions)
  • ✅ Concurrent contention works: 20 iterations of concurrent publish/delete/consume with 50 messages each all finish with num_pending=0
  • ✅ Edge cases correct: filtered consumers ignore non-matching deletes, AckNone consumers track deletions above ack floor, deleting already-acked messages has no effect on pending count, stream purge resets to 0
  • ✅ All 77 TestJetStreamConsumer* tests pass including the new TestJetStreamConsumerDecrementPendingCountOnSkippedMsg test, with no regressions
Detailed evidence

Setup

export PATH=$PATH:/usr/local/go/bin:$HOME/go/bin
cd /home/agent/nats-server

# Build
go build -o nats-server .

# Start server with JetStream
./nats-server -js -p 4222 -store_dir /tmp/nats-data &

Server started successfully:

[INF] Starting nats-server
[INF]   Version:  2.11.0-dev
[INF]   Git:      [0d0720d]
[INF] Starting JetStream
[INF] Listening for client connections on 0.0.0.0:4222
[INF] Server is ready

Demo 1: Happy path — pending count tracks message deletions

# Create stream and consumer
nats request '$JS.API.STREAM.CREATE.TEST' '{"name":"TEST","subjects":["foo"],...}'
nats request '$JS.API.CONSUMER.DURABLE.CREATE.TEST.CONSUMER' '{"stream_name":"TEST","config":{"durable_name":"CONSUMER","ack_policy":"explicit",...}}'

# Publish 3 messages
nats pub foo "message 1"  # seq=1
nats pub foo "message 2"  # seq=2
nats pub foo "message 3"  # seq=3

Consumer info after publish:

"num_pending":3

Delete seq=2:

nats request '$JS.API.STREAM.MSG.DELETE.TEST' '{"seq":2}'
# => "num_pending":2

Delete seq=1:

nats request '$JS.API.STREAM.MSG.DELETE.TEST' '{"seq":1}'
# => "num_pending":1

Fetch remaining message:

nats request '$JS.API.CONSUMER.MSG.NEXT.TEST.CONSUMER' '{"batch":1}'
# => "message 3"
# Consumer info: "num_pending":0, "num_ack_pending":1

Demo 2: Interleaved deletes and fetches

Created stream with 10 messages. Deleted messages 1, 3, 5, 7 (4 deletions):

num_pending after deletes: 6  (correct: 10 - 4 = 6)

Fetched 6 remaining messages: 2, 4, 6, 8, 9, 10. Final:

num_pending: 0

Demo 3: Concurrent contention stress test (Go program)

Ran a Go program that concurrently:

  • Publishes 100 messages
  • Deletes odd-numbered messages 1-49 in one goroutine
  • Fetches and acks messages in another goroutine
Published 100 messages
Initial num_pending: 100
Fetched and acked 80 messages
Deleted odd messages 1-49
Final state:
{
  "ack_floor_stream_seq": 82,
  "delivered_stream_seq": 82,
  "num_ack_pending": 0,
  "num_pending": 18,
  "stream_msgs": 75
}
Fetched 18 remaining messages
Final num_pending: 0 (should be 0)
Final num_ack_pending: 0 (should be 0)
SUCCESS: pending count is accurate

Demo 4: Aggressive stress test (20 iterations)

PASS iter=0: stream_msgs=31, fetched=45, deleted=19
PASS iter=1: stream_msgs=35, fetched=48, deleted=15
PASS iter=2: stream_msgs=34, fetched=47, deleted=16
PASS iter=3: stream_msgs=33, fetched=48, deleted=17
PASS iter=4: stream_msgs=33, fetched=47, deleted=17
PASS iter=5: stream_msgs=35, fetched=47, deleted=15
PASS iter=6: stream_msgs=33, fetched=46, deleted=17
PASS iter=7: stream_msgs=34, fetched=45, deleted=16
PASS iter=8: stream_msgs=35, fetched=48, deleted=15
PASS iter=9: stream_msgs=32, fetched=48, deleted=18
PASS iter=10: stream_msgs=33, fetched=47, deleted=17
PASS iter=11: stream_msgs=33, fetched=45, deleted=17
PASS iter=12: stream_msgs=37, fetched=47, deleted=13
PASS iter=13: stream_msgs=33, fetched=46, deleted=17
PASS iter=14: stream_msgs=35, fetched=48, deleted=15
PASS iter=15: stream_msgs=33, fetched=48, deleted=17
PASS iter=16: stream_msgs=35, fetched=47, deleted=15
PASS iter=17: stream_msgs=35, fetched=47, deleted=15
PASS iter=18: stream_msgs=32, fetched=45, deleted=18
PASS iter=19: stream_msgs=34, fetched=45, deleted=16

Results: 20 passed, 0 failed out of 20 iterations

Demo 5: Edge cases

Filtered consumer — consumer filters on filter.foo, stream has filter.>:

After 3 matching + 2 non-matching: num_pending=3 (expect 3)
After deleting matching seq=1: num_pending=2 (expect 2)
After deleting non-matching seq=2: num_pending=2 (expect 2, should stay same)
After deleting matching seq=3: num_pending=1 (expect 1)
Fetched: match 3
Final: num_pending=0 (expect 0)

AckNone consumer — ack floor advances immediately on delivery:

Before consume: num_pending=5 (expect 5)
After consuming 2: num_pending=3, ack_floor_stream=2 (expect pending=3, floor=2)
After deleting seq=3 (above floor): num_pending=2 (expect 2)
Final: num_pending=0 (expect 0)

Delete after partial ack — some messages acked, then delete from both sides of ack floor:

After acking 5: num_pending=5, ack_floor=5 (expect pending=5, floor=5)
After deleting seq=6 (above floor): num_pending=4 (expect 4)
After deleting acked seq=3 (below floor): num_pending=4 (expect 4, unchanged)
Final: num_pending=0 (expect 0)

Stream purge:

Before purge: num_pending=20 (expect 20)
After purge: num_pending=0 (expect 0)
After new publish: num_pending=5 (expect 5)

Test suite

New test:

=== RUN   TestJetStreamConsumerDecrementPendingCountOnSkippedMsg
--- PASS: TestJetStreamConsumerDecrementPendingCountOnSkippedMsg (0.01s)

All consumer tests (77 tests):

ok  	github.com/nats-io/nats-server/v2/server	121.618s

All pending-specific tests:

--- PASS: TestJetStreamConsumerPendingBugWithKV (0.01s)
--- PASS: TestJetStreamConsumerPendingCountWithRedeliveries (0.12s)
--- PASS: TestJetStreamConsumerDecrementPendingCountOnSkippedMsg (0.01s)
--- PASS: TestJetStreamWouldExceedLimits (0.00s)
