1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -323,6 +323,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added status to monitor run log report.
- Upgrade node to latest LTS v18.20.3. {pull}40038[40038]
- Add journey duration to synthetics browser events. {pull}40230[40230]
- Add monitor status reporter under managed mode. {pull}41077[41077]

*Metricbeat*

18 changes: 18 additions & 0 deletions heartbeat/monitors/monitor.go
@@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
)

// ErrMonitorDisabled is returned when the monitor plugin is marked as disabled.
@@ -71,6 +72,12 @@ type Monitor struct {
stats plugin.RegistryRecorder

monitorStateTracker *monitorstate.Tracker
statusReporter status.StatusReporter
}

// SetStatusReporter sets the reporter used to surface this monitor's status to the Elastic Agent when running under managed mode.
func (m *Monitor) SetStatusReporter(statusReporter status.StatusReporter) {
Member

Since it's set at the Monitor level, what happens if multiple monitors are configured? Do the errors get accumulated, or is there an upper limit to how they are shown in the UI?

Contributor Author
@emilioalvap emilioalvap Oct 3, 2024

Every monitor will map 1:1 to an agent integration, which Fleet UI already shows individually:
[screenshot: Fleet UI listing each monitor's integration individually]

m.statusReporter = statusReporter
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
@@ -175,6 +182,9 @@ func newMonitorUnsafe(

logp.L().Error(fullErr)
p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
// If statusReporter is set, as it is when running in managed mode, update the input status
// to failed, specifying the error
m.updateStatus(status.Failed, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr))
Member

Reading this

	// Failed is status describing unit is failed. This status should
	// only be used in the case the beat should stop running as the failure
	// cannot be recovered.

Could this cause HB to stop, and also prevent other monitors from running? Is this intended?

Contributor Author

Could this cause HB to stop, and also prevent other monitors from running?

It probably won't (it doesn't, as of now). Even if that were the case, since the status is scoped at the monitor level, it should only filter out the failed integrations, but I'm speculating here. There are also multiple status layers; this change only affects the stream status (not even the integration status).
As for the status, either failed or degraded would achieve the same purpose; I'm open to discussion on the implications. I leaned towards failed because the type of error caught in this path is generally not recoverable.

Member

I was worried it could stop the other monitors. But if that's not the case, I'm not particularly inclined to change this.

return nil, fullErr
}}
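
The alternative weighed in the thread above would be a one-constant change; a hypothetical sketch (not part of this diff) that reports Degraded for errors considered recoverable, reusing the same job shape and helper:

// Hypothetical variant: report Degraded for recoverable start-up errors and
// reserve Failed for unrecoverable ones; only the status constant changes.
p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
	m.updateStatus(status.Degraded, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr))
	return nil, fullErr
}}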

@@ -237,6 +247,7 @@ func (m *Monitor) Start() {

m.stats.StartMonitor(int64(m.endpoints))
m.state = MON_STARTED
m.updateStatus(status.Running, "")
}

// Stop stops the monitor without freeing it in global dedup
@@ -262,4 +273,11 @@ func (m *Monitor) Stop() {

m.stats.StopMonitor(int64(m.endpoints))
m.state = MON_STOPPED
m.updateStatus(status.Stopped, "")
}

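// updateStatus forwards the status transition to the configured reporter, if one is set (i.e. when running under managed mode).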
func (m *Monitor) updateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}
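
For reference, the contract these calls depend on is roughly the following. This is a sketch reconstructed from the identifiers used above (status.StatusReporter, UpdateStatus, Running, Degraded, Failed, Stopped), not a verbatim copy of libbeat/management/status, and the exact constant set is assumed:

// Sketch of the status contract used by monitor.go; reconstructed for
// illustration, the real package may differ in details.
package status

type Status int

const (
	Unknown Status = iota
	Starting
	Configuring
	Running
	Degraded
	// Failed should only be used when the failure cannot be recovered,
	// per the package docs quoted in the review thread above.
	Failed
	Stopped
)

// StatusReporter receives status transitions for a unit (here, one monitor's
// input stream) and surfaces them to the Elastic Agent / Fleet UI.
type StatusReporter interface {
	UpdateStatus(status Status, msg string)
}
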
61 changes: 61 additions & 0 deletions heartbeat/monitors/monitor_test.go
@@ -18,12 +18,14 @@
package monitors

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
@@ -32,7 +34,9 @@ import (
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/management/status"
)

// TestMonitorBasic tests a basic config
@@ -131,3 +135,60 @@ func TestCheckInvalidConfig(t *testing.T) {

require.Error(t, checkMonitorConfig(serverMonConf, reg))
}

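// MockStatusReporter implements status.StatusReporter by delegating UpdateStatus to the provided callback.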
type MockStatusReporter struct {
us func(status status.Status, msg string)
}

func (sr *MockStatusReporter) UpdateStatus(status status.Status, msg string) {
sr.us(status, msg)
}

func TestStatusReporter(t *testing.T) {
confMap := map[string]interface{}{
"type": "fail",
"urls": []string{"http://example.net"},
"schedule": "@every 1ms",
"name": "myName",
"id": "myId",
}
conf, err := config.NewConfigFrom(confMap)
require.NoError(t, err)

reg, _, _ := mockPluginsReg()
pipel := &MockPipeline{}
monReg := monitoring.NewRegistry()

mockDegradedPluginFactory := plugin.PluginFactory{
Name: "fail",
Aliases: []string{"failAlias"},
Make: func(s string, config *config.C) (plugin.Plugin, error) {
return plugin.Plugin{}, fmt.Errorf("error plugin")
},
Stats: plugin.NewPluginCountersRecorder("fail", monReg),
}
reg.Add(mockDegradedPluginFactory)

sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, true)
defer sched.Stop()

c, err := pipel.Connect()
require.NoError(t, err)
m, err := newMonitor(conf, reg, c, sched.Add, nil, nil)
require.NoError(t, err)

// Track whether the status is reported as failed during the run_once execution
failed := false
m.SetStatusReporter(&MockStatusReporter{
us: func(s status.Status, msg string) {
if s == status.Failed {
failed = true
}
},
})
m.Start()

sched.WaitForRunOnce()

require.True(t, failed)
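
The production call site for SetStatusReporter is outside this diff; under managed mode, the code that reloads inputs is expected to attach the per-unit reporter before starting each monitor. A hypothetical sketch of that wiring (wireManagedMonitor and unitReporter are invented names for illustration):

// wireManagedMonitor is a hypothetical helper, not part of this PR: it shows
// where SetStatusReporter would be called so that the lifecycle updates in
// monitor.go surface as unit status. unitReporter stands in for whatever
// reporter the Elastic Agent unit exposes for this input stream.
func wireManagedMonitor(m *Monitor, unitReporter status.StatusReporter) {
	m.SetStatusReporter(unitReporter)
	m.Start() // reports status.Running via updateStatus
	// ... later, when the unit is removed or Heartbeat shuts down:
	// m.Stop() // reports status.Stopped
}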
}