-
Notifications
You must be signed in to change notification settings - Fork 23
/
kafka.go
153 lines (129 loc) · 3.18 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// THIS IS AN EXPERIMENTAL DRIVER, PLEASE USE WITH CAUTION
package kafka
import (
	"context"
	"errors"
	"io"
	"io/ioutil"
	"log"
	"sync"
	"time"

	"github.com/gofrs/uuid"
	"github.com/jpillora/backoff"
	"github.com/lileio/pubsub/v2"
	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)
// Package-level state shared by every Provider value.
var (
	// mutex is held by writerForTopic while it reads or fills a Provider's
	// writers map.
	mutex = new(sync.Mutex)

	// Compile-time assertion that *Provider satisfies pubsub.Provider.
	_ pubsub.Provider = (*Provider)(nil)
)
// Provider is a Kafka based pubsub provider.
//
// Brokers must be set before use; Balancer is optional and is handed to every
// writer the provider creates.
type Provider struct {
	// writers caches one kafka.Writer per topic. It is created lazily by
	// writerForTopic, which accesses it while holding the package-level mutex.
	writers map[string]*kafka.Writer

	// Brokers lists the Kafka broker addresses given to readers and writers.
	// The first entry is also the one dialed to create topics.
	Brokers []string

	// Balancer is passed unchanged to kafka.WriterConfig for each new writer.
	Balancer kafka.Balancer

	// Reader is the reader created by the most recent Subscribe call; it is
	// what Shutdown closes.
	Reader *kafka.Reader
}
// Shutdown releases the Kafka resources held by the provider.
//
// It closes the reader created by Subscribe, if any, and then closes every
// cached topic writer so buffered messages are flushed and connections are
// released. Shutdown is safe to call on a Provider that never subscribed or
// published. Close errors are logged rather than returned because the method
// has no error result.
func (p *Provider) Shutdown() {
	// Reader is only set once Subscribe has run; calling Close on a nil
	// *kafka.Reader would panic.
	if p.Reader != nil {
		if err := p.Reader.Close(); err != nil {
			logrus.Errorf("Closing Kafka reader failed: %v", err)
		}
	}

	// The writers map is guarded by the package-level mutex.
	mutex.Lock()
	defer mutex.Unlock()

	for topic, w := range p.writers {
		if err := w.Close(); err != nil {
			logrus.Errorf("Closing Kafka writer for topic %s failed: %v", topic, err)
		}
		// Drop the closed writer so a later Publish builds a fresh one
		// instead of reusing a closed writer.
		delete(p.writers, topic)
	}
}
// Publish sends m.Data to the given topic, keyed by a freshly generated
// version 1 UUID.
//
// The writer for the topic is obtained from writerForTopic. Any error from
// obtaining the writer, generating the UUID, or writing the message is
// returned unchanged.
func (p *Provider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error {
	writer, err := p.writerForTopic(ctx, topic)
	if err != nil {
		return err
	}

	key, err := uuid.NewV1()
	if err != nil {
		return err
	}

	record := kafka.Message{
		Key:   key.Bytes(),
		Value: m.Data,
	}
	return writer.WriteMessages(ctx, record)
}
// Subscribe starts a background consumer for opts.Topic and hands every
// fetched message to h.
//
// The consumer joins the group "<opts.ServiceName>.<opts.Name>". The new
// reader is stored in p.Reader so Shutdown can close it, but the goroutine
// keeps its own reference, so a later Subscribe call on the same Provider
// cannot redirect an earlier consumer to a different reader.
//
// Loop behavior:
//   - A fetch error is logged and retried after an exponential backoff
//     (200ms doubling up to 600s, with jitter). The failed fetch is never
//     passed to the handler.
//   - io.EOF, which kafka-go returns once the reader has been closed, ends
//     the goroutine.
//   - A handler error is logged and ends the goroutine without acknowledging
//     the message.
//   - When opts.AutoAck is set, the message offset is committed after the
//     handler succeeds.
func (p *Provider) Subscribe(opts pubsub.HandlerOptions, h pubsub.MsgHandler) {
	group := opts.ServiceName + "." + opts.Name
	logrus.Infof("Subscribing to %s successWriter/name %s", opts.Topic, group)

	logSuccess := logrus.WithField("pubsub", "kafka")
	successWriter := logSuccess.Writer()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        p.Brokers,
		GroupID:        group,
		Topic:          opts.Topic,
		MaxBytes:       10e6, // 10MB
		CommitInterval: time.Second,
		MaxWait:        50 * time.Millisecond,
		Logger:         log.New(successWriter, "", log.Lshortfile),
		ErrorLogger:    log.New(ioutil.Discard, "", log.Lshortfile),
	})
	p.Reader = reader

	b := &backoff.Backoff{
		Min:    200 * time.Millisecond,
		Max:    600 * time.Second,
		Factor: 2,
		Jitter: true,
	}

	go func() {
		// Release the logrus pipe on every exit path.
		defer successWriter.Close()

		ctx := context.Background()
		for {
			m, err := reader.FetchMessage(ctx)
			if err != nil {
				// The reader was closed (e.g. by Shutdown); retrying
				// would spin forever.
				if errors.Is(err, io.EOF) {
					return
				}
				d := b.Duration()
				logrus.Errorf(
					"Subscription receive to topic %s failed, reconnecting in %v. Err: %v",
					opts.Topic, d, err,
				)
				time.Sleep(d)
				// Skip handling: m is a zero-value message here.
				continue
			}
			// Only a successful fetch resets the backoff.
			b.Reset()

			msg := pubsub.Msg{
				ID:   string(m.Key),
				Data: m.Value,
				Ack: func() {
					if err := reader.CommitMessages(ctx, m); err != nil {
						logrus.Errorf(
							"Committing topic/partition/offset %v/%v/%v failed: %v",
							m.Topic, m.Partition, m.Offset, err,
						)
					}
				},
				Nack: func() {},
			}

			if err := h(ctx, msg); err != nil {
				logrus.Errorf(
					"Handler for topic %s failed, stopping subscription. Err: %v",
					opts.Topic, err,
				)
				return
			}

			if opts.AutoAck {
				msg.Ack()
			}

			logrus.Debugf("message at topic/partition/offset %v/%v/%v\n",
				m.Topic, m.Partition, m.Offset)
		}
	}()
}
// writerForTopic returns the cached kafka.Writer for topic, creating it on
// first use.
//
// When at least one broker is configured, the first broker is dialed and asked
// to create the topic before the writer is built. The dial honors ctx, and the
// connection is closed before returning. The whole operation runs under the
// package-level mutex, which guards the writers map.
//
// It returns the dial or topic-creation error if either step fails; in that
// case nothing is cached.
func (p *Provider) writerForTopic(ctx context.Context, topic string) (*kafka.Writer, error) {
	mutex.Lock()
	defer mutex.Unlock()

	if p.writers == nil {
		p.writers = map[string]*kafka.Writer{}
	}
	if w := p.writers[topic]; w != nil {
		return w, nil
	}

	if len(p.Brokers) > 0 {
		c, err := kafka.DefaultDialer.DialContext(ctx, "tcp", p.Brokers[0])
		if err != nil {
			return nil, err
		}
		// The connection is only needed for topic creation; close it so it
		// does not leak for every new topic. A close error is not actionable.
		defer c.Close()

		logrus.Debugf("Creating Topic %s in Kafka", topic)
		err = c.CreateTopics(kafka.TopicConfig{
			Topic: topic,
		})
		if err != nil {
			logrus.Errorf("Error creating Topic %s in Kafka, err %s", topic, err.Error())
			return nil, err
		}
	}

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  p.Brokers,
		Topic:    topic,
		Balancer: p.Balancer,
	})
	p.writers[topic] = w
	return w, nil
}