-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproducer.go
127 lines (114 loc) · 3.55 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package gq
import (
"context"
"fmt"
"strings"
"time"
"github.com/cenkalti/backoff"
"github.com/jmoiron/sqlx"
"github.com/rs/zerolog/log"
)
const (
messageBufferSize = 100
pushBatchSize = 1
defaultPushPeriod = time.Millisecond * 50
defaultMaxRetryPeriods = 3
maxBatchQuerySize = (1 << 16) - 1
)
// ProducerOptions represents the options which can be used to tailor producer behaviour
type ProducerOptions struct {
// PushPeriod is the period with which messages should be pushed (default: 50ms).
// This can be tuned to achieve the desired throughput/latency tradeoff
PushPeriod time.Duration
// MaxRetryPeriods is the maximum number of push periods to retry a batch of messages for before discarding them (default: 3)
MaxRetryPeriods int
// Concurrency is the number of concurrent goroutines to push messages from (default: 1)
Concurrency int
}
func defaultProducerOpts() ProducerOptions {
return ProducerOptions{
PushPeriod: defaultPushPeriod,
MaxRetryPeriods: defaultMaxRetryPeriods,
Concurrency: 1,
}
}
// Producer represents a message queue producer
type Producer struct {
db *sqlx.DB
msgChan chan []byte
opts ProducerOptions
}
func newProducer(ctx context.Context, db *sqlx.DB, opts *ProducerOptions) (*Producer, error) {
p := &Producer{db: db, msgChan: make(chan []byte)}
if opts != nil {
p.opts = *opts
} else {
p.opts = defaultProducerOpts()
}
for i := 0; i < p.opts.Concurrency; i++ {
go p.startPushingMessages(ctx)
}
return p, nil
}
// Push pushes a message onto the queue
func (p *Producer) Push(message []byte) {
p.msgChan <- message
}
func (p Producer) startPushingMessages(ctx context.Context) {
buf := make([][]byte, 0, messageBufferSize)
ticker := time.NewTicker(p.opts.PushPeriod)
retryTimeout := p.opts.PushPeriod * time.Duration(p.opts.MaxRetryPeriods)
for {
select {
case <-ctx.Done():
log.Debug().Msgf("stopping message pushing: %s", ctx.Err())
return
case m := <-p.msgChan:
buf = append(buf, m)
if len(buf) == maxBatchQuerySize {
p.pushMessagesWithRetryTimeout(ctx, buf, retryTimeout)
buf = clear(buf)
}
case <-ticker.C:
if len(buf) == 0 {
continue
}
p.pushMessagesWithRetryTimeout(ctx, buf, retryTimeout)
buf = clear(buf)
}
}
}
func clear(buffer [][]byte) [][]byte {
for i := range buffer {
buffer[i] = []byte{} // allow elements to be garbage-collected
}
return buffer[:0]
}
func (p Producer) pushMessagesWithRetryTimeout(ctx context.Context, messages [][]byte, retryTimeout time.Duration) {
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
if err := backoff.Retry(func() error { return p.pushMessages(messages) }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
log.Err(err).Msg("error pushing messages")
}
cancel() // release ctx resources if timeout hasn't expired
}
func (p Producer) pushMessages(messages [][]byte) error {
log.Debug().Msgf("pushing %d messages onto queue", len(messages))
valuesListBuilder := strings.Builder{}
valuesListBuilder.Grow(len(messages) * len([]byte("(?), ")))
args := make([]interface{}, len(messages))
for i := range messages {
if i == 0 {
valuesListBuilder.WriteString("(?)")
} else {
valuesListBuilder.WriteString(", (?)")
}
args[i] = messages[i]
}
query := fmt.Sprintf("INSERT INTO message (payload) VALUES %s", valuesListBuilder.String())
query = p.db.Rebind(query)
if _, err := p.db.Exec(query, args...); err != nil {
return fmt.Errorf("error INSERTING messages: %s", err)
}
log.Debug().Msg("successfully pushed messages onto queue")
return nil
}