diff --git a/README.md b/README.md index 8d854ba..21b057d 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,12 @@ Check [releases page](https://github.com/cheshir/go-mq/releases). ## How to upgrade +### From v1 to v2 + +* `New()` returns `*MessageQueue` not the interface. + +* Minimal go version updated to the 1.16. + ### From version 0.x to 1.x * `GetConsumer()` method was renamed to `Consumer()`. This is done to follow go guideline. diff --git a/mq.go b/mq.go index f682048..4ff67fa 100644 --- a/mq.go +++ b/mq.go @@ -34,28 +34,9 @@ type conn interface { NotifyClose(chan wabbit.Error) chan wabbit.Error } -// MQ describes methods provided by message broker adapter. -type MQ interface { - // Consumer returns consumer object by its name. - Consumer(name string) (Consumer, error) - // SetConsumerHandler allows you to set handler callback without getting consumer. - SetConsumerHandler(name string, handler ConsumerHandler) error - // AsyncProducer returns async producer. Should be used in most cases. - AsyncProducer(name string) (AsyncProducer, error) - // SyncProducer returns sync producer. - SyncProducer(name string) (SyncProducer, error) - // Error returns channel with all occurred errors. - // Errors from sync producer won't be accessible. - Error() <-chan error - // Close stop all consumers and producers and close connection to broker. - Close() - // Shows connection state - ConnectionState() ConnectionState -} - type ConnectionState uint8 -type mq struct { +type MessageQueue struct { channel wabbit.Channel config Config connection conn @@ -74,10 +55,10 @@ type mq struct { // New initializes AMQP connection to the message broker // and returns adapter that provides an ability // to get configured consumers and producers, read occurred errors and shutdown all workers. -func New(config Config) (MQ, error) { +func New(config Config) (*MessageQueue, error) { config.normalize() - mq := &mq{ + mq := &MessageQueue{ config: config, errorChannel: make(chan error), internalErrorChannel: make(chan error), @@ -96,9 +77,10 @@ func New(config Config) (MQ, error) { return mq, mq.initialSetup() } -// Set handler for consumer by its name. Returns false if consumer wasn't found. +// SetConsumerHandler allows you to set handler callback without getting consumer. +// Returns false if consumer wasn't found. // Can be called once for each consumer. -func (mq *mq) SetConsumerHandler(name string, handler ConsumerHandler) error { +func (mq *MessageQueue) SetConsumerHandler(name string, handler ConsumerHandler) error { consumer, err := mq.Consumer(name) if err != nil { return err @@ -110,7 +92,7 @@ func (mq *mq) SetConsumerHandler(name string, handler ConsumerHandler) error { } // Consumer returns a consumer by its name or error if consumer wasn't found. -func (mq *mq) Consumer(name string) (Consumer, error) { +func (mq *MessageQueue) Consumer(name string) (Consumer, error) { consumer, ok := mq.consumers.Get(name) if !ok { err := fmt.Errorf("consumer '%s' is not registered. Check your configuration", name) @@ -122,7 +104,8 @@ func (mq *mq) Consumer(name string) (Consumer, error) { } // AsyncProducer returns an async producer by its name or error if producer wasn't found. -func (mq *mq) AsyncProducer(name string) (AsyncProducer, error) { +// Should be used in most cases. +func (mq *MessageQueue) AsyncProducer(name string) (AsyncProducer, error) { item, exists := mq.producers.Get(name) producer, asserted := item.(*asyncProducer) @@ -134,7 +117,7 @@ func (mq *mq) AsyncProducer(name string) (AsyncProducer, error) { } // SyncProducer returns a sync producer by its name or error if producer wasn't found. -func (mq *mq) SyncProducer(name string) (SyncProducer, error) { +func (mq *MessageQueue) SyncProducer(name string) (SyncProducer, error) { item, exists := mq.producers.Get(name) producer, asserted := item.(*syncProducer) @@ -145,13 +128,14 @@ func (mq *mq) SyncProducer(name string) (SyncProducer, error) { return producer, nil } -// Error provides an ability to access occurring errors. -func (mq *mq) Error() <-chan error { +// Error returns channel with all occurred errors. +// Errors from sync producer won't be accessible. Get them directly from producer. +func (mq *MessageQueue) Error() <-chan error { return mq.errorChannel } -// Shutdown all workers and close connection to the message broker. -func (mq *mq) Close() { +// Close stops all consumers and producers and closes connections to the broker. +func (mq *MessageQueue) Close() { mq.stopProducersAndConsumers() if mq.channel != nil { @@ -163,11 +147,12 @@ func (mq *mq) Close() { } } -func (mq *mq) ConnectionState() ConnectionState { +// ConnectionState shows connection state. +func (mq *MessageQueue) ConnectionState() ConnectionState { return ConnectionState(atomic.LoadInt32(mq.state)) } -func (mq *mq) connect() error { +func (mq *MessageQueue) connect() error { atomic.StoreInt32(mq.state, int32(ConnectionStateConnecting)) connection, err := mq.createConnection() if err != nil { @@ -192,7 +177,7 @@ func (mq *mq) connect() error { return nil } -func (mq *mq) createConnection() (conn, error) { +func (mq *MessageQueue) createConnection() (conn, error) { mq.cluster.Do(func() { atomic.StoreInt32(&mq.cluster.currentNode, -1) }) atomic.AddInt32(&mq.cluster.currentNode, 1) if int(mq.cluster.currentNode) >= len(mq.config.dsnList) { @@ -209,7 +194,7 @@ func (mq *mq) createConnection() (conn, error) { // Register close handler. // To get more details visit https://godoc.org/github.com/streadway/amqp#Connection.NotifyClose. -func (mq *mq) handleCloseEvent() { +func (mq *MessageQueue) handleCloseEvent() { err := <-mq.connection.NotifyClose(make(chan wabbit.Error)) if err != nil { mq.internalErrorChannel <- err @@ -217,7 +202,7 @@ func (mq *mq) handleCloseEvent() { atomic.StoreInt32(mq.state, int32(ConnectionStateDisconnected)) } -func (mq *mq) errorHandler() { +func (mq *MessageQueue) errorHandler() { for err := range mq.internalErrorChannel { select { case mq.errorChannel <- err: // Proxies errors to the user. @@ -228,7 +213,7 @@ func (mq *mq) errorHandler() { } } -func (mq *mq) processError(err error) { +func (mq *MessageQueue) processError(err error) { switch err.(type) { case *net.OpError: go mq.reconnect() @@ -248,7 +233,7 @@ func (mq *mq) processError(err error) { } } -func (mq *mq) initialSetup() error { +func (mq *MessageQueue) initialSetup() error { if err := mq.setupExchanges(); err != nil { return err } @@ -265,7 +250,7 @@ func (mq *mq) initialSetup() error { } // Called after each reconnect to recreate non-durable queues and exchanges. -func (mq *mq) setupAfterReconnect() error { +func (mq *MessageQueue) setupAfterReconnect() error { if err := mq.setupExchanges(); err != nil { return err } @@ -289,7 +274,7 @@ func (mq *mq) setupAfterReconnect() error { return nil } -func (mq *mq) setupExchanges() error { +func (mq *MessageQueue) setupExchanges() error { for _, config := range mq.config.Exchanges { if err := mq.declareExchange(config); err != nil { return err @@ -299,11 +284,11 @@ func (mq *mq) setupExchanges() error { return nil } -func (mq *mq) declareExchange(config ExchangeConfig) error { +func (mq *MessageQueue) declareExchange(config ExchangeConfig) error { return mq.channel.ExchangeDeclare(config.Name, config.Type, wabbit.Option(config.Options)) } -func (mq *mq) setupQueues() error { +func (mq *MessageQueue) setupQueues() error { for _, config := range mq.config.Queues { if err := mq.declareQueue(config); err != nil { return err @@ -313,7 +298,7 @@ func (mq *mq) setupQueues() error { return nil } -func (mq *mq) declareQueue(config QueueConfig) error { +func (mq *MessageQueue) declareQueue(config QueueConfig) error { if _, err := mq.channel.QueueDeclare(config.Name, wabbit.Option(config.Options)); err != nil { return err } @@ -321,7 +306,7 @@ func (mq *mq) declareQueue(config QueueConfig) error { return mq.channel.QueueBind(config.Name, config.RoutingKey, config.Exchange, wabbit.Option(config.BindingOptions)) } -func (mq *mq) setupProducers() error { +func (mq *MessageQueue) setupProducers() error { for _, config := range mq.config.Producers { if err := mq.registerProducer(config); err != nil { return err @@ -331,7 +316,7 @@ func (mq *mq) setupProducers() error { return nil } -func (mq *mq) registerProducer(config ProducerConfig) error { +func (mq *MessageQueue) registerProducer(config ProducerConfig) error { if _, ok := mq.producers.Get(config.Name); ok { return fmt.Errorf(`producer with name "%s" is already registered`, config.Name) } @@ -348,7 +333,7 @@ func (mq *mq) registerProducer(config ProducerConfig) error { return nil } -func (mq *mq) reconnectProducer(producer internalProducer) error { +func (mq *MessageQueue) reconnectProducer(producer internalProducer) error { channel, err := mq.connection.Channel() if err != nil { return err @@ -360,7 +345,7 @@ func (mq *mq) reconnectProducer(producer internalProducer) error { return nil } -func (mq *mq) setupConsumers() error { +func (mq *MessageQueue) setupConsumers() error { for _, config := range mq.config.Consumers { if err := mq.registerConsumer(config); err != nil { return err @@ -370,7 +355,7 @@ func (mq *mq) setupConsumers() error { return nil } -func (mq *mq) registerConsumer(config ConsumerConfig) error { +func (mq *MessageQueue) registerConsumer(config ConsumerConfig) error { if _, ok := mq.consumers.Get(config.Name); ok { return fmt.Errorf(`consumer with name "%s" is already registered`, config.Name) } @@ -399,7 +384,7 @@ func (mq *mq) registerConsumer(config ConsumerConfig) error { return nil } -func (mq *mq) reconnectConsumer(consumer *consumer) error { +func (mq *MessageQueue) reconnectConsumer(consumer *consumer) error { for _, worker := range consumer.workers { if err := mq.initializeConsumersWorker(consumer, worker); err != nil { return err @@ -411,7 +396,7 @@ func (mq *mq) reconnectConsumer(consumer *consumer) error { return nil } -func (mq *mq) initializeConsumersWorker(consumer *consumer, worker *worker) error { +func (mq *MessageQueue) initializeConsumersWorker(consumer *consumer, worker *worker) error { channel, err := mq.connection.Channel() if err != nil { return err @@ -434,7 +419,7 @@ func (mq *mq) initializeConsumersWorker(consumer *consumer, worker *worker) erro // Reconnect stops current producers and consumers, // recreates connection to the rabbit and than runs producers and consumers. -func (mq *mq) reconnect() { +func (mq *MessageQueue) reconnect() { startedReconnect := atomic.CompareAndSwapInt32(&mq.reconnectStatus, statusReadyForReconnect, statusReconnecting) // There is no need to start a new reconnect if the previous one is not finished yet. if !startedReconnect { @@ -460,7 +445,7 @@ func (mq *mq) reconnect() { } } -func (mq *mq) stopProducersAndConsumers() { +func (mq *MessageQueue) stopProducersAndConsumers() { mq.producers.GoEach(func(producer internalProducer) { producer.Stop() }) diff --git a/mq_test.go b/mq_test.go index 412e0d3..f861974 100644 --- a/mq_test.go +++ b/mq_test.go @@ -94,7 +94,7 @@ func TestMq_ProduceConsume(t *testing.T) { requireNoError(t, err, "Failed to get config for test") mq, err := New(config) - requireNoError(t, err, "Failed to create a new instance of mq") + requireNoError(t, err, "Failed to create a new instance of MessageQueue") defer mq.Close() done := make(chan struct{}) @@ -242,7 +242,7 @@ func TestMq_Reconnect(t *testing.T) { }, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") expectedMessage := []byte("test") var messageWasRead int32 @@ -313,7 +313,7 @@ func TestMq_Consumer_Exists(t *testing.T) { }}, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") defer mq.Close() _, err = mq.Consumer(defaultConsumerName) @@ -332,7 +332,7 @@ func TestMq_Consumer_NotExists(t *testing.T) { DSN: dsnForTests, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") defer mq.Close() @@ -379,7 +379,7 @@ func TestMq_Producer_Exists(t *testing.T) { }, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") defer mq.Close() _, err = mq.AsyncProducer(defaultAsyncProducerName) @@ -402,7 +402,7 @@ func TestMq_Producer_NonExistent(t *testing.T) { DSN: dsnForTests, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") defer mq.Close() @@ -430,7 +430,7 @@ func TestMq_SetConsumerHandler_NonExistentConsumer(t *testing.T) { DSN: dsnForTests, }) - requireNoError(t, err, "Can't create a new instance of mq") + requireNoError(t, err, "Can't create a new instance of MessageQueue") defer mq.Close() @@ -510,7 +510,7 @@ func Test_mq_createConnection(t *testing.T) { cfg.TestMode = true cfg.normalize() - mq := &mq{ + mq := &MessageQueue{ config: cfg, errorChannel: make(chan error), internalErrorChannel: make(chan error), @@ -543,7 +543,7 @@ func TestMq_ConnectionState(t *testing.T) { cfg.TestMode = true cfg.normalize() - mq := &mq{ + mq := &MessageQueue{ config: cfg, errorChannel: make(chan error), internalErrorChannel: make(chan error), @@ -582,7 +582,7 @@ func TestMq_connect(t *testing.T) { cfg.TestMode = true cfg.normalize() - mq := &mq{ + mq := &MessageQueue{ config: cfg, errorChannel: make(chan error), internalErrorChannel: make(chan error), @@ -605,10 +605,10 @@ func TestMq_connect(t *testing.T) { } } -func assertNoMqError(t *testing.T, mq MQ) { +func assertNoMqError(t *testing.T, mq *MessageQueue) { select { case err := <-mq.Error(): - t.Error("Caught an unexpected error from mq: ", err) + t.Error("Caught an unexpected error from MessageQueue: ", err) default: } }