Skip to content

Commit b4a3f9a

Browse files
committed
Update Observe interface to return an error
1 parent 1863dce commit b4a3f9a

File tree

7 files changed

+62
-40
lines changed

7 files changed

+62
-40
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ $ go run ./consumer/
109109
received a=fizz b=bazz
110110
```
111111

112-
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
112+
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.
113+
113114
```sh
114115
$ python producer.py --protocol=1
115116
```
117+
116118
</details>
117119

118120
<details>
@@ -218,10 +220,12 @@ $ go run ./consumer/
218220
received a=fizz b=bazz
219221
```
220222

221-
To send a task with Celery Protocol version 1, run *producer.py* with the *--protocol=1* command-line argument:
223+
To send a task with Celery Protocol version 1, run *producer.py* with the `--protocol=1` command-line argument.
224+
222225
```sh
223226
$ python producer.py --protocol=1
224227
```
228+
225229
</details>
226230

227231
## Testing

celery.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Broker interface {
3434
Send(msg []byte, queue string) error
3535
// Observe sets the queues from which the tasks should be received.
3636
// Note, the method is not concurrency safe.
37-
Observe(queues []string)
37+
Observe(queues []string) error
3838
// Receive returns a raw message from one of the queues.
3939
// It blocks until there is a message available for consumption.
4040
// Note, the method is not concurrency safe.
@@ -156,7 +156,11 @@ func (a *App) Run(ctx context.Context) error {
156156
}
157157
}
158158

159-
a.conf.broker.Observe(qq)
159+
err := a.conf.broker.Observe(qq)
160+
if err != nil {
161+
return err
162+
}
163+
160164
level.Debug(a.conf.logger).Log("msg", "observing queues", "queues", qq)
161165

162166
// Tasks are processed concurrently only if there are multiple workers.

celery_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,13 @@ func TestGoredisProduceAndConsume100times(t *testing.T) {
245245
}
246246

247247
func TestRabbitmqProduceAndConsume100times(t *testing.T) {
248+
br, err := rabbitmq.NewBroker()
249+
if err != nil {
250+
t.Fatal(err)
251+
}
252+
248253
app := NewApp(
249-
WithBroker(rabbitmq.NewBroker(rabbitmq.WithAmqpUri("amqp://guest:guest@localhost:5672/"))),
254+
WithBroker(br),
250255
WithLogger(log.NewJSONLogger(os.Stderr)),
251256
)
252257

goredis/broker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ func (br *Broker) Send(m []byte, q string) error {
7070

7171
// Observe sets the queues from which the tasks should be received.
7272
// Note, the method is not concurrency safe.
73-
func (br *Broker) Observe(queues []string) {
73+
func (br *Broker) Observe(queues []string) error {
7474
br.queues = queues
75+
return nil
7576
}
7677

7778
// Receive fetches a Celery task message from a tail of one of the queues in Redis.

rabbitmq/broker.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func WithClient(c *amqp.Connection) BrokerOption {
6161

6262
// NewBroker creates a broker backed by RabbitMQ.
6363
// By default, it connects to localhost.
64-
func NewBroker(options ...BrokerOption) *Broker {
64+
func NewBroker(options ...BrokerOption) (*Broker, error) {
6565
br := Broker{
6666
amqpUri: DefaultAmqpUri,
6767
receiveTimeout: DefaultReceiveTimeout * time.Second,
@@ -75,32 +75,34 @@ func NewBroker(options ...BrokerOption) *Broker {
7575
if br.conn == nil {
7676
conn, err := amqp.Dial(br.amqpUri)
7777
if err != nil {
78-
log.Panicf("Failed to connect to RabbitMQ: %s", err)
79-
return nil
78+
return nil, err
8079
}
80+
8181
br.conn = conn
8282
}
8383

8484
channel, err := br.conn.Channel()
8585
if err != nil {
86-
log.Panicf("Failed to open a channel: %s", err)
87-
return nil
86+
return nil, err
8887
}
88+
8989
br.channel = channel
9090

91-
return &br
91+
return &br, nil
9292
}
9393

9494
// Send inserts the specified message at the head of the queue.
9595
// Note, the method is safe to call concurrently.
9696
func (br *Broker) Send(m []byte, q string) error {
97-
var headers map[string]interface{}
98-
var body []byte
99-
var contentType string
100-
var contentEncoding string
101-
var deliveryMode uint8
102-
var correlationId string
103-
var replyTo string
97+
var (
98+
headers map[string]interface{}
99+
body []byte
100+
contentType string
101+
contentEncoding string
102+
deliveryMode uint8
103+
correlationId string
104+
replyTo string
105+
)
104106

105107
if br.rawMode {
106108
headers = make(amqp.Table)
@@ -131,7 +133,7 @@ func (br *Broker) Send(m []byte, q string) error {
131133
replyTo = properties_in["reply_to"].(string)
132134
}
133135

134-
err := br.channel.Publish(
136+
return br.channel.Publish(
135137
"", // exchange
136138
q, // routing key
137139
false, // mandatory
@@ -145,21 +147,22 @@ func (br *Broker) Send(m []byte, q string) error {
145147
ReplyTo: replyTo,
146148
Body: body,
147149
})
148-
149-
return err
150150
}
151151

152152
// Observe sets the queues from which the tasks should be received.
153153
// Note, the method is not concurrency safe.
154-
func (br *Broker) Observe(queues []string) {
154+
func (br *Broker) Observe(queues []string) error {
155155
br.queues = queues
156-
for _, queue := range queues {
157-
durable := true
158-
autoDelete := false
159-
exclusive := false
160-
noWait := false
161156

157+
var (
158+
durable = true
159+
autoDelete = false
160+
exclusive = false
161+
noWait = false
162+
)
163+
for _, queue := range queues {
162164
// Check whether the queue exists.
165+
// If the queue doesn't exist, attempt to create it.
163166
_, err := br.channel.QueueDeclarePassive(
164167
queue,
165168
durable,
@@ -168,32 +171,33 @@ func (br *Broker) Observe(queues []string) {
168171
noWait,
169172
nil,
170173
)
171-
172-
// If the queue doesn't exist, attempt to create it.
173174
if err != nil {
174-
// QueueDeclarePassive() will close the channel if the queue does not exist, so we have to create a new channel when this happens.
175+
// QueueDeclarePassive() will close the channel if the queue does not exist,
176+
// so we have to create a new channel when this happens.
175177
if br.channel.IsClosed() {
176178
channel, err := br.conn.Channel()
177179
if err != nil {
178-
log.Panicf("Failed to open a channel: %s", err)
180+
return err
179181
}
182+
180183
br.channel = channel
181184
}
182185

183-
_, err := br.channel.QueueDeclare(
186+
_, err = br.channel.QueueDeclare(
184187
queue,
185188
durable,
186189
autoDelete,
187190
exclusive,
188191
noWait,
189192
nil,
190193
)
191-
192194
if err != nil {
193-
log.Panicf("Failed to declare a queue: %s", err)
195+
return err
194196
}
195197
}
196198
}
199+
200+
return nil
197201
}
198202

199203
// Receive fetches a Celery task message from a tail of one of the queues in RabbitMQ.
@@ -205,8 +209,8 @@ func (br *Broker) Receive() ([]byte, error) {
205209

206210
var err error
207211

208-
delivery, delivery_exists := br.delivery[queue]
209-
if !delivery_exists {
212+
delivery, deliveryExists := br.delivery[queue]
213+
if !deliveryExists {
210214
delivery, err = br.channel.Consume(
211215
queue, // queue
212216
"", // consumer
@@ -216,7 +220,6 @@ func (br *Broker) Receive() ([]byte, error) {
216220
false, // noWait
217221
nil, // args
218222
)
219-
220223
if err != nil {
221224
return nil, err
222225
}

rabbitmq/broker_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ func TestReceive(t *testing.T) {
3434

3535
for name, tc := range tests {
3636
t.Run(name, func(t *testing.T) {
37-
br := NewBroker(WithReceiveTimeout(time.Second))
37+
br, err := NewBroker(WithReceiveTimeout(time.Second))
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
3842
br.rawMode = true
3943
br.Observe([]string{q})
4044

redis/broker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ func (br *Broker) Send(m []byte, q string) error {
8282

8383
// Observe sets the queues from which the tasks should be received.
8484
// Note, the method is not concurrency safe.
85-
func (br *Broker) Observe(queues []string) {
85+
func (br *Broker) Observe(queues []string) error {
8686
br.queues = queues
87+
return nil
8788
}
8889

8990
// Receive fetches a Celery task message from a tail of one of the queues in Redis.

0 commit comments

Comments
 (0)