Skip to content

Commit

Permalink
51 release v2 (#52)
Browse files Browse the repository at this point in the history
* Removed MQ interface from the library API.
* Updated readme with difference between v1 and v2
  • Loading branch information
cheshir authored Oct 2, 2022
1 parent 738decb commit c89deb6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 64 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ Check [releases page](https://github.com/cheshir/go-mq/releases).

## How to upgrade

### From v1 to v2

* `New()` returns `*MessageQueue` not the interface.

* Minimal go version updated to the 1.16.

### From version 0.x to 1.x

* `GetConsumer()` method was renamed to `Consumer()`. This is done to follow go guideline.
Expand Down
89 changes: 37 additions & 52 deletions mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,9 @@ type conn interface {
NotifyClose(chan wabbit.Error) chan wabbit.Error
}

// MQ describes methods provided by message broker adapter.
type MQ interface {
// Consumer returns consumer object by its name.
Consumer(name string) (Consumer, error)
// SetConsumerHandler allows you to set handler callback without getting consumer.
SetConsumerHandler(name string, handler ConsumerHandler) error
// AsyncProducer returns async producer. Should be used in most cases.
AsyncProducer(name string) (AsyncProducer, error)
// SyncProducer returns sync producer.
SyncProducer(name string) (SyncProducer, error)
// Error returns channel with all occurred errors.
// Errors from sync producer won't be accessible.
Error() <-chan error
// Close stop all consumers and producers and close connection to broker.
Close()
// Shows connection state
ConnectionState() ConnectionState
}

type ConnectionState uint8

type mq struct {
type MessageQueue struct {
channel wabbit.Channel
config Config
connection conn
Expand All @@ -74,10 +55,10 @@ type mq struct {
// New initializes AMQP connection to the message broker
// and returns adapter that provides an ability
// to get configured consumers and producers, read occurred errors and shutdown all workers.
func New(config Config) (MQ, error) {
func New(config Config) (*MessageQueue, error) {
config.normalize()

mq := &mq{
mq := &MessageQueue{
config: config,
errorChannel: make(chan error),
internalErrorChannel: make(chan error),
Expand All @@ -96,9 +77,10 @@ func New(config Config) (MQ, error) {
return mq, mq.initialSetup()
}

// Set handler for consumer by its name. Returns false if consumer wasn't found.
// SetConsumerHandler allows you to set handler callback without getting consumer.
// Returns false if consumer wasn't found.
// Can be called once for each consumer.
func (mq *mq) SetConsumerHandler(name string, handler ConsumerHandler) error {
func (mq *MessageQueue) SetConsumerHandler(name string, handler ConsumerHandler) error {
consumer, err := mq.Consumer(name)
if err != nil {
return err
Expand All @@ -110,7 +92,7 @@ func (mq *mq) SetConsumerHandler(name string, handler ConsumerHandler) error {
}

// Consumer returns a consumer by its name or error if consumer wasn't found.
func (mq *mq) Consumer(name string) (Consumer, error) {
func (mq *MessageQueue) Consumer(name string) (Consumer, error) {
consumer, ok := mq.consumers.Get(name)
if !ok {
err := fmt.Errorf("consumer '%s' is not registered. Check your configuration", name)
Expand All @@ -122,7 +104,8 @@ func (mq *mq) Consumer(name string) (Consumer, error) {
}

// AsyncProducer returns an async producer by its name or error if producer wasn't found.
func (mq *mq) AsyncProducer(name string) (AsyncProducer, error) {
// Should be used in most cases.
func (mq *MessageQueue) AsyncProducer(name string) (AsyncProducer, error) {
item, exists := mq.producers.Get(name)
producer, asserted := item.(*asyncProducer)

Expand All @@ -134,7 +117,7 @@ func (mq *mq) AsyncProducer(name string) (AsyncProducer, error) {
}

// SyncProducer returns a sync producer by its name or error if producer wasn't found.
func (mq *mq) SyncProducer(name string) (SyncProducer, error) {
func (mq *MessageQueue) SyncProducer(name string) (SyncProducer, error) {
item, exists := mq.producers.Get(name)
producer, asserted := item.(*syncProducer)

Expand All @@ -145,13 +128,14 @@ func (mq *mq) SyncProducer(name string) (SyncProducer, error) {
return producer, nil
}

// Error provides an ability to access occurring errors.
func (mq *mq) Error() <-chan error {
// Error returns channel with all occurred errors.
// Errors from sync producer won't be accessible. Get them directly from producer.
func (mq *MessageQueue) Error() <-chan error {
return mq.errorChannel
}

// Shutdown all workers and close connection to the message broker.
func (mq *mq) Close() {
// Close stops all consumers and producers and closes connections to the broker.
func (mq *MessageQueue) Close() {
mq.stopProducersAndConsumers()

if mq.channel != nil {
Expand All @@ -163,11 +147,12 @@ func (mq *mq) Close() {
}
}

func (mq *mq) ConnectionState() ConnectionState {
// ConnectionState shows connection state.
func (mq *MessageQueue) ConnectionState() ConnectionState {
return ConnectionState(atomic.LoadInt32(mq.state))
}

func (mq *mq) connect() error {
func (mq *MessageQueue) connect() error {
atomic.StoreInt32(mq.state, int32(ConnectionStateConnecting))
connection, err := mq.createConnection()
if err != nil {
Expand All @@ -192,7 +177,7 @@ func (mq *mq) connect() error {
return nil
}

func (mq *mq) createConnection() (conn, error) {
func (mq *MessageQueue) createConnection() (conn, error) {
mq.cluster.Do(func() { atomic.StoreInt32(&mq.cluster.currentNode, -1) })
atomic.AddInt32(&mq.cluster.currentNode, 1)
if int(mq.cluster.currentNode) >= len(mq.config.dsnList) {
Expand All @@ -209,15 +194,15 @@ func (mq *mq) createConnection() (conn, error) {

// Register close handler.
// To get more details visit https://godoc.org/github.com/streadway/amqp#Connection.NotifyClose.
func (mq *mq) handleCloseEvent() {
func (mq *MessageQueue) handleCloseEvent() {
err := <-mq.connection.NotifyClose(make(chan wabbit.Error))
if err != nil {
mq.internalErrorChannel <- err
}
atomic.StoreInt32(mq.state, int32(ConnectionStateDisconnected))
}

func (mq *mq) errorHandler() {
func (mq *MessageQueue) errorHandler() {
for err := range mq.internalErrorChannel {
select {
case mq.errorChannel <- err: // Proxies errors to the user.
Expand All @@ -228,7 +213,7 @@ func (mq *mq) errorHandler() {
}
}

func (mq *mq) processError(err error) {
func (mq *MessageQueue) processError(err error) {
switch err.(type) {
case *net.OpError:
go mq.reconnect()
Expand All @@ -248,7 +233,7 @@ func (mq *mq) processError(err error) {
}
}

func (mq *mq) initialSetup() error {
func (mq *MessageQueue) initialSetup() error {
if err := mq.setupExchanges(); err != nil {
return err
}
Expand All @@ -265,7 +250,7 @@ func (mq *mq) initialSetup() error {
}

// Called after each reconnect to recreate non-durable queues and exchanges.
func (mq *mq) setupAfterReconnect() error {
func (mq *MessageQueue) setupAfterReconnect() error {
if err := mq.setupExchanges(); err != nil {
return err
}
Expand All @@ -289,7 +274,7 @@ func (mq *mq) setupAfterReconnect() error {
return nil
}

func (mq *mq) setupExchanges() error {
func (mq *MessageQueue) setupExchanges() error {
for _, config := range mq.config.Exchanges {
if err := mq.declareExchange(config); err != nil {
return err
Expand All @@ -299,11 +284,11 @@ func (mq *mq) setupExchanges() error {
return nil
}

func (mq *mq) declareExchange(config ExchangeConfig) error {
func (mq *MessageQueue) declareExchange(config ExchangeConfig) error {
return mq.channel.ExchangeDeclare(config.Name, config.Type, wabbit.Option(config.Options))
}

func (mq *mq) setupQueues() error {
func (mq *MessageQueue) setupQueues() error {
for _, config := range mq.config.Queues {
if err := mq.declareQueue(config); err != nil {
return err
Expand All @@ -313,15 +298,15 @@ func (mq *mq) setupQueues() error {
return nil
}

func (mq *mq) declareQueue(config QueueConfig) error {
func (mq *MessageQueue) declareQueue(config QueueConfig) error {
if _, err := mq.channel.QueueDeclare(config.Name, wabbit.Option(config.Options)); err != nil {
return err
}

return mq.channel.QueueBind(config.Name, config.RoutingKey, config.Exchange, wabbit.Option(config.BindingOptions))
}

func (mq *mq) setupProducers() error {
func (mq *MessageQueue) setupProducers() error {
for _, config := range mq.config.Producers {
if err := mq.registerProducer(config); err != nil {
return err
Expand All @@ -331,7 +316,7 @@ func (mq *mq) setupProducers() error {
return nil
}

func (mq *mq) registerProducer(config ProducerConfig) error {
func (mq *MessageQueue) registerProducer(config ProducerConfig) error {
if _, ok := mq.producers.Get(config.Name); ok {
return fmt.Errorf(`producer with name "%s" is already registered`, config.Name)
}
Expand All @@ -348,7 +333,7 @@ func (mq *mq) registerProducer(config ProducerConfig) error {
return nil
}

func (mq *mq) reconnectProducer(producer internalProducer) error {
func (mq *MessageQueue) reconnectProducer(producer internalProducer) error {
channel, err := mq.connection.Channel()
if err != nil {
return err
Expand All @@ -360,7 +345,7 @@ func (mq *mq) reconnectProducer(producer internalProducer) error {
return nil
}

func (mq *mq) setupConsumers() error {
func (mq *MessageQueue) setupConsumers() error {
for _, config := range mq.config.Consumers {
if err := mq.registerConsumer(config); err != nil {
return err
Expand All @@ -370,7 +355,7 @@ func (mq *mq) setupConsumers() error {
return nil
}

func (mq *mq) registerConsumer(config ConsumerConfig) error {
func (mq *MessageQueue) registerConsumer(config ConsumerConfig) error {
if _, ok := mq.consumers.Get(config.Name); ok {
return fmt.Errorf(`consumer with name "%s" is already registered`, config.Name)
}
Expand Down Expand Up @@ -399,7 +384,7 @@ func (mq *mq) registerConsumer(config ConsumerConfig) error {
return nil
}

func (mq *mq) reconnectConsumer(consumer *consumer) error {
func (mq *MessageQueue) reconnectConsumer(consumer *consumer) error {
for _, worker := range consumer.workers {
if err := mq.initializeConsumersWorker(consumer, worker); err != nil {
return err
Expand All @@ -411,7 +396,7 @@ func (mq *mq) reconnectConsumer(consumer *consumer) error {
return nil
}

func (mq *mq) initializeConsumersWorker(consumer *consumer, worker *worker) error {
func (mq *MessageQueue) initializeConsumersWorker(consumer *consumer, worker *worker) error {
channel, err := mq.connection.Channel()
if err != nil {
return err
Expand All @@ -434,7 +419,7 @@ func (mq *mq) initializeConsumersWorker(consumer *consumer, worker *worker) erro

// Reconnect stops current producers and consumers,
// recreates connection to the rabbit and than runs producers and consumers.
func (mq *mq) reconnect() {
func (mq *MessageQueue) reconnect() {
startedReconnect := atomic.CompareAndSwapInt32(&mq.reconnectStatus, statusReadyForReconnect, statusReconnecting)
// There is no need to start a new reconnect if the previous one is not finished yet.
if !startedReconnect {
Expand All @@ -460,7 +445,7 @@ func (mq *mq) reconnect() {
}
}

func (mq *mq) stopProducersAndConsumers() {
func (mq *MessageQueue) stopProducersAndConsumers() {
mq.producers.GoEach(func(producer internalProducer) {
producer.Stop()
})
Expand Down
Loading

0 comments on commit c89deb6

Please sign in to comment.