diff --git a/.travis.yml b/.travis.yml index 97b620d..74f9990 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ +os: linux +dist: trusty language: go services: @@ -7,9 +9,13 @@ go: - 1.9.x - 1.10.x - 1.11.x + - 1.12.x + - 1.13.x + - 1.14.x - tip script: + - go get ./... - gofmt -d -s . - go vet *.go - go test -v -race -mock-broker=1 # Run tests with real broker. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1e7d22f --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/cheshir/go-mq + +go 1.14 + +require ( + github.com/NeowayLabs/wabbit v0.0.0-20200409220312-12e68ab5b0c6 + github.com/google/uuid v1.1.1 // indirect + github.com/pborman/uuid v1.2.0 // indirect + github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 + gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 +) diff --git a/mq.go b/mq.go index 7c2e1d0..98f1295 100644 --- a/mq.go +++ b/mq.go @@ -195,7 +195,6 @@ func (mq *mq) errorHandler() { } func (mq *mq) processError(err error) { - println("got error: " + err.Error()) switch err.(type) { case *net.OpError: go mq.reconnect() diff --git a/mq_test.go b/mq_test.go index f88bdb2..119f9f5 100644 --- a/mq_test.go +++ b/mq_test.go @@ -3,6 +3,8 @@ package mq import ( "encoding/json" "flag" + "runtime" + "strings" "sync/atomic" "testing" "time" @@ -44,7 +46,7 @@ func TestMq_ConnectToStoppedBroker(t *testing.T) { func TestMq_ProduceConsume(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start(), "Failed to start broker") defer broker.Stop() } @@ -62,14 +64,11 @@ func TestMq_ProduceConsume(t *testing.T) { "with json config", func() (Config, error) { marshaled, err := json.Marshal(newDefaultConfig()) - if err != nil { - t.Fatalf("Failed to marshal config to JSON: %v", err) - } + requireNoError(t, err, "Failed marshal config to json") config := Config{} - if err = json.Unmarshal(marshaled, &config); err != nil { - t.Fatalf("Failed to unmarshal test data: %v", err) - } + err = json.Unmarshal(marshaled, &config) + requireNoError(t, err, "Failed unmarshal test data") return config, nil }, @@ -78,14 +77,11 @@ func TestMq_ProduceConsume(t *testing.T) { "with yaml config", func() (Config, error) { marshaled, err := yaml.Marshal(newDefaultConfig()) - if err != nil { - t.Fatalf("Failed to marshal config to yaml: %v", err) - } + requireNoError(t, err, "Failed marshal config to yaml") config := Config{} - if err = yaml.Unmarshal(marshaled, &config); err != nil { - t.Fatalf("Failed to unmarshal test data: %v", err) - } + err = yaml.Unmarshal(marshaled, &config) + requireNoError(t, err, "Failed unmarshal test data") return config, err }, @@ -95,14 +91,10 @@ func TestMq_ProduceConsume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { config, err := tc.Config() - if err != nil { - t.Fatalf("Failed to get config for test: %v", err) - } + requireNoError(t, err, "Failed to get config for test") mq, err := New(config) - if err != nil { - t.Fatalf("Failed to create a new instance of mq: %v", err) - } + requireNoError(t, err, "Failed to create a new instance of mq") defer mq.Close() done := make(chan struct{}) @@ -110,7 +102,7 @@ func TestMq_ProduceConsume(t *testing.T) { for { select { case err := <-mq.Error(): - t.Errorf("unexpected error from queue: %v", err) + assertNoError(t, err, "Unexpected error from queue") case <-done: return } @@ -120,33 +112,28 @@ func TestMq_ProduceConsume(t *testing.T) { expectedMessage := []byte("test") asyncProducer, err := mq.AsyncProducer(defaultAsyncProducerName) - if err != nil { - t.Fatal("Failed to get registered producer") - } - + requireNoError(t, err, "Failed to get registered producer") asyncProducer.Produce(expectedMessage) syncProducer, err := mq.SyncProducer(defaultSyncProducerName) - if err != nil { - t.Fatal("Failed to get registered producer") - } - + requireNoError(t, err, "Failed to get registered producer") err = syncProducer.Produce(expectedMessage) - if err != nil { - t.Fatal("Failed to produce message") - } + requireNoError(t, err, "Failed to produce message") var messageWasRead int32 - mq.SetConsumerHandler(defaultConsumerName, func(message Message) { + handler := func(message Message) { atomic.AddInt32(&messageWasRead, 1) if !isSliceOfBytesIsEqual(expectedMessage, message.Body()) { - message.Reject(false) + assertNoError(t, message.Reject(false)) t.Errorf("Actual message '%s' is not equal to expected '%s'", message.Body(), expectedMessage) } - message.Ack(false) - }) + assertNoError(t, message.Ack(false)) + } + + err = mq.SetConsumerHandler(defaultConsumerName, handler) + requireNoError(t, err) waitForMessageDelivery() @@ -214,14 +201,12 @@ func newDefaultConfig() Config { } func TestMq_Reconnect(t *testing.T) { - t.Skip("Test skipped due to phantom bug described in https://github.com/cheshir/go-mq/issues/25") - if !brokerIsMocked { t.Skip("We can't stop real broker from test.") } broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start(), "Failed to start broker") messageQueue, err := New(Config{ DSN: dsnForTests, @@ -230,7 +215,7 @@ func TestMq_Reconnect(t *testing.T) { Name: defaultConsumerName, Queue: defaultQueueName, PrefetchCount: 1, - Workers: 4, + Workers: 1, }}, Exchanges: Exchanges{{ Name: defaultExchangeName, @@ -257,48 +242,43 @@ func TestMq_Reconnect(t *testing.T) { }, }) - if err != nil { - t.Fatal("Can't create a new instance of mq: ", err) - } - + requireNoError(t, err, "Can't create a new instance of mq") expectedMessage := []byte("test") var messageWasRead int32 - messageQueue.SetConsumerHandler(defaultConsumerName, func(message Message) { + handler := func(message Message) { atomic.AddInt32(&messageWasRead, 1) if !isSliceOfBytesIsEqual(expectedMessage, message.Body()) { - message.Reject(false) + assertNoError(t, message.Reject(false)) t.Errorf("Actual message '%s' is not equal to expected '%s'", message.Body(), expectedMessage) } - message.Ack(false) - }) + assertNoError(t, message.Ack(false)) + } - broker.Stop() // Force reconnect. + err = messageQueue.SetConsumerHandler(defaultConsumerName, handler) + requireNoError(t, err, "Failed to set consumer handler") + runtime.Gosched() // Required for running tests in build machine where only 1 cpu is available. - asyncProducer, err := messageQueue.AsyncProducer(defaultAsyncProducerName) - if err != nil { - t.Error("Failed to get registered producer") - } + // Force reconnect + requireNoError(t, broker.Stop(), "Failed to stop broker") + // Produce when connection to broker is down. + asyncProducer, err := messageQueue.AsyncProducer(defaultAsyncProducerName) + requireNoError(t, err, "Failed to get registered producer") asyncProducer.Produce(expectedMessage) syncProducer, err := messageQueue.SyncProducer(defaultSyncProducerName) - if err != nil { - t.Error("Failed to get registered producer") - } - - syncProducer.Produce(expectedMessage) + requireNoError(t, err, "Failed to get registered producer") + requireNoError(t, syncProducer.Produce(expectedMessage), "Sync producer error") - broker.Start() + requireNoError(t, broker.Start(), "Failed to start broker") defer broker.Stop() - waitForMessageDelivery() readMessages := atomic.LoadInt32(&messageWasRead) if readMessages != 2 { - // Probably known problem. Check https://github.com/cheshir/go-mq/issues/25 for details. t.Errorf("Consumer did not read messages. Produced %d, read %d", 2, readMessages) } @@ -308,7 +288,7 @@ func TestMq_Reconnect(t *testing.T) { func TestMq_Consumer_Exists(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start()) defer broker.Stop() } @@ -333,24 +313,18 @@ func TestMq_Consumer_Exists(t *testing.T) { }}, }) - if err != nil { - t.Error("Can't create a new instance of mq: ", err) - } - + requireNoError(t, err, "Can't create a new instance of mq") defer mq.Close() _, err = mq.Consumer(defaultConsumerName) - if err != nil { - t.Error("Failed to get a consumer: ", err) - } - + assertNoError(t, err, "Failed to get a consumer") assertNoMqError(t, mq) } func TestMq_Consumer_NotExists(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start()) defer broker.Stop() } @@ -358,9 +332,7 @@ func TestMq_Consumer_NotExists(t *testing.T) { DSN: dsnForTests, }) - if err != nil { - t.Error("Can't create a new instance of mq: ", err) - } + requireNoError(t, err, "Can't create a new instance of mq") defer mq.Close() @@ -375,7 +347,7 @@ func TestMq_Consumer_NotExists(t *testing.T) { func TestMq_Producer_Exists(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start()) defer broker.Stop() } @@ -407,21 +379,14 @@ func TestMq_Producer_Exists(t *testing.T) { }, }) - if err != nil { - t.Error("Can't create a new instance of mq: ", err) - } - + requireNoError(t, err, "Can't create a new instance of mq") defer mq.Close() _, err = mq.AsyncProducer(defaultAsyncProducerName) - if err != nil { - t.Error("Failed to get an async producer: ", err) - } + assertNoError(t, err, "Failed to get an async producer") _, err = mq.SyncProducer(defaultSyncProducerName) - if err != nil { - t.Error("Failed to get a sync producer: ", err) - } + assertNoError(t, err, "Failed to get an sync producer") assertNoMqError(t, mq) } @@ -429,7 +394,7 @@ func TestMq_Producer_Exists(t *testing.T) { func TestMq_Producer_NonExistent(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start()) defer broker.Stop() } @@ -437,9 +402,7 @@ func TestMq_Producer_NonExistent(t *testing.T) { DSN: dsnForTests, }) - if err != nil { - t.Error("Can't create a new instance of mq: ", err) - } + requireNoError(t, err, "Can't create a new instance of mq") defer mq.Close() @@ -459,7 +422,7 @@ func TestMq_Producer_NonExistent(t *testing.T) { func TestMq_SetConsumerHandler_NonExistentConsumer(t *testing.T) { if brokerIsMocked { broker := server.NewServer(dsnForTests) - broker.Start() + requireNoError(t, broker.Start()) defer broker.Stop() } @@ -467,9 +430,7 @@ func TestMq_SetConsumerHandler_NonExistentConsumer(t *testing.T) { DSN: dsnForTests, }) - if err != nil { - t.Error("Can't create a new instance of mq: ", err) - } + requireNoError(t, err, "Can't create a new instance of mq") defer mq.Close() @@ -541,7 +502,7 @@ func assertNoMqError(t *testing.T, mq MQ) { } func waitForMessageDelivery() { - time.Sleep(300 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } func isSliceOfBytesIsEqual(expected, actual []byte) bool { @@ -565,3 +526,23 @@ func isSliceOfBytesIsEqual(expected, actual []byte) bool { return true } + +func assertNoError(t *testing.T, err error, message ...string) { + if err != nil { + if len(message) == 0 { + message = []string{"Unexpected error"} + } + + t.Error(strings.Join(message, " ")+":", err) + } +} + +func requireNoError(t *testing.T, err error, message ...string) { + if err != nil { + if len(message) == 0 { + message = []string{"Unexpected error"} + } + + t.Fatal(strings.Join(message, " ")+":", err) + } +}