-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
69 lines (64 loc) · 1.84 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package rabbitmq
import (
"context"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// Publisher sends messages to a single exchange over an AMQP channel.
type Publisher struct {
	// Channel is the AMQP channel every publish call goes through.
	Channel *amqp.Channel
	// ExchangeName is passed as the exchange argument of each publish call.
	ExchangeName string
	// Key is passed as the routing key argument of each publish call.
	Key string
	// ContentType is copied into the ContentType of messages built by
	// Publish and PublishBody.
	ContentType string
}
// NewPublisher builds a Publisher that publishes to exchangeName over channel.
//
// An empty key falls back to "info" and an empty contentType falls back to
// "text/plain". The returned error is always nil in this implementation.
func NewPublisher(channel *amqp.Channel, exchangeName string, key string, contentType string) (*Publisher, error) {
	// Start from the defaults and override them only with non-empty values.
	publisher := &Publisher{
		Channel:      channel,
		ExchangeName: exchangeName,
		Key:          "info",
		ContentType:  "text/plain",
	}
	if key != "" {
		publisher.Key = key
	}
	if contentType != "" {
		publisher.ContentType = contentType
	}
	return publisher, nil
}
// NewPublisherByConfig opens a channel with NewChannel(config.Url), declares
// the configured exchange on it and returns a Publisher bound to that
// exchange via NewPublisher.
//
// The exchange is declared with kind config.ExchangeKind, durable=true,
// autoDelete=config.AutoDelete, internal=false, noWait=false and no extra
// arguments.
//
// It returns the error from NewChannel or ExchangeDeclare unchanged. When the
// declaration fails, the channel opened here is closed before returning so it
// is not left dangling.
func NewPublisherByConfig(config PublisherConfig) (*Publisher, error) {
	channel, err := NewChannel(config.Url)
	if err != nil {
		return nil, err
	}
	if err := channel.ExchangeDeclare(config.ExchangeName, config.ExchangeKind, true, config.AutoDelete, false, false, nil); err != nil {
		// Best-effort cleanup: the declare error is the one the caller needs,
		// so a secondary failure from Close is deliberately discarded.
		// NOTE(review): any connection NewChannel opened internally is not
		// reachable from here; confirm whether it needs separate cleanup.
		_ = channel.Close()
		return nil, err
	}
	return NewPublisher(channel, config.ExchangeName, config.Key, config.ContentType)
}
// Publish sends data to the publisher's exchange using its routing key, with
// attributes attached as message headers.
//
// The message is marked amqp.Persistent, timestamped with the current time and
// carries the publisher's ContentType. It is published with the mandatory and
// immediate flags both false. ctx is accepted but not used by this
// implementation.
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
	message := amqp.Publishing{
		Headers:      MapToTable(attributes),
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  p.ContentType,
		Body:         data,
	}
	const mandatory, immediate = false, false
	return p.Channel.Publish(p.ExchangeName, p.Key, mandatory, immediate, message)
}
// PublishBody sends data to the publisher's exchange using its routing key,
// without any headers.
//
// The message is marked amqp.Persistent, timestamped with the current time and
// carries the publisher's ContentType. It is published with the mandatory and
// immediate flags both false. ctx is accepted but not used by this
// implementation.
func (p *Publisher) PublishBody(ctx context.Context, data []byte) error {
	return p.Channel.Publish(p.ExchangeName, p.Key, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  p.ContentType,
		Body:         data,
	})
}
// PublishMessage sends a caller-built msg, unmodified, to the publisher's
// exchange using its routing key, with the mandatory and immediate flags both
// false.
func (p *Publisher) PublishMessage(msg amqp.Publishing) error {
	const mandatory, immediate = false, false
	return p.Channel.Publish(p.ExchangeName, p.Key, mandatory, immediate, msg)
}
// MapToTable copies attributes into a newly allocated amqp.Table, keeping each
// key and storing each string value as-is.
//
// A nil or empty attributes map yields an empty, non-nil table.
func MapToTable(attributes map[string]string) amqp.Table {
	// Ranging over a nil map performs zero iterations, so no nil guard is
	// needed; len(nil) is 0, which also makes the size hint safe.
	table := make(amqp.Table, len(attributes))
	for name, value := range attributes {
		table[name] = value
	}
	return table
}