Skip to content

Commit

Permalink
Add Filebeat input plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryilyin committed Oct 23, 2018
1 parent 8d0ec99 commit a781328
Show file tree
Hide file tree
Showing 5 changed files with 693 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/fibaro"
_ "github.com/influxdata/telegraf/plugins/inputs/file"
_ "github.com/influxdata/telegraf/plugins/inputs/filecount"
_ "github.com/influxdata/telegraf/plugins/inputs/filebeat"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/fluentd"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
Expand Down
137 changes: 137 additions & 0 deletions plugins/inputs/filebeat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Filebeat Plugin
The Filebeat plugin will collect metrics from the given Filebeat instances.
### Configuration:
```toml
[[inputs.filebeat]]
## An URL from which to read Filebeat-formatted JSON
## Default is "http://127.0.0.1:5066"
url = "http://127.0.0.1:5066"

## Enable collection of the generic Beat stats
collect_beat_stats = true

## Enable the collection if Libbeat stats
collect_libbeat_stats = true

## Enable the collection of OS level stats
collect_system_stats = false

## Enable the collection of Filebeat stats
collect_filebeat_stats = true

## Timeout for HTTP requests
timeout = "5s"

## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"

## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
### Measurements & Fields
- **filebeat_beat**
* Fields:
- cpu_system_ticks
- cpu_system_time_ms
- cpu_total_ticks
- cpu_total_time_ms
- cpu_total_value
- cpu_user_ticks
- cpu_user_time_ms
- info_uptime_ms
- memstats_gc_next
- memstats_memory_alloc
- memstats_memory_total
- memstats_rss
* Tags:
- beat_host
- beat_id
- beat_name
- beat_version

- **filebeat**
* Fields:
- events_active
- events_added
- events_done
- harvester_closed
- harvester_open_files
- harvester_running
- harvester_skipped
- harvester_started
- input_log_files_renamed
- input_log_files_truncated
* Tags:
- beat_host
- beat_id
- beat_name
- beat_version

- **filebeat_libbeat**
* Fields:
- config_module_running
- config_module_starts
- config_module_stops
- config_reloads
- output_events_acked
- output_events_active
- output_events_batches
- output_events_dropped
- output_events_duplicates
- output_events_failed
- output_events_total
- output_read_bytes
- output_read_errors
- output_write_bytes
- output_write_errors
- outputs_kafka_bytes_read
- outputs_kafka_bytes_write
- pipeline_clients
- pipeline_events_active
- pipeline_events_dropped
- pipeline_events_failed
- pipeline_events_filtered
- pipeline_events_published
- pipeline_events_retry
- pipeline_events_total
- pipeline_queue_acked
* Tags:
- beat_host
- beat_id
- beat_name
- beat_version

- **filebeat_system**
* Field:
- cpu_cores
- load_1
- load_15
- load_5
- load_norm_1
- load_norm_15
- load_norm_5
* Tags:
- beat_host
- beat_id
- beat_name
- beat_version

### Example Output:
```
$ telegraf --input-filter filebeat --test
> filebeat_beat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6
cpu_system_ticks=656750,cpu_system_time_ms=656750,cpu_total_ticks=5461190,cpu_total_time_ms=5461198,cpu_total_value=5461190,cpu_user_ticks=4804440,cpu_user_time_ms=4804448,info_uptime_ms=342634196,memstats_gc_next=20199584,memstats_memory_alloc=12547424,memstats_memory_total=486296424792,memstats_rss=72552448 1540316047000000000
> filebeat_libbeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6
config_module_running=0,config_module_starts=0,config_module_stops=0,config_reloads=0,output_events_acked=192404,output_events_active=0,output_events_batches=1607,output_events_dropped=0,output_events_duplicates=0,output_events_failed=0,output_events_total=192404,output_read_bytes=0,output_read_errors=0,output_write_bytes=0,output_write_errors=0,outputs_kafka_bytes_read=1118528,outputs_kafka_bytes_write=48002014,pipeline_clients=1,pipeline_events_active=0,pipeline_events_dropped=0,pipeline_events_failed=0,pipeline_events_filtered=11496,pipeline_events_published=192404,pipeline_events_retry=14,pipeline_events_total=203900,pipeline_queue_acked=192404 1540316047000000000
> filebeat_system,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6
cpu_cores=32,load_1=46.08,load_15=49.82,load_5=47.88,load_norm_1=1.44,load_norm_15=1.5569,load_norm_5=1.4963 1540316047000000000
> filebeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6
events_active=0,events_added=3223,events_done=3223,harvester_closed=0,harvester_open_files=0,harvester_running=0,harvester_skipped=0,harvester_started=0,input_log_files_renamed=0,input_log_files_truncated=0 1540320286000000000
```
239 changes: 239 additions & 0 deletions plugins/inputs/filebeat/filebeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package filebeat

import (
"encoding/json"
"net/http"
"net/url"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"

jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)

const sampleConfig = `
[[inputs.filebeat]]
## An URL from which to read Filebeat-formatted JSON
## Default is "http://127.0.0.1:5066".
url = "http://127.0.0.1:5066"
## Enable collection of the generic Beat stats
collect_beat_stats = true
## Enable the collection if Libbeat stats
collect_libbeat_stats = true
## Enable the collection of OS level stats
collect_system_stats = false
## Enable the collection of Filebeat stats
collect_filebeat_stats = true
## Timeout for HTTP requests
timeout = "5s"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`

const SuffixInfo = "/"
const SuffixStats = "/stats"

type FileBeatInfo struct {
Beat string `json:"beat"`
Hostname string `json:"hostname"`
Name string `json:"name"`
UUID string `json:"uuid"`
Version string `json:"version"`
}

type FileBeatStats struct {
Beat map[string]interface{} `json:"beat"`
Filebeat interface{} `json:"filebeat"`
Libbeat interface{} `json:"libbeat"`
System interface{} `json:"system"`
}

type Filebeat struct {
URL string `toml:"url"`

CollectBeatStats bool `toml:"collect_beat_stats"`
CollectLibbeatStats bool `toml:"collect_libbeat_stats"`
CollectSystemStats bool `toml:"collect_system_stats"`
CollectFilebeatStats bool `toml:"collect_filebeat_stats"`

Username string `toml:"username"`
Password string `toml:"password"`
Timeout internal.Duration `toml:"timeout"`

tls.ClientConfig
client *http.Client
}

func NewFilebeat() *Filebeat {
return &Filebeat{
URL: "http://127.0.0.1:5066",
CollectBeatStats: true,
CollectLibbeatStats: true,
CollectSystemStats: true,
CollectFilebeatStats: true,
Timeout: internal.Duration{Duration: time.Second * 5},
}
}

func (filebeat *Filebeat) Description() string { return "Read metrics exposed by Filebeat" }

func (filebeat *Filebeat) SampleConfig() string { return sampleConfig }

func (filebeat *Filebeat) createHTTPClient() (*http.Client, error) {
tlsCfg, err := filebeat.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: filebeat.Timeout.Duration,
}

return client, nil
}

func (filebeat *Filebeat) gatherJsonData(url string, v interface{}) (err error) {

request, err := http.NewRequest("GET", url, nil)

if (filebeat.Username != "") || (filebeat.Password != "") {
request.SetBasicAuth(filebeat.Username, filebeat.Password)
}

response, err := filebeat.client.Do(request)
if err != nil {
return err
}

defer response.Body.Close()

if err = json.NewDecoder(response.Body).Decode(v); err != nil {
return err
}

return nil
}

func (filebeat *Filebeat) gatherInfoTags(url string) (map[string]string, error) {
fileBeatInfo := &FileBeatInfo{}

err := filebeat.gatherJsonData(url, fileBeatInfo)
if err != nil {
return nil, err
}

tags := map[string]string{
"beat_id": fileBeatInfo.UUID,
"beat_name": fileBeatInfo.Name,
"beat_host": fileBeatInfo.Hostname,
"beat_version": fileBeatInfo.Version,
}

return tags, nil
}

func (filebeat *Filebeat) gatherStats(accumulator telegraf.Accumulator) error {
fileBeatStats := &FileBeatStats{}

infoUrl, err := url.Parse(filebeat.URL + SuffixInfo)
if err != nil {
return err
}
statsUrl, err := url.Parse(filebeat.URL + SuffixStats)
if err != nil {
return err
}

tags, err := filebeat.gatherInfoTags(infoUrl.String())
if err != nil {
return err
}

err = filebeat.gatherJsonData(statsUrl.String(), fileBeatStats)
if err != nil {
return err
}

if filebeat.CollectBeatStats {
flattenerBeat := jsonparser.JSONFlattener{}
err := flattenerBeat.FlattenJSON("", fileBeatStats.Beat)
if err != nil {
return err
}
accumulator.AddFields("filebeat_beat", flattenerBeat.Fields, tags)
}

if filebeat.CollectFilebeatStats {
flattenerFilebeat := jsonparser.JSONFlattener{}
err := flattenerFilebeat.FlattenJSON("", fileBeatStats.Filebeat)
if err != nil {
return err
}
accumulator.AddFields("filebeat", flattenerFilebeat.Fields, tags)
}

if filebeat.CollectLibbeatStats {
flattenerLibbeat := jsonparser.JSONFlattener{}
err := flattenerLibbeat.FlattenJSON("", fileBeatStats.Libbeat)
if err != nil {
return err
}
accumulator.AddFields("filebeat_libbeat", flattenerLibbeat.Fields, tags)
}

if filebeat.CollectSystemStats {
flattenerSystem := jsonparser.JSONFlattener{}
err := flattenerSystem.FlattenJSON("", fileBeatStats.System)
if err != nil {
return err
}
accumulator.AddFields("filebeat_system", flattenerSystem.Fields, tags)
}

return nil
}

func (filebeat *Filebeat) Gather(accumulator telegraf.Accumulator) error {
if filebeat.client == nil {
client, err := filebeat.createHTTPClient()

if err != nil {
return err
}
filebeat.client = client
}

err := filebeat.gatherStats(accumulator)
if err != nil {
return err
}

return nil
}

func init() {
inputs.Add("filebeat", func() telegraf.Input {
return NewFilebeat()
})
}
Loading

0 comments on commit a781328

Please sign in to comment.