Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added status to monitor run log report.
- Upgrade node to latest LTS v18.20.3. {pull}40038[40038]
- Add journey duration to synthetics browser events. {pull}40230[40230]
- Add monitor status reporter under managed mode. {pull}41077[41077]

*Metricbeat*

Expand Down
18 changes: 18 additions & 0 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
)

// ErrMonitorDisabled is returned when the monitor plugin is marked as disabled.
Expand Down Expand Up @@ -71,6 +72,12 @@ type Monitor struct {
stats plugin.RegistryRecorder

monitorStateTracker *monitorstate.Tracker
statusReporter status.StatusReporter
}

// SetStatusReporter
func (m *Monitor) SetStatusReporter(statusReporter status.StatusReporter) {
m.statusReporter = statusReporter
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand Down Expand Up @@ -175,6 +182,9 @@ func newMonitorUnsafe(

logp.L().Error(fullErr)
p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
// if statusReporter is set, as it is for running managed-mode, update the input status
// to failed, specifying the error
m.updateStatus(status.Failed, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr))
return nil, fullErr
}}

Expand Down Expand Up @@ -237,6 +247,7 @@ func (m *Monitor) Start() {

m.stats.StartMonitor(int64(m.endpoints))
m.state = MON_STARTED
m.updateStatus(status.Running, "")
}

// Stop stops the monitor without freeing it in global dedup
Expand All @@ -262,4 +273,11 @@ func (m *Monitor) Stop() {

m.stats.StopMonitor(int64(m.endpoints))
m.state = MON_STOPPED
m.updateStatus(status.Stopped, "")
}

func (m *Monitor) updateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}
61 changes: 61 additions & 0 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package monitors

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand All @@ -32,7 +34,9 @@ import (
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/management/status"
)

// TestMonitorBasic tests a basic config
Expand Down Expand Up @@ -131,3 +135,60 @@ func TestCheckInvalidConfig(t *testing.T) {

require.Error(t, checkMonitorConfig(serverMonConf, reg))
}

type MockStatusReporter struct {
us func(status status.Status, msg string)
}

func (sr *MockStatusReporter) UpdateStatus(status status.Status, msg string) {
sr.us(status, msg)
}

func TestStatusReporter(t *testing.T) {
confMap := map[string]interface{}{
"type": "fail",
"urls": []string{"http://example.net"},
"schedule": "@every 1ms",
"name": "myName",
"id": "myId",
}
conf, err := config.NewConfigFrom(confMap)
require.NoError(t, err)

reg, _, _ := mockPluginsReg()
pipel := &MockPipeline{}
monReg := monitoring.NewRegistry()

mockDegradedPluginFactory := plugin.PluginFactory{
Name: "fail",
Aliases: []string{"failAlias"},
Make: func(s string, config *config.C) (plugin.Plugin, error) {
return plugin.Plugin{}, fmt.Errorf("error plugin")
},
Stats: plugin.NewPluginCountersRecorder("fail", monReg),
}
reg.Add(mockDegradedPluginFactory)

sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, true)
defer sched.Stop()

c, err := pipel.Connect()
require.NoError(t, err)
m, err := newMonitor(conf, reg, c, sched.Add, nil, nil)
require.NoError(t, err)

// Track status marked as failed during run_once execution
var failed bool = false
m.SetStatusReporter(&MockStatusReporter{
us: func(s status.Status, msg string) {
if s == status.Failed {
failed = true
}
},
})
m.Start()

sched.WaitForRunOnce()

require.True(t, failed)
}