Improve logging around agent checkins. (#1477)
Improve logging around agent checkins.

- Log transient checkin errors at Warn.
- Upgrade to an Error log after 2 repeated failures.
- Log the wait time for the next retry.
- Only update local state after repeated failures.
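In outline, the new behavior is the following (a minimal, standard-library-only sketch of the tiered logging described above; the actual implementation, using the agent's structured logger, status reporter, and backoff, is in fleet_gateway.go below):

package main

// Illustration only: logCheckinFailure mirrors the severity tiering added in this
// commit, with plain stdlib logging standing in for the agent's logger and reporter.

import (
	"errors"
	"log"
	"time"
)

// logCheckinFailure logs the first two consecutive failures at a lower severity and
// escalates afterwards; retryAfter stands in for backoff.NextWait() in the real code.
func logCheckinFailure(failCount int, retryAfter time.Duration, err error) {
	if failCount <= 2 {
		log.Printf("WARN: possible transient checkin error, retrying (failed_checkins=%d retry_after=%s): %v",
			failCount, retryAfter, err)
		return
	}
	// In the real change this branch also marks the local state as degraded.
	log.Printf("ERROR: cannot check in with fleet-server, retrying (failed_checkins=%d retry_after=%s): %v",
		failCount, retryAfter, err)
}

func main() {
	err := errors.New("connection refused")
	for failCount := 1; failCount <= 3; failCount++ {
		logCheckinFailure(failCount, time.Duration(failCount)*time.Second, err)
	}
}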
cmacknz authored Oct 12, 2022
1 parent 070af5f commit cd6ad3d
Showing 10 changed files with 140 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -210,3 +210,4 @@
- Add support for hints' based autodiscovery in kubernetes provider. {pull}[698]
- Improve logging during upgrades. {pull}[1287]
- Added status message to CheckinRequest {pull}[1369]
- Improve logging of Fleet check-in errors. {pull}[1477]
@@ -0,0 +1,5 @@
kind: enhancement
summary: Improve logging of Fleet check-in errors.
description: Improve logging of Fleet check-in errors and only report the local state as degraded after two consecutive failed check-ins.
pr: 1477
issue: 1154
39 changes: 24 additions & 15 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
@@ -173,7 +173,7 @@ func (f *fleetGateway) worker() {
// Execute the checkin call. For any error returned by the fleet-server API,
// the function will retry the call with an exponential delay and some
// jitter to help better distribute the load from a fleet of agents.
resp, err := f.doExecute()
resp, err := f.executeCheckinWithRetries()
if err != nil {
continue
}
@@ -274,21 +274,34 @@ func (f *fleetGateway) gatherQueuedActions(ts time.Time) (queued, expired []flee
return queued, expired
}

func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckinWithRetries() (*fleetapi.CheckinResponse, error) {
f.backoff.Reset()

// Guard against the context being stopped by an out-of-band call;
// this means we are rebooting to change the log level or the system is shutting us down.
for f.bgContext.Err() == nil {
f.log.Debugf("Checkin started")
resp, err := f.execute(f.bgContext)
resp, took, err := f.executeCheckin(f.bgContext)
if err != nil {
f.checkinFailCounter++
f.log.Errorf("Could not communicate with fleet-server checkin API will retry, error: %s", err)

// Report the first two failures at warn level as they may be recoverable with retries.
if f.checkinFailCounter <= 2 {
f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", f.backoff.NextWait())
} else {
// Only update the local status after repeated failures: https://github.com/elastic/elastic-agent/issues/1148
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorw("Cannot checkin in with fleet-server, retrying",
"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
"retry_after_ns", f.backoff.NextWait())
}

if !f.backoff.Wait() {
// Something bad has happened: log it and update our current state.
err := errors.New(
"execute retry loop was stopped",
"checkin retry loop was stopped",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, f.client.URI()),
)
@@ -297,10 +310,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.localReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorf("checkin number %d failed: %s", f.checkinFailCounter, err.Error())
}
continue
}

@@ -319,7 +328,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
return nil, f.bgContext.Err()
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
func (f *fleetGateway) executeCheckin(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
@@ -340,23 +349,23 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
Message: f.statusController.Status().Message,
}

resp, err := cmd.Execute(ctx, req)
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("received an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, nil
}, took, nil
}

return nil, err
return nil, took, err
}

f.unauthCounter = 0
if err != nil {
return nil, err
return nil, took, err
}

// Save the latest ackToken
@@ -368,7 +377,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}
}

return resp, nil
return resp, took, nil
}

// shouldUnenroll checks if the maximum number of invalid API key retries has been reached.
20 changes: 17 additions & 3 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
@@ -693,7 +694,7 @@ func TestRetriesOnFailures(t *testing.T) {
scheduler := scheduler.NewStepper()
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
log := newInfoLogger(t, "fleet_gateway")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -706,8 +707,8 @@
queue.On("Actions").Return([]fleetapi.Action{})

localReporter := &testutils.MockReporter{}
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
// The local state should only be reported as degraded after two consecutive failures.
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Once()
localReporter.On("Unregister").Maybe()

fleetReporter := &testutils.MockReporter{}
@@ -814,3 +815,16 @@ type testAgentInfo struct{}
func (testAgentInfo) AgentID() string { return "agent-secret" }

type request struct{}

func newInfoLogger(t *testing.T, name string) *logger.Logger {
t.Helper()

loggerCfg := logger.DefaultLoggingConfig()
loggerCfg.Level = logp.InfoLevel
loggerCfg.ToFiles = false
loggerCfg.ToStderr = true

log, err := logger.NewFromConfig("", loggerCfg, false)
require.NoError(t, err)
return log
}
5 changes: 5 additions & 0 deletions internal/pkg/core/backoff/backoff.go
@@ -4,11 +4,16 @@

package backoff

import "time"

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// NextWait returns the duration of the next call to Wait().
NextWait() time.Duration

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()
}
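A minimal usage sketch of this interface (hypothetical code: it would need to live inside the elastic-agent module, since the backoff package is internal, and the retry helper is made up for illustration):

package main

import (
	"fmt"
	"time"

	"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
)

// retry runs fn until it succeeds or the backoff's done channel is closed, logging
// the upcoming delay via NextWait() before each blocking Wait(), much like the
// Fleet gateway now does with its retry_after_ns field.
func retry(b backoff.Backoff, fn func() error) error {
	b.Reset()
	for {
		err := fn()
		if err == nil {
			return nil
		}
		fmt.Printf("attempt failed, retrying in %s: %v\n", b.NextWait(), err)
		if !b.Wait() { // false means the done channel was closed
			return err
		}
	}
}

func main() {
	done := make(chan struct{})
	b := backoff.NewExpBackoff(done, 10*time.Millisecond, time.Second)

	attempts := 0
	_ = retry(b, func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("transient failure %d", attempts)
		}
		return nil
	})
}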
50 changes: 39 additions & 11 deletions internal/pkg/core/backoff/backoff_test.go
@@ -14,14 +14,9 @@ import (

type factory func(<-chan struct{}) Backoff

func TestBackoff(t *testing.T) {
t.Run("test close channel", testCloseChannel)
t.Run("test unblock after some time", testUnblockAfterInit)
}

func testCloseChannel(t *testing.T) {
init := 2 * time.Second
max := 5 * time.Minute
func TestCloseChannel(t *testing.T) {
init := 2 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
@@ -42,9 +37,9 @@ func testCloseChannel(t *testing.T) {
}
}

func testUnblockAfterInit(t *testing.T) {
init := 1 * time.Second
max := 5 * time.Minute
func TestUnblockAfterInit(t *testing.T) {
init := 1 * time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
@@ -68,3 +63,36 @@
})
}
}

func TestNextWait(t *testing.T) {
init := time.Millisecond
max := 5 * time.Second

tests := map[string]factory{
"ExpBackoff": func(done <-chan struct{}) Backoff {
return NewExpBackoff(done, init, max)
},
"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
return NewEqualJitterBackoff(done, init, max)
},
}

for name, f := range tests {
t.Run(name, func(t *testing.T) {
c := make(chan struct{})
b := f(c)

startWait := b.NextWait()
assert.Equal(t, startWait, b.NextWait(), "next wait not stable")

startedAt := time.Now()
b.Wait()
waitDuration := time.Now().Sub(startedAt)
nextWait := b.NextWait()

t.Logf("actualWait: %s startWait: %s nextWait: %s", waitDuration, startWait, nextWait)
assert.Less(t, startWait, nextWait, "wait value did not increase")
assert.GreaterOrEqual(t, waitDuration, startWait, "next wait duration <= actual wait duration")
})
}
}
17 changes: 12 additions & 5 deletions internal/pkg/core/backoff/equal_jitter.go
@@ -16,8 +16,9 @@ type EqualJitterBackoff struct {
duration time.Duration
done <-chan struct{}

init time.Duration
max time.Duration
init time.Duration
max time.Duration
nextRand time.Duration

last time.Time
}
@@ -29,6 +30,7 @@ func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backof
done: done,
init: init,
max: max,
nextRand: time.Duration(rand.Int63n(int64(init))), //nolint:gosec
}
}

@@ -38,13 +40,18 @@ func (b *EqualJitterBackoff) Reset() {
b.duration = b.init * 2
}

func (b *EqualJitterBackoff) NextWait() time.Duration {
// Make sure we have always some minimal back off and jitter.
temp := b.duration / 2
return temp + b.nextRand
}

// Wait blocks until either the timer completes or the channel is closed.
func (b *EqualJitterBackoff) Wait() bool {
// Make sure we have always some minimal back off and jitter.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))
backoff := b.NextWait()

// increase duration for next wait.
b.nextRand = time.Duration(rand.Int63n(int64(b.duration)))
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
17 changes: 11 additions & 6 deletions internal/pkg/core/backoff/exponential.go
@@ -36,18 +36,23 @@ func (b *ExpBackoff) Reset() {
b.duration = b.init
}

func (b *ExpBackoff) NextWait() time.Duration {
nextWait := b.duration
nextWait *= 2
if nextWait > b.max {
nextWait = b.max
}
return nextWait
}

// Wait blocks until either the timer completes or the channel is closed.
func (b *ExpBackoff) Wait() bool {
backoff := b.duration
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
}
b.duration = b.NextWait()

select {
case <-b.done:
return false
case <-time.After(backoff):
case <-time.After(b.duration):
b.last = time.Now()
return true
}
23 changes: 13 additions & 10 deletions internal/pkg/fleetapi/checkin_cmd.go
@@ -78,50 +78,53 @@ func NewCheckinCmd(info agentInfo, client client.Sender) *CheckinCmd {
}
}

// Execute enroll the Agent in the Fleet Server.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, error) {
// Execute performs a check-in of the Agent with the Fleet Server. It returns the decoded check-in response,
// a duration indicating how long the request took, and an error.
func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, time.Duration, error) {
if err := r.Validate(); err != nil {
return nil, err
return nil, 0, err
}

b, err := json.Marshal(r)
if err != nil {
return nil, errors.New(err,
return nil, 0, errors.New(err,
"fail to encode the checkin request",
errors.TypeUnexpected)
}

cp := fmt.Sprintf(checkingPath, e.info.AgentID())
sendStart := time.Now()
resp, err := e.client.Send(ctx, "POST", cp, nil, nil, bytes.NewBuffer(b))
sendDuration := time.Now().Sub(sendStart)
if err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to checkin to fleet-server",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, client.ExtractError(resp.Body)
return nil, sendDuration, client.ExtractError(resp.Body)
}

rs, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New(err, "failed to read checkin response")
return nil, sendDuration, errors.New(err, "failed to read checkin response")
}

checkinResponse := &CheckinResponse{}
decoder := json.NewDecoder(bytes.NewReader(rs))
if err := decoder.Decode(checkinResponse); err != nil {
return nil, errors.New(err,
return nil, sendDuration, errors.New(err,
"fail to decode checkin response",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, cp))
}

if err := checkinResponse.Validate(); err != nil {
return nil, err
return nil, sendDuration, err
}

return checkinResponse, nil
return checkinResponse, sendDuration, nil
}
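For callers of this command, the new return value can be attached to log entries even when the request fails. A short hypothetical sketch (the checkinOnce helper and checkinLogger interface are made up for illustration; cmd and req are an existing *CheckinCmd and *CheckinRequest, and the code is assumed to live inside the elastic-agent module since fleetapi is internal):

package example

import (
	"context"

	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
)

// checkinLogger captures just the logging methods used in this sketch; the agent's
// structured logger should satisfy it.
type checkinLogger interface {
	Warnw(msg string, keysAndValues ...interface{})
	Debugf(format string, args ...interface{})
}

// checkinOnce performs a single check-in and logs the request duration, similar to
// the request_duration_ns field the Fleet gateway now records.
func checkinOnce(ctx context.Context, cmd *fleetapi.CheckinCmd, req *fleetapi.CheckinRequest, log checkinLogger) (*fleetapi.CheckinResponse, error) {
	resp, took, err := cmd.Execute(ctx, req)
	if err != nil {
		// The duration is returned even on failure, so it can be logged with the error.
		log.Warnw("checkin request failed", "error.message", err, "request_duration_ns", took)
		return nil, err
	}
	log.Debugf("checkin succeeded in %s with %d actions", took, len(resp.Actions))
	return resp, nil
}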