Skip to content

Commit

Permalink
add auto reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
binlaniua committed May 14, 2022
1 parent 2b358c5 commit 4ce2c8e
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 2 deletions.
2 changes: 1 addition & 1 deletion consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
case <-subs.closed:
// closed before drained, drop in-flight
return
case out <- delivery:
case out <- *delivery:
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/rabbitmq/amqp091-go
module github.com/binlaniua/amqp091-go

go 1.16

Expand Down
20 changes: 20 additions & 0 deletions rabbitmq/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rabbitmq

import "log"

// Debug debug log flag
var Debug bool

func debug(args ...interface{}) {
if !Debug {
return
}
log.Print(args...)
}

func debugf(format string, args ...interface{}) {
if !Debug {
return
}
log.Printf(format, args...)
}
150 changes: 150 additions & 0 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package rabbitmq

import (
"sync/atomic"
"time"

amqp "github.com/binlaniua/amqp091-go"
)

const delay = 3 // reconnect after delay seconds

// Connection amqp.Connection wrapper
type Connection struct {
*amqp.Connection
}

// Channel wrap amqp.Connection.Channel, get a auto reconnect channel
func (c *Connection) Channel() (*Channel, error) {
ch, err := c.Connection.Channel()
if err != nil {
return nil, err
}

channel := &Channel{
Channel: ch,
}

go func() {
for {
reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok || channel.IsClosed() {
debug("channel closed")
channel.Close() // close again, ensure closed flag set when connection closed
break
}
debug("channel closed, reason: %v", reason)

// reconnect if not closed by developer
for {
// wait 1s for connection reconnect
time.Sleep(delay * time.Second)

ch, err := c.Connection.Channel()
if err == nil {
debug("channel recreate success")
channel.Channel = ch
break
}

debugf("channel recreate failed, err: %v", err)
}
}

}()

return channel, nil
}

// Dial wrap amqp.Dial, dial and get a reconnect connection
func Dial(url string) (*Connection, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}

connection := &Connection{
Connection: conn,
}

go func() {
for {
reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok {
debug("connection closed")
break
}
debugf("connection closed, reason: %v", reason)

// reconnect if not closed by developer
for {
// wait 1s for reconnect
time.Sleep(delay * time.Second)

conn, err := amqp.Dial(url)
if err == nil {
connection.Connection = conn
debugf("reconnect success")
break
}

debugf("reconnect failed, err: %v", err)
}
}
}()

return connection, nil
}

// Channel amqp.Channel wapper
type Channel struct {
*amqp.Channel
closed int32
}

// IsClosed indicate closed by developer
func (ch *Channel) IsClosed() bool {
return (atomic.LoadInt32(&ch.closed) == 1)
}

// Close ensure closed flag set
func (ch *Channel) Close() error {
if ch.IsClosed() {
return amqp.ErrClosed
}

atomic.StoreInt32(&ch.closed, 1)

return ch.Channel.Close()
}

// Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
deliveries := make(chan amqp.Delivery)

go func() {
for {
d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
if err != nil {
debugf("consume failed, err: %v", err)
time.Sleep(delay * time.Second)
continue
}

for msg := range d {
deliveries <- msg
}

// sleep before IsClose call. closed flag may not set before sleep.
time.Sleep(delay * time.Second)

if ch.IsClosed() {
break
}
}
}()

return deliveries, nil
}

0 comments on commit 4ce2c8e

Please sign in to comment.