You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm pretty sure this is some very edge case, but it took me a few hours to figure out what was wrong. I wrote a simple program to reproduce the behavior, see steps to reproduce.
When using a jetstream stream with WorkQueuePolicy retention policy, when a consumer is deleted before a message received from it is acknowledged:
Ack() does not fail.
Consumer keeps receiving the same message over and over again when recreated.
The behavior has not been tested with plain NATS or other retention policies.
Expected behavior
One of:
Ack() fails, so it's clearly visible that something goes off.
Ack() does not fail, but the message is deleted from the stream.
package main
import (
"context""fmt""time""github.com/samber/lo""github.com/google/uuid""github.com/nats-io/nats.go""github.com/nats-io/nats.go/jetstream"
)
const (
stream="EVENTS"subject="events"
)
var (
nc=lo.Must(nats.Connect(nats.DefaultURL))
js=lo.Must(jetstream.New(nc))
ctx=context.Background()
)
funcconsumer() {
for {
cons:=lo.Must(js.CreateOrUpdateConsumer(ctx, stream, jetstream.ConsumerConfig{
Durable: "consumer",
FilterSubject: subject,
}))
msg:=lo.Must(cons.Next())
// If consumer is deleted before the message is acknowledged// Ack does not fail, but the message is not removed from the stream either.lo.Must0(js.DeleteConsumer(ctx, stream, "consumer"))
fmt.Printf("Received message: %s\n", string(msg.Data()))
lo.Must0(msg.Ack())
}
}
funcproducer() {
for {
msg:=uuid.NewString()
lo.Must(js.Publish(context.Background(), subject, []byte(msg)))
fmt.Printf("Message published %s\n", msg)
time.Sleep(100*time.Millisecond)
}
}
funcmain() {
lo.Must(js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: stream,
Retention: jetstream.WorkQueuePolicy,
Subjects: []string{subject},
}))
goconsumer()
producer()
}
Run the program, observe the consumer to log the same message over and over again, no panics.
Remove lo.Must0(js.DeleteConsumer(ctx, stream, "consumer")), observe the consumer to log different meessages.
The text was updated successfully, but these errors were encountered:
zhulik
changed the title
Acknowledging a message after consumer is deleted does not fail, but the message is not removed from the stream either.
Acknowledging a message after consumer is deleted does not fail, but the message is not removed from the stream either
Feb 2, 2025
Observed behavior
I'm pretty sure this is some very edge case, but it took me a few hours to figure out what was wrong. I wrote a simple program to reproduce the behavior, see steps to reproduce.
When using a jetstream stream with
WorkQueuePolicy
retention policy, when a consumer is deleted before a message received from it is acknowledged:Ack()
does not fail.The behavior has not been tested with plain NATS or other retention policies.
Expected behavior
One of:
Ack()
fails, so it's clearly visible that something goes off.Ack()
does not fail, but the message is deleted from the stream.Server and client version
v1.38.0
v2.10.25
, Docker imagenats:latest
, it was latest when pulled anyway.Host environment
OS: Arch Linux
CPU: AMD Ryzen 7 PRO 5850U and AMD Ryzen 5 5600
Docker:
Steps to reproduce
Run the program, observe the consumer to log the same message over and over again, no panics.
Remove
lo.Must0(js.DeleteConsumer(ctx, stream, "consumer"))
, observe the consumer to log different meessages.The text was updated successfully, but these errors were encountered: