Skip to content

Commit 166ea6a

Browse files
authored
Merge pull request #21 from marselester/observe-err
Update `Observe` in `Broker` interface to return an error
2 parents 1863dce + f47b946 commit 166ea6a

File tree

7 files changed

+63
-44
lines changed

7 files changed

+63
-44
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ $ go run ./consumer/
109109
received a=fizz b=bazz
110110
```
111111

112-
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
112+
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.
113+
113114
```sh
114115
$ python producer.py --protocol=1
115116
```
117+
116118
</details>
117119

118120
<details>
@@ -218,10 +220,12 @@ $ go run ./consumer/
218220
received a=fizz b=bazz
219221
```
220222

221-
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
223+
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.
224+
222225
```sh
223226
$ python producer.py --protocol=1
224227
```
228+
225229
</details>
226230

227231
## Testing

celery.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Broker interface {
3434
Send(msg []byte, queue string) error
3535
// Observe sets the queues from which the tasks should be received.
3636
// Note, the method is not concurrency safe.
37-
Observe(queues []string)
37+
Observe(queues []string) error
3838
// Receive returns a raw message from one of the queues.
3939
// It blocks until there is a message available for consumption.
4040
// Note, the method is not concurrency safe.
@@ -156,7 +156,11 @@ func (a *App) Run(ctx context.Context) error {
156156
}
157157
}
158158

159-
a.conf.broker.Observe(qq)
159+
err := a.conf.broker.Observe(qq)
160+
if err != nil {
161+
return err
162+
}
163+
160164
level.Debug(a.conf.logger).Log("msg", "observing queues", "queues", qq)
161165

162166
// Tasks are processed concurrently only if there are multiple workers.

celery_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,13 @@ func TestGoredisProduceAndConsume100times(t *testing.T) {
245245
}
246246

247247
func TestRabbitmqProduceAndConsume100times(t *testing.T) {
248+
br, err := rabbitmq.NewBroker()
249+
if err != nil {
250+
t.Fatal(err)
251+
}
252+
248253
app := NewApp(
249-
WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))),
254+
WithBroker(br),
250255
WithLogger(log.NewJSONLogger(os.Stderr)),
251256
)
252257

goredis/broker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ func (br *Broker) Send(m []byte, q string) error {
7070

7171
// Observe sets the queues from which the tasks should be received.
7272
// Note, the method is not concurrency safe.
73-
func (br *Broker) Observe(queues []string) {
73+
func (br *Broker) Observe(queues []string) error {
7474
br.queues = queues
75+
return nil
7576
}
7677

7778
// Receive fetches a Celery task message from a tail of one of the queues in Redis.

rabbitmq/broker.go

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ package rabbitmq
55
import (
66
"encoding/base64"
77
"encoding/json"
8-
"fmt"
9-
"log"
108
"time"
119

1210
amqp "github.com/rabbitmq/amqp091-go"
@@ -61,7 +59,7 @@ func WithClient(c *amqp.Connection) BrokerOption {
6159

6260
// NewBroker creates a broker backed by RabbitMQ.
6361
// By default, it connects to localhost.
64-
func NewBroker(options ...BrokerOption) *Broker {
62+
func NewBroker(options ...BrokerOption) (*Broker, error) {
6563
br := Broker{
6664
amqpUri: DefaultAmqpUri,
6765
receiveTimeout: DefaultReceiveTimeout * time.Second,
@@ -75,32 +73,34 @@ func NewBroker(options ...BrokerOption) *Broker {
7573
if br.conn == nil {
7674
conn, err := amqp.Dial(br.amqpUri)
7775
if err != nil {
78-
log.Panicf("Failed to connect to RabbitMQ: %s", err)
79-
return nil
76+
return nil, err
8077
}
78+
8179
br.conn = conn
8280
}
8381

8482
channel, err := br.conn.Channel()
8583
if err != nil {
86-
log.Panicf("Failed to open a channel: %s", err)
87-
return nil
84+
return nil, err
8885
}
86+
8987
br.channel = channel
9088

91-
return &br
89+
return &br, nil
9290
}
9391

9492
// Send inserts the specified message at the head of the queue.
9593
// Note, the method is safe to call concurrently.
9694
func (br *Broker) Send(m []byte, q string) error {
97-
var headers map[string]interface{}
98-
var body []byte
99-
var contentType string
100-
var contentEncoding string
101-
var deliveryMode uint8
102-
var correlationId string
103-
var replyTo string
95+
var (
96+
headers map[string]interface{}
97+
body []byte
98+
contentType string
99+
contentEncoding string
100+
deliveryMode uint8
101+
correlationId string
102+
replyTo string
103+
)
104104

105105
if br.rawMode {
106106
headers = make(amqp.Table)
@@ -131,7 +131,7 @@ func (br *Broker) Send(m []byte, q string) error {
131131
replyTo = properties_in["reply_to"].(string)
132132
}
133133

134-
err := br.channel.Publish(
134+
return br.channel.Publish(
135135
"", // exchange
136136
q, // routing key
137137
false, // mandatory
@@ -145,21 +145,22 @@ func (br *Broker) Send(m []byte, q string) error {
145145
ReplyTo: replyTo,
146146
Body: body,
147147
})
148-
149-
return err
150148
}
151149

152150
// Observe sets the queues from which the tasks should be received.
153151
// Note, the method is not concurrency safe.
154-
func (br *Broker) Observe(queues []string) {
152+
func (br *Broker) Observe(queues []string) error {
155153
br.queues = queues
156-
for _, queue := range queues {
157-
durable := true
158-
autoDelete := false
159-
exclusive := false
160-
noWait := false
161154

155+
var (
156+
durable = true
157+
autoDelete = false
158+
exclusive = false
159+
noWait = false
160+
)
161+
for _, queue := range queues {
162162
// Check whether the queue exists.
163+
// If the queue doesn't exist, attempt to create it.
163164
_, err := br.channel.QueueDeclarePassive(
164165
queue,
165166
durable,
@@ -168,32 +169,33 @@ func (br *Broker) Observe(queues []string) {
168169
noWait,
169170
nil,
170171
)
171-
172-
// If the queue doesn't exist, attempt to create it.
173172
if err != nil {
174-
// QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens.
173+
// QueueDeclarePassive() will close the channel if the queue does not exist,
174+
// so we have to create a new channel when this happens.
175175
if br.channel.IsClosed() {
176176
channel, err := br.conn.Channel()
177177
if err != nil {
178-
log.Panicf("Failed to open a channel: %s", err)
178+
return err
179179
}
180+
180181
br.channel = channel
181182
}
182183

183-
_, err := br.channel.QueueDeclare(
184+
_, err = br.channel.QueueDeclare(
184185
queue,
185186
durable,
186187
autoDelete,
187188
exclusive,
188189
noWait,
189190
nil,
190191
)
191-
192192
if err != nil {
193-
log.Panicf("Failed to declare a queue: %s", err)
193+
return err
194194
}
195195
}
196196
}
197+
198+
return nil
197199
}
198200

199201
// Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ.
@@ -205,8 +207,8 @@ func (br *Broker) Receive() ([]byte, error) {
205207

206208
var err error
207209

208-
delivery, delivery_exists := br.delivery[queue]
209-
if !delivery_exists {
210+
delivery, deliveryExists := br.delivery[queue]
211+
if !deliveryExists {
210212
delivery, err = br.channel.Consume(
211213
queue, // queue
212214
"", // consumer
@@ -216,7 +218,6 @@ func (br *Broker) Receive() ([]byte, error) {
216218
false, // noWait
217219
nil, // args
218220
)
219-
220221
if err != nil {
221222
return nil, err
222223
}
@@ -254,10 +255,9 @@ func (br *Broker) Receive() ([]byte, error) {
254255
var result []byte
255256
result, err := json.Marshal(imsg)
256257
if err != nil {
257-
err_str := fmt.Errorf("%w", err)
258-
log.Printf("json encode: %s", err_str)
259258
return nil, err
260259
}
260+
261261
return result, nil
262262

263263
case <-time.After(br.receiveTimeout):

rabbitmq/broker_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ func TestReceive(t *testing.T) {
3434

3535
for name, tc := range tests {
3636
t.Run(name, func(t *testing.T) {
37-
br := NewBroker(WithReceiveTimeout(time.Second))
37+
br, err := NewBroker(WithReceiveTimeout(time.Second))
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
3842
br.rawMode = true
3943
br.Observe([]string{q})
4044

redis/broker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ func (br *Broker) Send(m []byte, q string) error {
8282

8383
// Observe sets the queues from which the tasks should be received.
8484
// Note, the method is not concurrency safe.
85-
func (br *Broker) Observe(queues []string) {
85+
func (br *Broker) Observe(queues []string) error {
8686
br.queues = queues
87+
return nil
8788
}
8889

8990
// Receive fetches a Celery task message from a tail of one of the queues in Redis.

0 commit comments

Comments
 (0)