This repository has been archived by the owner on May 18, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscription.go
104 lines (90 loc) · 2.54 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package rabbitmq
import (
"log"
"strconv"
"time"
"github.com/smartystreets/messaging/v2"
"github.com/streadway/amqp"
)
// Subscription consumes deliveries from one queue through a Consumer and
// forwards them to an output channel, keeping a running count and the latest
// delivery tag so they can be reported on the control channel when listening
// ends.
type Subscription struct {
	// channel is used to declare and bind the queue, to start and cancel
	// consumption, and is handed to fromAMQPDelivery for every delivery.
	channel Consumer
	// queue is the queue name. When it is empty, open declares a transient
	// queue and stores the resulting name here.
	queue string
	// consumer is the consumer tag passed to Consume, ExclusiveConsume and
	// CancelConsumer.
	consumer string
	// bindings lists the exchanges that bind declares as "fanout" and binds
	// to the queue.
	bindings []string
	// deliveryCount is incremented by listen for every delivery received.
	deliveryCount uint64
	// latestTag is the DeliveryTag of the most recently received delivery.
	latestTag uint64
	// control receives a subscriptionClosed value once Listen is done.
	control chan<- interface{}
	// output receives each delivery after conversion by fromAMQPDelivery; its
	// capacity is passed to ConfigureChannelBuffer by open.
	output chan<- messaging.Delivery
}
// newSubscription builds a Subscription for the given queue and exchange
// bindings.
//
// The consumer tag is the current UTC time, in nanoseconds since the Unix
// epoch, formatted as a base-10 string. The delivery counter and latest tag
// start at their zero values.
func newSubscription(
	channel Consumer, queue string, bindings []string,
	control chan<- interface{}, output chan<- messaging.Delivery,
) *Subscription {
	tag := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)

	subscription := new(Subscription)
	subscription.channel = channel
	subscription.queue = queue
	subscription.consumer = tag
	subscription.bindings = bindings
	subscription.control = control
	subscription.output = output
	return subscription
}
// Listen opens the subscription and forwards deliveries until listen
// returns, then sends a subscriptionClosed value on the control channel
// carrying the delivery count, the latest delivery tag and the channel that
// was used. The send on control blocks until it is accepted.
func (this *Subscription) Listen() {
	this.listen(this.open())

	closed := subscriptionClosed{
		DeliveryCount:     this.deliveryCount,
		LatestDeliveryTag: this.latestTag,
		LatestConsumer:    this.channel,
	}
	this.control <- closed
}
// listen drains input until it is closed. For every delivery it bumps the
// delivery counter, records the delivery tag and forwards the converted
// delivery to the output channel (blocking until the send is accepted).
func (this *Subscription) listen(input <-chan amqp.Delivery) {
	// Receiving from a nil channel blocks forever, so there is nothing to do
	// when no input channel was obtained.
	if input == nil {
		return
	}

	for {
		delivery, ok := <-input
		if !ok {
			return
		}

		this.deliveryCount++
		this.latestTag = delivery.DeliveryTag
		this.output <- fromAMQPDelivery(delivery, this.channel)
	}
}
// open prepares the channel and the queue, then starts consuming.
//
// Steps, in order:
//  1. ConfigureChannelBuffer is called with the capacity of the output
//     channel; a failure is logged and otherwise ignored.
//  2. The queue is declared (a transient queue when no name was configured).
//  3. Every configured exchange is declared and bound to the queue.
//  4. A configured queue name is consumed via Consume; otherwise the declared
//     transient queue name is stored and consumed via ExclusiveConsume.
//
// It returns the channel of incoming deliveries, or nil when the queue could
// not be declared. listen treats a nil channel as "nothing to consume".
func (this *Subscription) open() <-chan amqp.Delivery {
	if err := this.channel.ConfigureChannelBuffer(cap(this.output)); err != nil {
		log.Printf("[ERROR] Unable to configure channel buffer [%d]: %s", cap(this.output), err)
	}

	queue, err := this.declareQueue(this.queue)
	if err != nil {
		// Without a declared queue there is nothing valid to bind or consume;
		// continuing would issue bind/consume calls with an empty or
		// undeclared queue name.
		log.Printf("[ERROR] Unable to open subscription for queue [%s]: %s", this.queue, err)
		return nil
	}

	this.bind(queue)

	if len(this.queue) > 0 {
		return this.consume()
	}

	// No name was configured: remember the declared queue name so that it is
	// the one being consumed.
	this.queue = queue
	return this.exclusiveConsume()
}
// declareQueue declares the queue to consume from and returns its name.
//
// With an empty name, the result of DeclareTransientQueue is returned
// unchanged. Otherwise the named queue is declared; on success the same name
// is returned, and on failure the error is logged and returned together with
// an empty name.
func (this *Subscription) declareQueue(name string) (string, error) {
	if name == "" {
		return this.channel.DeclareTransientQueue()
	}

	err := this.channel.DeclareQueue(name)
	if err == nil {
		return name, nil
	}

	log.Printf("[ERROR] Unable to declare queue [%s]: %s", name, err)
	return "", err
}
// bind walks the configured exchanges in order. Each one is declared as a
// "fanout" exchange and then bound to the queue called name. Failures are
// logged and do not stop processing: the bind is still attempted after a
// failed declare, and later exchanges are still handled.
func (this *Subscription) bind(name string) {
	const exchangeType = "fanout"

	for i := range this.bindings {
		exchange := this.bindings[i]

		declareErr := this.channel.DeclareExchange(exchange, exchangeType)
		if declareErr != nil {
			log.Printf("[ERROR] Unable to create [%s] exchange [%s]: %s", exchangeType, exchange, declareErr)
		}

		bindErr := this.channel.BindExchangeToQueue(name, exchange)
		if bindErr != nil {
			log.Printf("[ERROR] Unable to bind exchange [%s] to queue [%s]: %s", exchange, name, bindErr)
		}
	}
}
// consume starts consuming from the subscription's queue using its consumer
// tag and returns the channel of incoming deliveries.
//
// When Consume reports an error, the error is logged and nil is returned;
// listen treats a nil channel as "nothing to consume".
func (this *Subscription) consume() <-chan amqp.Delivery {
	deliveries, err := this.channel.Consume(this.queue, this.consumer)
	if err != nil {
		log.Printf("[ERROR] Unable to consume from queue [%s] as consumer [%s]: %s", this.queue, this.consumer, err)
		return nil
	}
	return deliveries
}
// exclusiveConsume starts consuming from the subscription's queue through
// ExclusiveConsume, using the subscription's consumer tag, and returns the
// channel of incoming deliveries.
//
// When ExclusiveConsume reports an error, the error is logged and nil is
// returned; listen treats a nil channel as "nothing to consume".
func (this *Subscription) exclusiveConsume() <-chan amqp.Delivery {
	deliveries, err := this.channel.ExclusiveConsume(this.queue, this.consumer)
	if err != nil {
		log.Printf("[ERROR] Unable to exclusively consume from queue [%s] as consumer [%s]: %s", this.queue, this.consumer, err)
		return nil
	}
	return deliveries
}
// Close asks the channel to cancel the consumer identified by this
// subscription's consumer tag. Whatever CancelConsumer returns is discarded.
func (this *Subscription) Close() {
	tag := this.consumer
	_ = this.channel.CancelConsumer(tag)
}