
[config change] Support multiple producers in redis streams #2581

Merged · 23 commits · Oct 9, 2024 (diff below shown from the first 6 commits)

Commits:
7e49635  Support multiple producers in redis streams (ganeshvanahalli, Aug 15, 2024)
e155ebb  trim acknotifiers map and use previous keepalive timeouts (ganeshvanahalli, Aug 15, 2024)
1473c60  Use faster hash function (ganeshvanahalli, Aug 16, 2024)
85ba40b  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Aug 16, 2024)
40e6b9b  address PR comments, handle memory better and add test to cover incor… (ganeshvanahalli, Aug 26, 2024)
a2d9b03  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Aug 26, 2024)
b5fff68  increase TestProducerConfig requestTimeout (ganeshvanahalli, Aug 26, 2024)
cee4620  fix tests (ganeshvanahalli, Aug 26, 2024)
2a3dc1d  rectify xautoclaim logic and address PR comments (ganeshvanahalli, Aug 27, 2024)
ecd2722  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Aug 27, 2024)
4028e99  merge master and resolve conflicts (ganeshvanahalli, Sep 5, 2024)
dbcf908  Merge branch 'master' into multiple-producers-redisstream (tsahee, Sep 13, 2024)
8525ad4  address PR comments (ganeshvanahalli, Sep 13, 2024)
4548690  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Sep 13, 2024)
a8967d1  remove separate id impl (ganeshvanahalli, Sep 16, 2024)
9508627  merge master and resolve conflicts (ganeshvanahalli, Sep 16, 2024)
6123021  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Sep 27, 2024)
9a96fbf  address PR comments (ganeshvanahalli, Sep 27, 2024)
43a54e7  remove unnecessary error log in decrementMsgIdByOne (ganeshvanahalli, Sep 27, 2024)
8c655e6  merge master and address PR comments (ganeshvanahalli, Oct 8, 2024)
08e0fc4  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Oct 8, 2024)
dc72b58  Merge branch 'master' into multiple-producers-redisstream (ganeshvanahalli, Oct 9, 2024)
01a1b38  Merge branch 'master' into multiple-producers-redisstream (tsahee, Oct 9, 2024)
78 changes: 61 additions & 17 deletions pubsub/consumer.go
@@ -5,6 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"strconv"
"time"

"github.com/ethereum/go-ethereum/log"
@@ -18,7 +21,7 @@ type ConsumerConfig struct {
// Timeout of result entry in Redis.
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// Minimum idle time after which messages will be autoclaimed
IdletimeToAutoclaim time.Duration `koanf:"Idletime-to-autoclaim"`
IdletimeToAutoclaim time.Duration `koanf:"idletime-to-autoclaim"`
}

var DefaultConsumerConfig = ConsumerConfig{
@@ -33,7 +36,7 @@ var TestConsumerConfig = ConsumerConfig{

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".Idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers")
f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers")
}
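Aside: the rename above is user-visible, since pflag flag names are case-sensitive on the command line. A minimal sketch of parsing the corrected lowercase flag (the "consumer" prefix and import path are hypothetical; only ConsumerConfigAddOptions comes from this file):

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"

	pubsub "example.com/yourmodule/pubsub" // hypothetical import path
)

func main() {
	f := pflag.NewFlagSet("worker", pflag.ContinueOnError)
	pubsub.ConsumerConfigAddOptions("consumer", f)

	// The old spelling --consumer.Idletime-to-autoclaim no longer exists.
	_ = f.Parse([]string{"--consumer.idletime-to-autoclaim=30s"})

	d, _ := f.GetDuration("consumer.idletime-to-autoclaim")
	fmt.Println(d) // 30s
}
```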

// Consumer implements a consumer for redis stream provides heartbeat to
@@ -82,20 +85,54 @@ func (c *Consumer[Request, Response]) StreamName() string {
return c.redisStream
}

func decrementMsgIdByOne(msgId string) string {
id, err := getUintParts(msgId)
if err != nil {
log.Error("Error decrementing start of XAutoClaim by one, defaulting to 0", "err", err)
return "0"
}
if id[1] > 0 {
return strconv.FormatUint(id[0], 10) + "-" + strconv.FormatUint(id[1]-1, 10)
} else if id[0] > 0 {
return strconv.FormatUint(id[0]-1, 10) + "-" + strconv.FormatUint(math.MaxUint64, 10)
} else {
log.Error("Error decrementing start of XAutoClaim by one, defaulting to 0", "err", err)
return "0"
}
}
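For intuition, here is a self-contained sketch of the same ID arithmetic (re-implemented for illustration; the real function delegates parsing to getUintParts in producer.go). Redis stream IDs have the form `<ms>-<seq>`, so decrementing a zero sequence borrows from the timestamp part; the decrement ensures the randomly chosen PEL entry falls within XAutoClaim's scan range.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// decrement mirrors decrementMsgIdByOne for well-formed "<ms>-<seq>" IDs.
func decrement(id string) string {
	parts := strings.Split(id, "-")
	ms, _ := strconv.ParseUint(parts[0], 10, 64)
	seq, _ := strconv.ParseUint(parts[1], 10, 64)
	switch {
	case seq > 0:
		return fmt.Sprintf("%d-%d", ms, seq-1)
	case ms > 0:
		return fmt.Sprintf("%d-%d", ms-1, uint64(math.MaxUint64))
	default:
		return "0" // "0-0" cannot go lower
	}
}

func main() {
	fmt.Println(decrement("5-3")) // 5-2
	fmt.Println(decrement("5-0")) // 4-18446744073709551615
	fmt.Println(decrement("0-0")) // 0
}
```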

// Consume first checks if there exists a pending message that is claimed by
// an unresponsive consumer, and if not reads a new message from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], chan struct{}, error) {
// First try to XAUTOCLAIM, this prioritizes processing PEL messages
// that have been waiting for more than IdletimeToAutoclaim duration
messages, _, err := c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: "0",
Count: 1, // Limit the number of messages to claim
}).Result()
if len(messages) != 1 || err != nil {
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
var messages []redis.XMessage
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Start: "-",
End: "+",
Count: math.MaxInt64,
Idle: c.cfg.IdletimeToAutoclaim,
}).Result(); err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
idx := rand.Intn(len(pendingMsgs))
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Count: 1,
}).Result()
if err != nil {
log.Error("error from xautoclaim", "err", err)
}
}
if len(messages) == 0 {
// Fallback to reading new messages
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.redisGroup,
@@ -123,7 +160,7 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
data, ok = (value).(string)
)
if !ok {
return nil, nil, fmt.Errorf("casting request to string: %w", err)
return nil, nil, errors.New("error casting request to string")
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
@@ -132,14 +169,18 @@ func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Req
ackNotifier := make(chan struct{})
c.StopWaiter.LaunchThread(func(ctx context.Context) {
for {
if err := c.client.XClaim(ctx, &redis.XClaimArgs{
// Use XClaimJustID so that we have a clear difference between invalid requests that are claimed multiple times due to xautoclaim and
// valid requests that are just being claimed at regular intervals to indicate heartbeat
if ids, err := c.client.XClaimJustID(ctx, &redis.XClaimArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Consumer: c.id,
MinIdle: 0,
Messages: []string{messages[0].ID},
}).Err(); err != nil {
log.Error("error claiming message, it might be possible that other consumers might pick this request", "msgID", messages[0].ID)
}).Result(); err != nil {
log.Error("Error claiming message, it might be possible that other consumers might pick this request", "msgID", messages[0].ID)
} else if len(ids) != 1 {
log.Warn("XClaimJustID returned empty response when indicating hearbeat", "msgID", messages[0].ID)
}
select {
case <-ackNotifier:
@@ -174,5 +215,8 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, id string,
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
return nil
}
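To see how these pieces compose, here is a hypothetical worker loop. Sketch only: the request/response types, the process function, and the exact SetResult parameters (its signature is partially elided in this diff) are assumptions, not part of the change. Consume either autoclaims a stale PEL entry or reads a new one; the returned ack channel must be closed after the result is stored so the heartbeat goroutine stops.

```go
// runWorker drives one consumer until ctx is cancelled (schematic).
func runWorker(ctx context.Context, c *Consumer[MyRequest, MyResponse]) {
	for ctx.Err() == nil {
		msg, ackNotifier, err := c.Consume(ctx)
		if err != nil {
			log.Error("consuming", "err", err)
			continue
		}
		if msg == nil {
			// Nothing autoclaimable in the PEL and nothing new in the stream.
			time.Sleep(time.Second)
			continue
		}
		resp := process(msg.Value) // application-specific work
		// SetResult stores the response, then XAcks and (new in this PR) XDels the entry.
		if err := c.SetResult(ctx, msg.ID, resp); err != nil {
			log.Error("setting result", "msgID", msg.ID, "err", err)
		}
		// Closing ackNotifier stops the per-message XClaimJustID heartbeat goroutine.
		close(ackNotifier)
	}
}
```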
120 changes: 84 additions & 36 deletions pubsub/producer.go
@@ -57,21 +57,26 @@ type ProducerConfig struct {
CheckResultInterval time.Duration `koanf:"check-result-interval"`
// Timeout of entry's written to redis by producer
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// RequestTimeout is a TTL for any message sent to the redis stream
RequestTimeout time.Duration `koanf:"request-timeout"`
}

var DefaultProducerConfig = ProducerConfig{
CheckResultInterval: 5 * time.Second,
ResponseEntryTimeout: time.Hour,
RequestTimeout: time.Hour, // should we increase this?
}

var TestProducerConfig = ProducerConfig{
CheckResultInterval: 5 * time.Millisecond,
ResponseEntryTimeout: time.Minute,
RequestTimeout: time.Minute,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".response-entry-timeout", DefaultProducerConfig.ResponseEntryTimeout, "timeout after which responses written from producer to the redis are cleared. Currently used for the key mapping unique request id to redis stream message id")
f.Duration(prefix+".request-timeout", DefaultProducerConfig.RequestTimeout, "timeout after which the message in redis stream is considered as errored, this prevents workers from working on wrong requests indefinitely")
}
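For illustration, constructing a producer with the new TTL config might look like this (sketch only, same package assumed; MyRequest/MyResponse are placeholder types, the redis address is an assumption, and go-redis's *redis.Client satisfies redis.UniversalClient):

```go
// exampleNewProducer shows the new RequestTimeout flowing in via the config.
func exampleNewProducer() (*Producer[MyRequest, MyResponse], error) {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed local redis
	cfg := DefaultProducerConfig // RequestTimeout: time.Hour TTL per stream entry
	return NewProducer[MyRequest, MyResponse](client, "my-stream", &cfg)
}
```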

func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
@@ -91,37 +96,59 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func setMaxMsgIdInt(maxMsgIdInt *[2]uint64, msgId string) error {
func getUintParts(msgId string) ([2]uint64, error) {
idParts := strings.Split(msgId, "-")
if len(idParts) != 2 {
return fmt.Errorf("invalid i.d: %v", msgId)
return [2]uint64{}, fmt.Errorf("invalid i.d: %v", msgId)
}
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d: %v err: %w", msgId, err)
}
if idTimeStamp < maxMsgIdInt[0] {
return nil
return [2]uint64{}, fmt.Errorf("invalid i.d: %v err: %w", msgId, err)
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d serial: %v err: %w", msgId, err)
return [2]uint64{}, fmt.Errorf("invalid i.d serial: %v err: %w", msgId, err)
}
return [2]uint64{idTimeStamp, idSerial}, nil
}

// cmpMsgId compares two msgIds and returns (0) if equal, (-1) if msgId1 < msgId2, (1) if msgId1 > msgId2, (-2) if not comparable (or error)
func cmpMsgId(msgId1, msgId2 string) int {
id1, err := getUintParts(msgId1)
if err != nil {
log.Trace("error comparing msgIds", "msgId1", msgId1, "msgId2", msgId2)
return -2
}
if idTimeStamp > maxMsgIdInt[0] {
maxMsgIdInt[0] = idTimeStamp
maxMsgIdInt[1] = idSerial
return nil
id2, err := getUintParts(msgId2)
if err != nil {
log.Trace("error comparing msgIds", "msgId1", msgId1, "msgId2", msgId2)
return -2
}
// idTimeStamp == maxMsgIdInt[0]
if idSerial > maxMsgIdInt[1] {
maxMsgIdInt[1] = idSerial
if id1[0] < id2[0] {
return -1
} else if id1[0] > id2[0] {
return 1
} else if id1[1] < id2[1] {
return -1
} else if id1[1] > id2[1] {
return 1
}
return nil
return 0
}
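A hypothetical table-driven test in the same package (sketch only, not part of the diff) pins down the documented ordering:

```go
func TestCmpMsgId(t *testing.T) {
	cases := []struct {
		a, b string
		want int
	}{
		{"5-1", "5-2", -1}, // same timestamp: lower sequence number is smaller
		{"6-0", "5-9", 1},  // later timestamp dominates regardless of sequence
		{"5-1", "5-1", 0},  // identical IDs
		{"abc", "5-1", -2}, // unparsable ID: not comparable
	}
	for _, c := range cases {
		if got := cmpMsgId(c.a, c.b); got != c.want {
			t.Errorf("cmpMsgId(%q, %q) = %d, want %d", c.a, c.b, got, c.want)
		}
	}
}
```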

// checkResponses iteratively checks whether responses for the tracked promises are ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
maxMsgIdInt := [2]uint64{0, 0}
pelData, err := p.client.XPending(ctx, p.redisStream, p.redisGroup).Result()
if err != nil {
log.Error("error getting PEL data from xpending, xtrimming is disabled", "err", err)
}
deletePromise := func(id string) {
// Try deleting UNIQUEID_MSGID_MAP_KEY corresponding to this id from redis
if err := p.client.Del(ctx, MessageKeyFor(p.redisStream, id)+UNIQUEID_MSGID_MAP_KEY).Err(); err != nil {
log.Error("Error deleting key from redis that flags that a request is being processed", "err", err)
}
delete(p.promises, id)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
@@ -135,16 +162,22 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D
if err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
} else {
// The request this producer is waiting for has lived past its TTL or is older than the current PEL's lower bound,
// so it is safe to error out and stop tracking this promise
allowedOldestID := fmt.Sprintf("%d-0", time.Now().Add(-p.cfg.RequestTimeout).UnixMilli())
if pelData != nil && pelData.Lower != "" {
allowedOldestID = pelData.Lower
}
if cmpMsgId(msgIDAndPromise.msgID, allowedOldestID) == -1 {
msgIDAndPromise.promise.ProduceError(errors.New("error getting response, request has been waiting for too long"))
log.Error("error getting response, request has been waiting past its TTL")
errored++
deletePromise(id)
}
}
continue
}
// We keep track of a maxMsgId of a successfully solved request, because messages
// with id lower than this are either ack-ed or in PEL, so it's safe to call XTRIMMINID on maxMsgId
errSetId := setMaxMsgIdInt(&maxMsgIdInt, msgIDAndPromise.msgID)
if errSetId != nil {
log.Error("error setting maxMsgId", "err", err)
return p.cfg.CheckResultInterval
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
msgIDAndPromise.promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
@@ -154,21 +187,36 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D
msgIDAndPromise.promise.Produce(resp)
responded++
}
// Try deleting UNIQUEID_MSGID_MAP_KEY corresponding to this id from redis
if err := p.client.Del(ctx, msgKey+UNIQUEID_MSGID_MAP_KEY).Err(); err != nil {
log.Error("Error deleting key from redis that flags that a request is being processed", "err", err)
}
delete(p.promises, id)
deletePromise(id)
}
var trimmed int64
var trimErr error
maxMsgId := "+"
// If a response for at least one promise was found, find the maximum of the found ones and XTRIMMINID from that msg id + 1
if maxMsgIdInt[0] > 0 {
maxMsgId = fmt.Sprintf("%d-%d", maxMsgIdInt[0], maxMsgIdInt[1]+1)
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, maxMsgId).Result()
// XDEL on the consumer side already deletes acked messages (marks them as deleted) but doesn't reclaim the memory; XTRIM helps reclaim this memory in normal conditions
// pelData might be outdated when we do the xtrim, but that's ok as the messages are also being trimmed by other producers
if pelData != nil && pelData.Lower != "" {
trimmed, trimErr := p.client.XTrimMinID(ctx, p.redisStream, pelData.Lower).Result()
log.Trace("trimming", "xTrimMinID", pelData.Lower, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
// Check if pelData.Lower has lived past its TTL; if so, ack it to remove it from the PEL and delete it. Once
// it is taken out of the PEL, the producer that sent this request will handle the corresponding promise accordingly (if the PEL is non-empty)
allowedOldestID := fmt.Sprintf("%d-0", time.Now().Add(-p.cfg.RequestTimeout).UnixMilli())
if cmpMsgId(pelData.Lower, allowedOldestID) == -1 {
if err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: 0,
Messages: []string{pelData.Lower},
}).Err(); err != nil {
log.Error("error claiming PEL's lower message thats past its TTL", "msgID", pelData.Lower, "err", err)
return p.cfg.CheckResultInterval
}
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, pelData.Lower).Result(); err != nil {
log.Error("error acking PEL's lower message thats past its TTL", "msgID", pelData.Lower, "err", err)
return p.cfg.CheckResultInterval
}
if _, err := p.client.XDel(ctx, p.redisStream, pelData.Lower).Result(); err != nil {
log.Error("error deleting PEL's lower message thats past its TTL", "msgID", pelData.Lower, "err", err)
}
Review thread on this block:

Contributor: return 0 here, so we'll immediately keep clearing old results as long as these exist.

Contributor: I meant return 0 after the delete succeeded, if err != nil, so we'd immediately be called again because there's a good chance there are more messages that need to be deleted. In case of errors above it makes sense to return non-zero for backoff.

}
}
log.Trace("trimming", "xTrimMinID", maxMsgId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}
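The TTL cutoff used in the two places above can be read as a small predicate (sketch only, assuming access to cmpMsgId in the same package): a stream ID encodes its creation time in milliseconds, so any entry whose ID sorts below "<now - RequestTimeout>-0" has expired and is force-acked and deleted.

```go
// isPastTTL reports whether a stream message ID is older than requestTimeout.
func isPastTTL(msgID string, requestTimeout time.Duration) bool {
	cutoff := fmt.Sprintf("%d-0", time.Now().Add(-requestTimeout).UnixMilli())
	return cmpMsgId(msgID, cutoff) == -1
}
```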
