Skip to content

Commit

Permalink
AMQP Consumer plugin (#1678)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzampolin authored and danielnelson committed Mar 3, 2017
1 parent 1873abd commit 1074464
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ be deprecated eventually.
- [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors.
- [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs.
- [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK.
- [#1678](https://github.com/influxdata/telegraf/pull/1678): Add AMQP consumer input plugin

### Bugfixes

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ configuration options.

## Input Plugins

* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache)
* [aws cloudwatch](./plugins/inputs/cloudwatch)
* [bcache](./plugins/inputs/bcache)
* [cassandra](./plugins/inputs/cassandra)
* [ceph](./plugins/inputs/ceph)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package all

import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
Expand Down
47 changes: 47 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# AMQP Consumer Input Plugin

This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

Metrics are read from a topic exchange using the configured queue and binding_key.

Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html

The following defaults are known to work with RabbitMQ:

```toml
# AMQP consumer plugin
[[inputs.amqp_consumer]]
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"

## Controls how many messages the server will try to keep on the network
## for consumers before receiving delivery acks.
#prefetch_count = 50

## Auth method. PLAIN and EXTERNAL are supported.
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
280 changes: 280 additions & 0 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package amqp_consumer

import (
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/streadway/amqp"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string
// AMQP exchange
Exchange string
// Queue Name
Queue string
// Binding Key
BindingKey string `toml:"binding_key"`

// Controls how many messages the server will try to keep on the network
// for consumers before receiving delivery acks.
PrefetchCount int

// AMQP Auth method
AuthMethod string
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
}

type externalAuth struct{}

func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return fmt.Sprintf("\000")
}

const (
DefaultAuthMethod = "PLAIN"
DefaultPrefetchCount = 50
)

func (a *AMQPConsumer) SampleConfig() string {
return `
## AMQP url
url = "amqp://localhost:5672/influxdb"
## AMQP exchange
exchange = "telegraf"
## AMQP queue name
queue = "telegraf"
## Binding Key
binding_key = "#"
## Maximum number of messages server should give to the worker.
prefetch_count = 50
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`
}

func (a *AMQPConsumer) Description() string {
return "AMQP consumer plugin"
}

func (a *AMQPConsumer) SetParser(parser parsers.Parser) {
a.parser = parser
}

// All gathering is done in the Start function
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}

func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
tls, err := internal.GetTLSConfig(
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify)
if err != nil {
return nil, err
}

// parse auth method
var sasl []amqp.Authentication // nil by default

if strings.ToUpper(a.AuthMethod) == "EXTERNAL" {
sasl = []amqp.Authentication{&externalAuth{}}
}

config := amqp.Config{
TLSClientConfig: tls,
SASL: sasl, // if nil, it will be PLAIN
}
return &config, nil
}

// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
amqpConf, err := a.createConfig()
if err != nil {
return err
}

msgs, err := a.connect(amqpConf)
if err != nil {
return err
}

a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)

go func() {
err := <-a.conn.NotifyClose(make(chan *amqp.Error))
if err == nil {
return
}

log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
for {
msgs, err := a.connect(amqpConf)
if err != nil {
log.Printf("E! AMQP connection failed: %s", err)
time.Sleep(10 * time.Second)
continue
}

a.wg.Add(1)
go a.process(msgs, acc)
break
}
}()

return nil
}

func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
conn, err := amqp.DialConfig(a.URL, *amqpConf)
if err != nil {
return nil, err
}
a.conn = conn

ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("Failed to open a channel: %s", err)
}

err = ch.ExchangeDeclare(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
}

q, err := ch.QueueDeclare(
a.Queue, // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
}

err = ch.QueueBind(
q.Name, // queue
a.BindingKey, // binding-key
a.Exchange, // exchange
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("Failed to bind a queue: %s", err)
}

err = ch.Qos(
a.PrefetchCount,
0, // prefetch-size
false, // global
)
if err != nil {
return nil, fmt.Errorf("Failed to set QoS: %s", err)
}

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
}

log.Println("I! Started AMQP consumer")
return msgs, err
}

// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}

d.Ack(false)
}
log.Printf("I! AMQP consumer queue closed")
}

func (a *AMQPConsumer) Stop() {
err := a.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
return
}
a.wg.Wait()
log.Println("I! Stopped AMQP service")
}

func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
AuthMethod: DefaultAuthMethod,
PrefetchCount: DefaultPrefetchCount,
}
})
}
11 changes: 9 additions & 2 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
# AMQP Output Plugin

This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key.
This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.

If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.

This plugin doesn't bind exchange to a queue, so it should be done by consumer.

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- https://www.rabbitmq.com/getstarted.html

### Configuration:

```
Expand All @@ -18,6 +23,8 @@ This plugin doesn't bind exchange to a queue, so it should be done by consumer.
## AMQP exchange
exchange = "telegraf"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
Expand Down
Loading

0 comments on commit 1074464

Please sign in to comment.