Skip to content

Commit

Permalink
fix: ack msg before we delete it
Browse files Browse the repository at this point in the history
  • Loading branch information
lilien1010 committed Nov 18, 2021
1 parent 8fb8fe4 commit bac023a
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions redisq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ func (q *Queue) Release(msg *taskq.Message) error {
// Make the delete and re-queue operation atomic in case we crash midway
// and lose a message.
pipe := q.redis.TxPipeline()
// When Release a msg, ack it before we delete msg.
if err := pipe.XAck(msg.Ctx, q.stream, q.streamGroup, msg.ID).Err(); err != nil {
return err
}

err := pipe.XDel(msg.Ctx, q.stream, msg.ID).Err()
if err != nil {
return err
Expand Down

0 comments on commit bac023a

Please sign in to comment.