From bac023a71ba191e60f43ce3ca01a25d08d0a70c2 Mon Sep 17 00:00:00 2001 From: Lien Date: Thu, 18 Nov 2021 16:16:00 +0800 Subject: [PATCH] fix: ack msg before we delete it --- redisq/queue.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/redisq/queue.go b/redisq/queue.go index ef427a3..e4c1848 100644 --- a/redisq/queue.go +++ b/redisq/queue.go @@ -226,6 +226,11 @@ func (q *Queue) Release(msg *taskq.Message) error { // Make the delete and re-queue operation atomic in case we crash midway // and lose a message. pipe := q.redis.TxPipeline() + // When Release a msg, ack it before we delete msg. + if err := pipe.XAck(msg.Ctx, q.stream, q.streamGroup, msg.ID).Err(); err != nil { + return err + } + err := pipe.XDel(msg.Ctx, q.stream, msg.ID).Err() if err != nil { return err