Skip to content

Commit 6769d47

Browse files
Add a ring-buffer reporter to libbeat (elastic#28750)
* Add a ring-buffer reporter to libbeat Add a ring buffer reporter that when enabled will store configured namespaces in a buffer to allow operators to view recent metrics history. Defaults are to gather the stats namespace every 10s for 10m. Must be explicitly enabled, along with monitoring, and the HTTP endpoint. The buffer endpoint is intended to be used for diagnostics reporting. * Add basic benchmarks * Review feddback
1 parent 1c68693 commit 6769d47

File tree

9 files changed

+519
-1
lines changed

9 files changed

+519
-1
lines changed

libbeat/api/server.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package api
1919

2020
import (
21+
"errors"
2122
"fmt"
2223
"net"
2324
"net/http"
@@ -72,6 +73,25 @@ func (s *Server) Stop() error {
7273
return s.l.Close()
7374
}
7475

76+
// AttachHandler will attach a handler at the specified route and return an error instead of panicing.
77+
func (s *Server) AttachHandler(route string, h http.Handler) (err error) {
78+
defer func() {
79+
if r := recover(); r != nil {
80+
switch r := r.(type) {
81+
case error:
82+
err = r
83+
case string:
84+
err = errors.New(r)
85+
default:
86+
err = fmt.Errorf("handle attempted to panic with %v", r)
87+
}
88+
}
89+
}()
90+
s.log.Infof("Attempting to attach %q to server.", route)
91+
s.mux.Handle(route, h)
92+
return
93+
}
94+
7595
func parse(host string, port int) (string, string, error) {
7696
url, err := url.Parse(host)
7797
if err != nil {

libbeat/api/server_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package api
2020
import (
2121
"context"
2222
"fmt"
23+
"io"
2324
"io/ioutil"
2425
"net"
2526
"net/http"
@@ -184,3 +185,39 @@ func simpleMux() *http.ServeMux {
184185
})
185186
return mux
186187
}
188+
189+
func TestAttachHandler(t *testing.T) {
190+
url := "http://localhost:0"
191+
192+
cfg := common.MustNewConfigFrom(map[string]interface{}{
193+
"host": url,
194+
})
195+
196+
s, err := New(nil, simpleMux(), cfg)
197+
require.NoError(t, err)
198+
go s.Start()
199+
defer s.Stop()
200+
201+
h := &testHandler{}
202+
203+
err = s.AttachHandler("/test", h)
204+
require.NoError(t, err)
205+
206+
r, err := http.Get("http://" + s.l.Addr().String() + "/test")
207+
require.NoError(t, err)
208+
defer r.Body.Close()
209+
210+
body, err := io.ReadAll(r.Body)
211+
require.NoError(t, err)
212+
213+
assert.Equal(t, "test!", string(body))
214+
215+
err = s.AttachHandler("/test", h)
216+
assert.NotNil(t, err)
217+
}
218+
219+
type testHandler struct{}
220+
221+
func (t *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
222+
fmt.Fprintf(w, "test!")
223+
}

libbeat/cmd/instance/beat.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import (
6262
"github.com/elastic/beats/v7/libbeat/metric/system/host"
6363
"github.com/elastic/beats/v7/libbeat/monitoring"
6464
"github.com/elastic/beats/v7/libbeat/monitoring/report"
65+
"github.com/elastic/beats/v7/libbeat/monitoring/report/buffer"
6566
"github.com/elastic/beats/v7/libbeat/monitoring/report/log"
6667
"github.com/elastic/beats/v7/libbeat/outputs"
6768
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
@@ -105,6 +106,7 @@ type beatConfig struct {
105106
// beat internal components configurations
106107
HTTP *common.Config `config:"http"`
107108
HTTPPprof *common.Config `config:"http.pprof"`
109+
BufferConfig *common.Config `config:"http.buffer"`
108110
Path paths.Path `config:"path"`
109111
Logging *common.Config `config:"logging"`
110112
MetricLogging *common.Config `config:"logging.metrics"`
@@ -437,8 +439,9 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
437439
// Start the API Server before the Seccomp lock down, we do this so we can create the unix socket
438440
// set the appropriate permission on the unix domain file without having to whitelist anything
439441
// that would be set at runtime.
442+
var s *api.Server // buffer reporter may need to attach to the server.
440443
if b.Config.HTTP.Enabled() {
441-
s, err := api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace)
444+
s, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace)
442445
if err != nil {
443446
return errw.Wrap(err, "could not start the HTTP server for the API")
444447
}
@@ -474,6 +477,19 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
474477
defer reporter.Stop()
475478
}
476479

480+
// only collect into a ring buffer if HTTP, and the ring buffer are explicitly enabled
481+
if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) {
482+
buffReporter, err := buffer.MakeReporter(b.Info, b.Config.BufferConfig)
483+
if err != nil {
484+
return err
485+
}
486+
defer buffReporter.Stop()
487+
488+
if err := s.AttachHandler("/buffer", buffReporter); err != nil {
489+
return err
490+
}
491+
}
492+
477493
ctx, cancel := context.WithCancel(context.Background())
478494
var stopBeat = func() {
479495
b.Instrumentation.Tracer().Close()

libbeat/monitoring/monitoring.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,18 @@ func IsEnabled(monitoringCfg *common.Config) bool {
108108

109109
return monitoringCfg.Enabled()
110110
}
111+
112+
// IsBufferEnabled will check if the monitoring buffer is explicitly enabled.
113+
func IsBufferEnabled(monitoringCfg *common.Config) bool {
114+
if monitoringCfg == nil {
115+
return false
116+
}
117+
fields := monitoringCfg.GetFields()
118+
for _, field := range fields {
119+
if field == "enabled" {
120+
// default Enabled will return true, so we only return the value if it's defined.
121+
return monitoringCfg.Enabled()
122+
}
123+
}
124+
return false
125+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package monitoring
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
)
28+
29+
func TestIsBufferEnabled(t *testing.T) {
30+
tests := []struct {
31+
name string
32+
input map[string]interface{}
33+
expect bool
34+
}{{
35+
name: "enabled",
36+
input: map[string]interface{}{
37+
"enabled": true,
38+
},
39+
expect: true,
40+
}, {
41+
name: "disabled",
42+
input: map[string]interface{}{
43+
"enabled": false,
44+
},
45+
expect: false,
46+
}, {
47+
name: "missing",
48+
input: map[string]interface{}{
49+
"size": 10,
50+
},
51+
expect: false,
52+
}, {
53+
name: "nil",
54+
input: nil,
55+
expect: false,
56+
}}
57+
for _, tt := range tests {
58+
t.Run(tt.name, func(t *testing.T) {
59+
cfg, err := common.NewConfigFrom(tt.input)
60+
require.NoError(t, err)
61+
assert.Equal(t, tt.expect, IsBufferEnabled(cfg))
62+
})
63+
}
64+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package buffer
19+
20+
import "sync"
21+
22+
// ringBuffer is a buffer with a fixed number of items that can be tracked.
23+
//
24+
// We assume that the size of the buffer is greater than one.
25+
// the buffer should be thread-safe.
26+
type ringBuffer struct {
27+
mu sync.Mutex
28+
entries []interface{}
29+
i int
30+
full bool
31+
}
32+
33+
// newBuffer returns a reference to a new ringBuffer with set size.
34+
func newBuffer(size int) *ringBuffer {
35+
return &ringBuffer{
36+
entries: make([]interface{}, size),
37+
}
38+
}
39+
40+
// add will add the passed entry to the buffer.
41+
func (r *ringBuffer) add(entry interface{}) {
42+
r.mu.Lock()
43+
defer r.mu.Unlock()
44+
r.entries[r.i] = entry
45+
r.i = (r.i + 1) % len(r.entries)
46+
if r.i == 0 {
47+
r.full = true
48+
}
49+
}
50+
51+
// getAll returns all entries in the buffer in order
52+
func (r *ringBuffer) getAll() []interface{} {
53+
r.mu.Lock()
54+
defer r.mu.Unlock()
55+
if r.i == 0 && !r.full {
56+
return []interface{}{}
57+
}
58+
if !r.full {
59+
return r.entries[:r.i]
60+
}
61+
if r.full && r.i == 0 {
62+
return r.entries
63+
}
64+
return append(r.entries[r.i:], r.entries[:r.i]...)
65+
}

0 commit comments

Comments
 (0)