import delay "github.com/yaodongen/delay-queue"
- Safe and fast, with good concurrency support
- Benchmark: with 8 threads on a local machine, a read takes 7079 ns/op, which equals about 141262 reads/s. (
go test -bench=. -run=none
)
package main
import (
	"context"
	"errors"

	"github.com/go-redis/redis/v8"
	delay "github.com/yaodongen/delay-queue"
)
// main demonstrates the delay queue end to end: it enqueues one item with
// delay.AddToQueue and then consumes from the same key with
// delay.GetFromQueue until both returned channels are closed.
func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// producer
	err := delay.AddToQueue(ctx, rdb, "key", "123", 5, 86400)
	if err != nil {
		// your own logic
	}

	// consumer
	done := make(chan struct{})
	go func() {
		// Tell main that the consumer has finished draining both channels.
		defer close(done)

		resCh, errCh := delay.GetFromQueue(ctx, rdb, "key")
		for res := range resCh {
			// your own logic
			_ = res
		}
		for err := range errCh {
			// errors.Is also recognizes context errors that were wrapped
			// on the way up, which a plain != comparison would miss.
			if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
				// your own logic
			}
		}
	}()

	// Without this wait, main would return right after starting the
	// goroutine and the process would exit before anything is consumed.
	<-done
}
- Add the timePiece (e.g. "1645614542") to the sorted set
- RPUSH the actual data onto the timePiece
- Get a timePiece from the sorted set that is earlier than time.Now()
- LPOP the actual data from that timePiece
- 把时间片(样例: "1645614542") 添加到 zset 中
- 把需要存储的数据 rpush 到时间片中
- 从 zset 中取出早于当前时间戳的一个时间片
- 从时间片中 lpop 对应的数据