Skip to content

Commit

Permalink
Add convenience function to set connection name (#108)
Browse files Browse the repository at this point in the history
* Add convenience function to set connection name

RabbitMQ reads the connection property "connection_name" from the client
and displays the value in the Management UI. This is convenient for
debugging and locating specific client connections, and correlating a
connection to an app.

Showed in the examples how to use this function and added an example in
Go Docs.

Signed-off-by: Aitor Perez Cedres <[email protected]>

* Minor refactor to use const string

Signed-off-by: Aitor Perez Cedres <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
Zerpet and lukebakken authored Aug 18, 2022
1 parent e8a1547 commit de416f1
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 4 deletions.
6 changes: 4 additions & 2 deletions _examples/simple-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Consumer struct {
}

func SetupCloseHandler(consumer *Consumer) {
c := make(chan os.Signal)
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
Expand All @@ -88,8 +88,10 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (

var err error

config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("sample-consumer")
Log.Printf("dialing %q", amqpURI)
c.conn, err = amqp.Dial(amqpURI)
c.conn, err = amqp.DialConfig(amqpURI, config)
if err != nil {
return nil, fmt.Errorf("Dial: %s", err)
}
Expand Down
6 changes: 4 additions & 2 deletions _examples/simple-producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
}

func SetupCloseHandler(done chan bool) {
c := make(chan os.Signal)
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
Expand All @@ -55,8 +55,10 @@ func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body s
// This function dials, connects, declares, publishes, and tears down,
// all in one go. In a real service, you probably want to maintain a
// long-lived connection as state, and publish against that.
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("sample-producer")
Log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
connection, err := amqp.DialConfig(amqpURI, config)
if err != nil {
return fmt.Errorf("Dial: %s", err)
}
Expand Down
7 changes: 7 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type Config struct {
Dial func(network, addr string) (net.Conn, error)
}

// NewConnectionProperties initialises an amqp.Table struct to empty value. This
// amqp.Table can be used as Properties in amqp.Config to set the connection
// name, using amqp.DialConfig()
func NewConnectionProperties() Table {
return make(Table)
}

// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel. All RPC methods and
// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
Expand Down
17 changes: 17 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,20 @@ func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
t.Log("waiting for go-routines to terminate")
wg.Wait()
}

func TestConnectionConfigPropertiesWithClientProvidedConnectionName(t *testing.T) {
const expectedConnectionName = "amqp091-go-test"

connectionProperties := NewConnectionProperties()
connectionProperties.SetClientConnectionName(expectedConnectionName)

currentConnectionName, ok := connectionProperties["connection_name"]
if !ok {
t.Fatal("Connection name was not set by Table.SetClientConnectionName")
}
if currentConnectionName != expectedConnectionName {
t.Fatalf("Connection name is set to: %s. Expected: %s",
currentConnectionName,
expectedConnectionName)
}
}
12 changes: 12 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,15 @@ func ExampleConnection_NotifyBlocked() {
// Your application domain channel setup publishings
publishAllTheThings(conn)
}

func ExampleTable_SetClientConnectionName() {
// Sets the well-known connection_name property in amqp.Config. The connection
// name will be visible in RabbitMQ Management UI.
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("my-client-app")
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
}
8 changes: 8 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ func (t Table) Validate() error {
return validateField(t)
}

// Sets the connection name property. This property can be used in
// amqp.Config to set a custom connection name during amqp.DialConfig(). This
// can be helpful to identify specific connections in RabbitMQ, for debugging or
// tracing purposes.
func (t Table) SetClientConnectionName(connName string) {
t["connection_name"] = connName
}

type message interface {
id() (uint16, uint16)
wait() bool
Expand Down

0 comments on commit de416f1

Please sign in to comment.