From 8737812655b5731fbb692970ec4c8f568d9e5cca Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 2 Oct 2024 16:21:54 +0200 Subject: [PATCH 1/2] [Heartbeat] Add status reporter at monitor factory level --- heartbeat/monitors/monitor.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 29e7713145ca..79390a60ef04 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" ) // ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. @@ -71,6 +72,12 @@ type Monitor struct { stats plugin.RegistryRecorder monitorStateTracker *monitorstate.Tracker + statusReporter status.StatusReporter +} + +// SetStatusReporter +func (m *Monitor) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -175,6 +182,9 @@ func newMonitorUnsafe( logp.L().Error(fullErr) p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) { + // if statusReporter is set, as it is for running managed-mode, update the input status + // to failed, specifying the error + m.updateStatus(status.Failed, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr)) return nil, fullErr }} @@ -237,6 +247,7 @@ func (m *Monitor) Start() { m.stats.StartMonitor(int64(m.endpoints)) m.state = MON_STARTED + m.updateStatus(status.Running, "") } // Stop stops the monitor without freeing it in global dedup @@ -262,4 +273,11 @@ func (m *Monitor) Stop() { m.stats.StopMonitor(int64(m.endpoints)) m.state = MON_STOPPED + m.updateStatus(status.Stopped, "") +} + +func (m *Monitor) updateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } } From 4a5b51a92a36d3c46ff1e6b0a0fc0a4adbdc59d2 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Fri, 4 Oct 2024 17:21:06 +0200 Subject: [PATCH 2/2] Add unit test and changelog --- CHANGELOG.next.asciidoc | 1 + heartbeat/monitors/monitor_test.go | 61 ++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8c5e0d74d5f0..c8e49b1f9f6f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -323,6 +323,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added status to monitor run log report. - Upgrade node to latest LTS v18.20.3. {pull}40038[40038] - Add journey duration to synthetics browser events. {pull}40230[40230] +- Add monitor status reporter under managed mode. {pull}41077[41077] *Metricbeat* diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 3176d27a2fa8..1ea3c52d6ca6 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -18,12 +18,14 @@ package monitors import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/config" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" @@ -32,7 +34,9 @@ import ( "github.com/elastic/go-lookslike/testslike" "github.com/elastic/go-lookslike/validator" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/scheduler" + "github.com/elastic/beats/v7/libbeat/management/status" ) // TestMonitorBasic tests a basic config @@ -131,3 +135,60 @@ func TestCheckInvalidConfig(t *testing.T) { require.Error(t, checkMonitorConfig(serverMonConf, reg)) } + +type MockStatusReporter struct { + us func(status status.Status, msg string) +} + +func (sr *MockStatusReporter) UpdateStatus(status status.Status, msg string) { + sr.us(status, msg) +} + +func TestStatusReporter(t *testing.T) { + confMap := map[string]interface{}{ + "type": "fail", + "urls": []string{"http://example.net"}, + "schedule": "@every 1ms", + "name": "myName", + "id": "myId", + } + conf, err := config.NewConfigFrom(confMap) + require.NoError(t, err) + + reg, _, _ := mockPluginsReg() + pipel := &MockPipeline{} + monReg := monitoring.NewRegistry() + + mockDegradedPluginFactory := plugin.PluginFactory{ + Name: "fail", + Aliases: []string{"failAlias"}, + Make: func(s string, config *config.C) (plugin.Plugin, error) { + return plugin.Plugin{}, fmt.Errorf("error plugin") + }, + Stats: plugin.NewPluginCountersRecorder("fail", monReg), + } + reg.Add(mockDegradedPluginFactory) + + sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, true) + defer sched.Stop() + + c, err := pipel.Connect() + require.NoError(t, err) + m, err := newMonitor(conf, reg, c, sched.Add, nil, nil) + require.NoError(t, err) + + // Track status marked as failed during run_once execution + var failed bool = false + m.SetStatusReporter(&MockStatusReporter{ + us: func(s status.Status, msg string) { + if s == status.Failed { + failed = true + } + }, + }) + m.Start() + + sched.WaitForRunOnce() + + require.True(t, failed) +}