diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index 34ba5011367f..8727893beb76 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -39,15 +39,15 @@ var debugf = logp.MakeDebug("http") func create( name string, cfg *common.Config, -) ([]monitors.Job, error) { +) (jobs []monitors.Job, endpoints int, err error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { - return nil, err + return nil, 0, err } tls, err := outputs.LoadTLSConfig(config.TLS) if err != nil { - return nil, err + return nil, 0, err } var body []byte @@ -58,13 +58,13 @@ func create( compression := config.Check.Request.Compression enc, err = getContentEncoder(compression.Type, compression.Level) if err != nil { - return nil, err + return nil, 0, err } buf := bytes.NewBuffer(nil) err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody)) if err != nil { - return nil, err + return nil, 0, err } body = buf.Bytes() @@ -72,30 +72,30 @@ func create( validator := makeValidateResponse(&config.Check.Response) - jobs := make([]monitors.Job, len(config.URLs)) + jobs = make([]monitors.Job, len(config.URLs)) if config.ProxyURL != "" { transport, err := newRoundTripper(&config, tls) if err != nil { - return nil, err + return nil, 0, err } for i, url := range config.URLs { jobs[i], err = newHTTPMonitorHostJob(url, &config, transport, enc, body, validator) if err != nil { - return nil, err + return nil, 0, err } } } else { for i, url := range config.URLs { jobs[i], err = newHTTPMonitorIPsJob(&config, url, tls, enc, body, validator) if err != nil { - return nil, err + return nil, 0, err } } } - return jobs, nil + return jobs, len(config.URLs), nil } func newRoundTripper(config *Config, tls *transport.TLSConfig) (*http.Transport, error) { diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 8899cdf0fc5f..d6def089e096 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -54,7 +54,7 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event { config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, err := create("tls", config) + jobs, endpoints, err := create("tls", config) require.NoError(t, err) job := jobs[0] @@ -62,6 +62,8 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event { event, _, err := job.Run() require.NoError(t, err) + require.Equal(t, 1, endpoints) + return event } diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index aa7747f0a7d7..60c6d99e7e11 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -37,10 +37,10 @@ var debugf = logp.MakeDebug("icmp") func create( name string, cfg *common.Config, -) ([]monitors.Job, error) { +) (jobs []monitors.Job, endpoints int, err error) { config := DefaultConfig if err := cfg.Unpack(&config); err != nil { - return nil, err + return nil, 0, err } // TODO: check icmp is support by OS + check we've @@ -49,7 +49,6 @@ func create( // TODO: replace icmp package base reader/sender using raw sockets with // OS specific solution - var jobs []monitors.Job addJob := func(t monitors.Job, err error) error { if err != nil { return err @@ -61,7 +60,7 @@ func create( ipVersion := config.Mode.Network() if len(config.Hosts) > 0 && ipVersion == "" { err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled") - return nil, err + return nil, 0, err } var loopErr error @@ -71,11 +70,11 @@ func create( }) if loopErr != nil { debugf("Failed to initialize ICMP loop %v", loopErr) - return nil, loopErr + return nil, 0, loopErr } if err := loop.checkNetworkMode(ipVersion); err != nil { - return nil, err + return nil, 0, err } network := config.Mode.Network() @@ -90,11 +89,11 @@ func create( settings := monitors.MakeHostJobSettings(jobName, host, config.Mode) err := addJob(monitors.MakeByHostJob(settings, pingFactory)) if err != nil { - return nil, err + return nil, 0, err } } - return jobs, nil + return jobs, len(config.Hosts), nil } func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) { diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 6bcf785b70b0..7a27446708c4 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -47,15 +47,15 @@ type connURL struct { func create( name string, cfg *common.Config, -) ([]monitors.Job, error) { +) (jobs []monitors.Job, endpoints int, err error) { config := DefaultConfig if err := cfg.Unpack(&config); err != nil { - return nil, err + return nil, 0, err } tls, err := outputs.LoadTLSConfig(config.TLS) if err != nil { - return nil, err + return nil, 0, err } defaultScheme := "tcp" @@ -63,17 +63,16 @@ func create( defaultScheme = "ssl" } - endpoints, err := collectHosts(&config, defaultScheme) + schemeHosts, err := collectHosts(&config, defaultScheme) if err != nil { - return nil, err + return nil, 0, err } typ := config.Name timeout := config.Timeout validator := makeValidateConn(&config) - var jobs []monitors.Job - for scheme, eps := range endpoints { + for scheme, eps := range schemeHosts { schemeTLS := tls if scheme == "tcp" || scheme == "plain" { schemeTLS = nil @@ -85,7 +84,7 @@ func create( TLS: schemeTLS, }) if err != nil { - return nil, err + return nil, 0, err } epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode, @@ -93,12 +92,17 @@ func create( return pingHost(dialer, addr, timeout, validator) }) if err != nil { - return nil, err + return nil, 0, err } jobs = append(jobs, epJobs...) } - return jobs, nil + + numHosts := 0 + for _, hosts := range schemeHosts { + numHosts += len(hosts) + } + return jobs, numHosts, nil } func collectHosts(config *Config, defaultScheme string) (map[string][]dialchain.Endpoint, error) { diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 9752157b2ff8..6c5864c343c3 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -45,7 +45,7 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { }) require.NoError(t, err) - jobs, err := create("tcp", config) + jobs, endpoints, err := create("tcp", config) require.NoError(t, err) job := jobs[0] @@ -53,6 +53,8 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { event, _, err := job.Run() require.NoError(t, err) + require.Equal(t, 1, endpoints) + return &event } @@ -65,7 +67,7 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string }) require.NoError(t, err) - jobs, err := create("tcp", config) + jobs, endpoints, err := create("tcp", config) require.NoError(t, err) job := jobs[0] @@ -73,6 +75,8 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string event, _, err := job.Run() require.NoError(t, err) + require.Equal(t, 1, endpoints) + return &event } diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go index 04fedc32cd84..a12c6c9738c7 100644 --- a/heartbeat/monitors/mocks_test.go +++ b/heartbeat/monitors/mocks_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/monitoring" ) type MockBeatClient struct { @@ -99,11 +100,13 @@ func createMockJob(name string, cfg *common.Config) ([]Job, error) { } func mockPluginBuilder() pluginBuilder { - return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) { + reg := monitoring.NewRegistry() + + return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, int, error) { c := common.Config{} j, err := createMockJob("test", &c) - return j, err - }} + return j, 1, err + }, newPluginCountersRecorder("test", reg)} } func mockPluginsReg() *pluginsReg { diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index d363e1560967..37742f8003ee 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -42,6 +42,8 @@ type Monitor struct { scheduler *scheduler.Scheduler jobTasks []*task enabled bool + // endpoints is a count of endpoints this monitor measures. + endpoints int // internalsMtx is used to synchronize access to critical // internal datastructures internalsMtx sync.Mutex @@ -51,6 +53,10 @@ type Monitor struct { watch watcher.Watch pipelineConnector beat.PipelineConnector + + // stats is the countersRecorder used to record lifecycle events + // for global metrics + telemetry + stats registryRecorder } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -95,9 +101,11 @@ func newMonitor( watchPollTasks: []*task{}, internalsMtx: sync.Mutex{}, config: config, + stats: monitorPlugin.stats, } - jobs, err := monitorPlugin.create(config) + jobs, endpoints, err := monitorPlugin.create(config) + m.endpoints = endpoints if err != nil { return nil, fmt.Errorf("job err %v", err) } @@ -181,7 +189,8 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error { return } - watchJobs, err := monitorPlugin.create(merged) + watchJobs, endpoints, err := monitorPlugin.create(merged) + m.endpoints = endpoints if err != nil { logp.Err("Could not create job from watch file: %v", err) } @@ -227,6 +236,8 @@ func (m *Monitor) Start() { for _, t := range m.watchPollTasks { t.Start() } + + m.stats.startMonitor(int64(m.endpoints)) } // Stop stops the Monitor's execution in its configured scheduler. @@ -242,4 +253,6 @@ func (m *Monitor) Stop() { for _, t := range m.watchPollTasks { t.Stop() } + + m.stats.stopMonitor(int64(m.endpoints)) } diff --git a/heartbeat/monitors/plugin.go b/heartbeat/monitors/plugin.go index 688732fd47cb..b4d88c3639d3 100644 --- a/heartbeat/monitors/plugin.go +++ b/heartbeat/monitors/plugin.go @@ -24,6 +24,7 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/plugin" ) @@ -31,10 +32,30 @@ type pluginBuilder struct { name string typ Type builder PluginBuilder + stats registryRecorder } var pluginKey = "heartbeat.monitor" +var statsRegistry = monitoring.Default.NewRegistry("heartbeat") +var stateRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("heartbeat") + +// stateGlobalRecorder records statistics across all plugin types +var stateGlobalRecorder = newRootGaugeRecorder(stateRegistry) + +func statsForPlugin(pluginName string) registryRecorder { + return multiRegistryRecorder{ + recorders: []registryRecorder{ + // state (telemetry) + newPluginGaugeRecorder(pluginName, stateRegistry), + // Record global monitors / endpoints count + newPluginCountersRecorder(pluginName, statsRegistry), + // When stats for this plugin are updated, update the global stats as well + stateGlobalRecorder, + }, + } +} + func init() { plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { p, ok := ifc.(pluginBuilder) @@ -42,25 +63,26 @@ func init() { return errors.New("plugin does not match monitor plugin type") } - return globalPluginsReg.register(pluginBuilder{p.name, p.typ, p.builder}) + stats := statsForPlugin(p.name) + return globalPluginsReg.register(pluginBuilder{p.name, p.typ, p.builder, stats}) }) } // PluginBuilder is the signature of functions used to build active -// monitors -type PluginBuilder func(string, *common.Config) ([]Job, error) +// monitorStarts +type PluginBuilder func(string, *common.Config) (jobs []Job, endpoints int, err error) // Type represents whether a plugin is active or passive. type Type uint8 const ( - // ActiveMonitor represents monitors that reach across the network to do things. + // ActiveMonitor represents monitorStarts that reach across the network to do things. ActiveMonitor Type = iota + 1 - // PassiveMonitor represents monitors that receive inbound data. + // PassiveMonitor represents monitorStarts that receive inbound data. PassiveMonitor ) -// globalPluginsReg maintains the canonical list of valid Heartbeat monitors at runtime. +// globalPluginsReg maintains the canonical list of valid Heartbeat monitorStarts at runtime. var globalPluginsReg = newPluginsReg() type pluginsReg struct { @@ -75,7 +97,8 @@ func newPluginsReg() *pluginsReg { // RegisterActive registers a new active (as opposed to passive) monitor. func RegisterActive(name string, builder PluginBuilder) { - if err := globalPluginsReg.add(pluginBuilder{name, ActiveMonitor, builder}); err != nil { + stats := statsForPlugin(name) + if err := globalPluginsReg.add(pluginBuilder{name, ActiveMonitor, builder, stats}); err != nil { panic(err) } } @@ -129,7 +152,7 @@ func (r *pluginsReg) monitorNames() []string { return names } -func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) { +func (e *pluginBuilder) create(cfg *common.Config) (jobs []Job, endpoints int, err error) { return e.builder(e.name, cfg) } diff --git a/heartbeat/monitors/plugin_test.go b/heartbeat/monitors/plugin_test.go deleted file mode 100644 index b8b199afa72b..000000000000 --- a/heartbeat/monitors/plugin_test.go +++ /dev/null @@ -1,249 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package monitors - -import ( - "reflect" - "testing" - - "github.com/elastic/beats/libbeat/common" -) - -func Test_newPluginsReg(t *testing.T) { - tests := []struct { - name string - want *pluginsReg - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := newPluginsReg(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("newPluginsReg() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestRegisterActive(t *testing.T) { - type args struct { - name string - builder PluginBuilder - } - tests := []struct { - name string - args args - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - RegisterActive(tt.args.name, tt.args.builder) - }) - } -} - -func TestMonitorPluginAlreadyExistsError_Error(t *testing.T) { - type fields struct { - name string - typ Type - builder PluginBuilder - } - tests := []struct { - name string - fields fields - want string - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - m := ErrPluginAlreadyExists{ - name: tt.fields.name, - typ: tt.fields.typ, - builder: tt.fields.builder, - } - if got := m.Error(); got != tt.want { - t.Errorf("ErrPluginAlreadyExists.Error() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_pluginsReg_add(t *testing.T) { - type fields struct { - monitors map[string]pluginBuilder - } - type args struct { - plugin pluginBuilder - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &pluginsReg{ - monitors: tt.fields.monitors, - } - if err := r.add(tt.args.plugin); (err != nil) != tt.wantErr { - t.Errorf("pluginsReg.add() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func Test_pluginsReg_register(t *testing.T) { - type fields struct { - monitors map[string]pluginBuilder - } - type args struct { - plugin pluginBuilder - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &pluginsReg{ - monitors: tt.fields.monitors, - } - if err := r.register(tt.args.plugin); (err != nil) != tt.wantErr { - t.Errorf("pluginsReg.register() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func Test_pluginsReg_get(t *testing.T) { - type fields struct { - monitors map[string]pluginBuilder - } - type args struct { - name string - } - tests := []struct { - name string - fields fields - args args - want pluginBuilder - want1 bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &pluginsReg{ - monitors: tt.fields.monitors, - } - got, got1 := r.get(tt.args.name) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("pluginsReg.get() got = %v, want %v", got, tt.want) - } - if got1 != tt.want1 { - t.Errorf("pluginsReg.get() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} - -func Test_pluginsReg_String(t *testing.T) { - type fields struct { - monitors map[string]pluginBuilder - } - tests := []struct { - name string - fields fields - want string - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &pluginsReg{ - monitors: tt.fields.monitors, - } - if got := r.String(); got != tt.want { - t.Errorf("pluginsReg.String() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_pluginBuilder_create(t *testing.T) { - type fields struct { - name string - typ Type - builder PluginBuilder - } - type args struct { - cfg *common.Config - } - tests := []struct { - name string - fields fields - args args - want []Job - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := &pluginBuilder{ - name: tt.fields.name, - typ: tt.fields.typ, - builder: tt.fields.builder, - } - got, err := e.create(tt.args.cfg) - if (err != nil) != tt.wantErr { - t.Errorf("pluginBuilder.create() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("pluginBuilder.create() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestType_String(t *testing.T) { - tests := []struct { - name string - t Type - want string - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.t.String(); got != tt.want { - t.Errorf("Type.String() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/heartbeat/monitors/regrecord.go b/heartbeat/monitors/regrecord.go new file mode 100644 index 000000000000..b779d68d0e0e --- /dev/null +++ b/heartbeat/monitors/regrecord.go @@ -0,0 +1,102 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "github.com/elastic/beats/libbeat/monitoring" +) + +type registryRecorder interface { + startMonitor(endpoints int64) + stopMonitor(endpoints int64) +} + +// multiRegistryRecorder composes multiple statsRecorders. +type multiRegistryRecorder struct { + recorders []registryRecorder +} + +func (mr multiRegistryRecorder) startMonitor(endpoints int64) { + for _, recorder := range mr.recorders { + recorder.startMonitor(endpoints) + } +} + +func (mr multiRegistryRecorder) stopMonitor(endpoints int64) { + for _, recorder := range mr.recorders { + recorder.stopMonitor(endpoints) + } +} + +// countersRecorder is used to record start/stop events for a single monitor/plugin +// to a single registry as counters. +type countersRecorder struct { + monitorStarts *monitoring.Int + monitorStops *monitoring.Int + endpointStarts *monitoring.Int + endpointStops *monitoring.Int +} + +func newPluginCountersRecorder(pluginName string, rootRegistry *monitoring.Registry) registryRecorder { + pluginRegistry := rootRegistry.NewRegistry(pluginName) + return countersRecorder{ + monitoring.NewInt(pluginRegistry, "monitor_starts"), + monitoring.NewInt(pluginRegistry, "monitor_stops"), + monitoring.NewInt(pluginRegistry, "endpoint_starts"), + monitoring.NewInt(pluginRegistry, "endpoint_stops"), + } +} + +func (r countersRecorder) startMonitor(endpoints int64) { + r.monitorStarts.Inc() + r.endpointStarts.Add(endpoints) +} + +func (r countersRecorder) stopMonitor(endpoints int64) { + r.monitorStops.Inc() + r.endpointStops.Add(endpoints) +} + +// countersRecorder is used to record start/stop events for a single monitor/plugin +// to a single registry as gauges. +type gaugeRecorder struct { + monitors *monitoring.Int + endpoints *monitoring.Int +} + +func newRootGaugeRecorder(r *monitoring.Registry) registryRecorder { + return gaugeRecorder{ + monitoring.NewInt(r, "monitors"), + monitoring.NewInt(r, "endpoints"), + } +} + +func newPluginGaugeRecorder(pluginName string, rootRegistry *monitoring.Registry) registryRecorder { + pluginRegistry := rootRegistry.NewRegistry(pluginName) + return newRootGaugeRecorder(pluginRegistry) +} + +func (r gaugeRecorder) startMonitor(endpoints int64) { + r.monitors.Inc() + r.endpoints.Add(endpoints) +} + +func (r gaugeRecorder) stopMonitor(endpoints int64) { + r.monitors.Dec() + r.endpoints.Sub(endpoints) +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index cd81673436ba..fb61b9ad5d2e 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -135,7 +135,7 @@ func (t *task) Start() { tf := t.makeSchedulerTaskFunc() t.cancelFn, err = t.monitor.scheduler.Add(t.config.Schedule, t.job.Name(), tf) if err != nil { - logp.Err("could not start monitor: %v, err") + logp.Err("could not start monitor: %v", err) } } diff --git a/heartbeat/tests/system/heartbeat.py b/heartbeat/tests/system/heartbeat.py index 8bf11f4c7686..b74e17d63cf8 100644 --- a/heartbeat/tests/system/heartbeat.py +++ b/heartbeat/tests/system/heartbeat.py @@ -2,6 +2,7 @@ import sys import BaseHTTPServer import threading +import nose.tools sys.path.append(os.path.join(os.path.dirname( __file__), '../../../libbeat/tests/system')) @@ -32,3 +33,43 @@ def do_GET(self): thread.start() return server + + @staticmethod + def http_cfg(url): + return """ +- type: http + schedule: "@every 1s" + urls: ["{url}"] + """[1:-1].format(url=url) + + @staticmethod + def tcp_cfg(*hosts): + host_str = ", ".join('"' + host + '"' for host in hosts) + return """ +- type: tcp + schedule: "@every 1s" + hosts: [{host_str}] + """[1:-1].format(host_str=host_str) + + def last_output_line(self): + return self.read_output()[-1] + + def write_dyn_config(self, filename, cfg): + with open(self.monitors_dir() + filename, 'w') as f: + f.write(cfg) + + def monitors_dir(self): + return self.working_dir + "/monitors.d/" + + def assert_last_status(self, status): + nose.tools.eq_(self.last_output_line()["monitor.status"], status) + + def setup_dynamic(self, extra_beat_args=[]): + os.mkdir(self.monitors_dir()) + self.render_config_template( + reload=True, + reload_path=self.monitors_dir() + "*.yml", + flush_min_events=1, + ) + + self.proc = self.start_beat(extra_args=extra_beat_args) diff --git a/heartbeat/tests/system/test_reload.py b/heartbeat/tests/system/test_reload.py index c9af001451c8..91735e517bf9 100644 --- a/heartbeat/tests/system/test_reload.py +++ b/heartbeat/tests/system/test_reload.py @@ -1,5 +1,4 @@ from heartbeat import BaseTest -import nose.tools import os @@ -14,7 +13,7 @@ def test_config_reload(self): """ server = self.start_server("hello world", 200) try: - self.setup() + self.setup_dynamic() cfg_file = "test.yml" @@ -43,7 +42,7 @@ def test_config_remove(self): """ server = self.start_server("hello world", 200) try: - self.setup() + self.setup_dynamic() cfg_file = "test.yml" @@ -70,7 +69,7 @@ def test_config_add(self): """ Test the addition of a dynamic config """ - self.setup() + self.setup_dynamic() self.wait_until(lambda: self.log_contains( "Starting reload procedure, current runners: 0")) @@ -88,34 +87,3 @@ def test_config_add(self): self.proc.check_kill_and_wait() finally: server.shutdown() - - def setup(self): - os.mkdir(self.monitors_dir()) - self.render_config_template( - reload=True, - reload_path=self.monitors_dir() + "*.yml", - flush_min_events=1, - ) - - self.proc = self.start_beat() - - def write_dyn_config(self, filename, cfg): - with open(self.monitors_dir() + filename, 'w') as f: - f.write(cfg) - - def monitors_dir(self): - return self.working_dir + "/monitors.d" - - def assert_last_status(self, status): - nose.tools.eq_(self.last_output_line()["monitor.status"], status) - - def last_output_line(self): - return self.read_output()[-1] - - @staticmethod - def http_cfg(url): - return """ -- type: http - schedule: "@every 1s" - urls: ["{url}"] - """[1:-1].format(url=url) diff --git a/heartbeat/tests/system/test_telemetry.py b/heartbeat/tests/system/test_telemetry.py new file mode 100644 index 000000000000..3ac2b0def934 --- /dev/null +++ b/heartbeat/tests/system/test_telemetry.py @@ -0,0 +1,122 @@ +from heartbeat import BaseTest +import urllib2 +import json +import nose.tools +import os +from nose.plugins.skip import SkipTest + + +class Test(BaseTest): + def __init__(self, *args): + self.proc = None + super(Test, self).__init__(*args) + + def test_telemetry(self): + """ + Test that telemetry metrics are correctly registered and increment / decrement + """ + if os.name == "nt": + # This test is currently skipped on windows because file permission + # configuration isn't implemented on Windows yet + raise SkipTest + + server = self.start_server("hello world", 200) + try: + self.setup_dynamic(["-E", "http.enabled=true"]) + + cfg_file = "test.yml" + + self.write_dyn_config( + cfg_file, self.http_cfg("http://localhost:8185") + ) + + self.wait_until(lambda: self.output_has(lines=1)) + + self.assert_stats({ + "http": { + "monitor_starts": 1, + "monitor_stops": 0, + "endpoint_starts": 1, + "endpoint_stops": 0, + } + }) + self.assert_state({ + "http": { + "monitors": 1, + "endpoints": 1, + } + }) + + tcp_hosts = ["localhost:8185", "localhost:12345"] + + self.write_dyn_config( + cfg_file, self.tcp_cfg(*tcp_hosts) + ) + + for tcp_host in tcp_hosts: + self.wait_until(lambda: self.log_contains( + "Start job 'tcp-tcp@{}".format(tcp_host))) + + init_lines = self.output_lines() + self.wait_until(lambda: self.output_has(lines=init_lines+2)) + + self.assert_stats({ + "http": { + "monitor_starts": 1, + "monitor_stops": 1, + "endpoint_starts": 1, + "endpoint_stops": 1, + }, + "tcp": { + "monitor_starts": 1, + "monitor_stops": 0, + "endpoint_starts": 2, + "endpoint_stops": 0, + } + }) + self.assert_state({ + "tcp": { + "monitors": 1, + "endpoints": 2, + } + }) + finally: + self.proc.check_kill_and_wait() + server.shutdown() + + @staticmethod + def assert_state(expected={}): + stats = json.loads(urllib2.urlopen( + "http://localhost:5066/state").read()) + + total_monitors = 0 + total_endpoints = 0 + + for proto in ("http", "tcp", "icmp"): + proto_expected = expected.get(proto, {}) + monitors = proto_expected.get("monitors", 0) + endpoints = proto_expected.get("endpoints", 0) + total_monitors += monitors + total_endpoints += endpoints + nose.tools.assert_dict_equal(stats['heartbeat'][proto], { + 'monitors': monitors, + 'endpoints': endpoints, + }) + + nose.tools.assert_equal(stats['heartbeat']['monitors'], total_monitors) + nose.tools.assert_equal( + stats['heartbeat']['endpoints'], total_endpoints) + + @staticmethod + def assert_stats(expected={}): + stats = json.loads(urllib2.urlopen( + "http://localhost:5066/stats").read()) + + for proto in ("http", "tcp", "icmp"): + proto_expected = expected.get(proto, {}) + nose.tools.assert_dict_equal(stats['heartbeat'][proto], { + 'monitor_starts': proto_expected.get("monitor_starts", 0), + 'monitor_stops': proto_expected.get("monitor_stops", 0), + 'endpoint_starts': proto_expected.get("endpoint_starts", 0), + 'endpoint_stops': proto_expected.get("endpoint_stops", 0), + })