Skip to content

Commit

Permalink
Add a test for PR #180
Browse files Browse the repository at this point in the history
Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Mar 7, 2023
1 parent 2b75ce4 commit 8307f3a
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package amqp091

import (
"bytes"
"context"
devrand "crypto/rand"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -2004,6 +2005,79 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
}
}

func TestMemoryNotReleasingAfterConsumingManyMessages(t *testing.T) {
// Setup
d, ok := t.Deadline()
var ctx context.Context
var cancel context.CancelFunc
if ok {
ctx, cancel = context.WithDeadline(context.Background(), d)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()

conn, channel := integrationQueue(t, t.Name())
for i := 0; i < 1_000_000; i++ {
err := channel.PublishWithContext(ctx, "", t.Name(), false, false, Publishing{
ContentType: "text/plain",
DeliveryMode: 2,
Body: []byte(fmt.Sprintf("Message %d", i)),
})
if err != nil {
t.Fatalf("failed to publish message: %v", err)
}
}
defer conn.Close()

_ = channel.Close()
// Then use the auto confirmation mode to start a consumer.
// The first minute the consumes one message per second, and then quickly consumes all the data in the queue
consumerChannel, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
defer consumerChannel.Close()

//err = consumerChannel.Qos(1, 0, false)
//if err != nil {
// t.Fatal(err)
//}

tt := time.Now()
deliveries, err := consumerChannel.Consume(t.Name(), t.Name(), true, false, false, false, Table{})
if err != nil {
t.Fatal(err)
}

// Act
var count int
ForLoop:
for {
select {
case _, ok := <-deliveries:
if !ok {
break ForLoop
}
count += 1
if time.Since(tt).Seconds() < 60.0 {
<-time.After(time.Second)
}
case <-ctx.Done():
t.Fatal(ctx.Err())
default:
if count >= 1_000_000 {
break ForLoop
}
}
}

// Assert
if count != 1_000_000 {
t.Fatalf("did not consume all messages: expected 1_000_000, got %d", count)
}
}

/*
* Support for integration tests
*/
Expand Down

0 comments on commit 8307f3a

Please sign in to comment.