Commit 67d6b2a

Improve logging around agent checkins. (#1477) (#1492)
Improve logging around agent checkins.

- Log transient checkin errors at Info.
- Upgrade to an Error log after 2 repeated failures.
- Log the wait time for the next retry.
- Only update local state after repeated failures.

(cherry picked from commit cd6ad3d)

Co-authored-by: Craig MacKenzie <[email protected]>
1 parent 8e9f183 commit 67d6b2a

File tree

10 files changed: +141 -57 lines changed


CHANGELOG.next.asciidoc

+2

@@ -208,3 +208,5 @@
 - Add `lumberjack` input type to the Filebeat spec. {pull}[959]
 - Add support for hints' based autodiscovery in kubernetes provider. {pull}[698]
 - Improve logging during upgrades. {pull}[1287]
+- Added status message to CheckinRequest {pull}[1369]
+- Improve logging of Fleet checkins errors. {pull}[1477]
(new changelog fragment file)

@@ -0,0 +1,5 @@
+kind: enhancement
+summary: Improve logging of Fleet check-in errors.
+description: Improve logging of Fleet check-in errors and only report the local state as degraded after two consecutive failed check-ins.
+pr: 1477
+issue: 1154

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

+24 -15

@@ -173,7 +173,7 @@ func (f *fleetGateway) worker() {
 			// Execute the checkin call and for any errors returned by the fleet-server API
 			// the function will retry to communicate with fleet-server with an exponential delay and some
 			// jitter to help better distribute the load from a fleet of agents.
-			resp, err := f.doExecute()
+			resp, err := f.executeCheckinWithRetries()
 			if err != nil {
 				continue
 			}
@@ -274,21 +274,34 @@ func (f *fleetGateway) gatherQueuedActions(ts time.Time) (queued, expired []flee
 	return queued, expired
 }
 
-func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
+func (f *fleetGateway) executeCheckinWithRetries() (*fleetapi.CheckinResponse, error) {
 	f.backoff.Reset()
 
 	// Guard if the context is stopped by a out of bound call,
 	// this mean we are rebooting to change the log level or the system is shutting us down.
 	for f.bgContext.Err() == nil {
 		f.log.Debugf("Checkin started")
-		resp, err := f.execute(f.bgContext)
+		resp, took, err := f.executeCheckin(f.bgContext)
 		if err != nil {
 			f.checkinFailCounter++
-			f.log.Errorf("Could not communicate with fleet-server checkin API will retry, error: %s", err)
+
+			// Report the first two failures at warn level as they may be recoverable with retries.
+			if f.checkinFailCounter <= 2 {
+				f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
+					"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
+					"retry_after_ns", f.backoff.NextWait())
+			} else {
+				// Only update the local status after repeated failures: https://github.com/elastic/elastic-agent/issues/1148
+				f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
+				f.log.Errorw("Cannot checkin in with fleet-server, retrying",
+					"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
+					"retry_after_ns", f.backoff.NextWait())
+			}
+
 			if !f.backoff.Wait() {
 				// Something bad has happened and we log it and we should update our current state.
 				err := errors.New(
-					"execute retry loop was stopped",
+					"checkin retry loop was stopped",
 					errors.TypeNetwork,
 					errors.M(errors.MetaKeyURI, f.client.URI()),
 				)
@@ -297,10 +310,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
 				f.localReporter.Update(state.Failed, err.Error(), nil)
 				return nil, err
 			}
-			if f.checkinFailCounter > 1 {
-				f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
-				f.log.Errorf("checkin number %d failed: %s", f.checkinFailCounter, err.Error())
-			}
 			continue
 		}
 
@@ -319,7 +328,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
 	return nil, f.bgContext.Err()
 }
 
-func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
+func (f *fleetGateway) executeCheckin(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
 	ecsMeta, err := info.Metadata()
 	if err != nil {
 		f.log.Error(errors.New("failed to load metadata", err))
@@ -339,23 +348,23 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 		Status: f.statusController.StatusString(),
 	}
 
-	resp, err := cmd.Execute(ctx, req)
+	resp, took, err := cmd.Execute(ctx, req)
 	if isUnauth(err) {
 		f.unauthCounter++
 
 		if f.shouldUnenroll() {
 			f.log.Warnf("received an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
 			return &fleetapi.CheckinResponse{
 				Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
-			}, nil
+			}, took, nil
 		}
 
-		return nil, err
+		return nil, took, err
 	}
 
 	f.unauthCounter = 0
 	if err != nil {
-		return nil, err
+		return nil, took, err
 	}
 
 	// Save the latest ackToken
@@ -367,7 +376,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 		}
 	}
 
-	return resp, nil
+	return resp, took, nil
 }
 
 // shouldUnenroll checks if the max number of trying an invalid key is reached
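Note: the retry loop above now logs the first two failed check-ins at Warn and only escalates to an Error log plus a Degraded local state from the third consecutive failure onward, attaching the request duration and the next backoff delay to every entry. A minimal standalone sketch of that escalation policy, using the standard library log package and a stubbed doCheckin helper (both illustrative stand-ins, not the agent's real logger or fleet-server client):

package main

import (
	"errors"
	"log"
	"math/rand"
	"time"
)

// doCheckin is a stand-in for the real fleet-server check-in call; it fails
// a few times to exercise the retry loop.
func doCheckin(attempt int) (time.Duration, error) {
	took := time.Duration(rand.Intn(50)) * time.Millisecond
	if attempt < 4 {
		return took, errors.New("connection refused")
	}
	return took, nil
}

func main() {
	failCounter := 0
	wait := 100 * time.Millisecond // next retry delay, doubled on each failure

	for attempt := 1; ; attempt++ {
		took, err := doCheckin(attempt)
		if err == nil {
			log.Printf("checkin succeeded after %d failures", failCounter)
			return
		}
		failCounter++

		if failCounter <= 2 {
			// The first two failures are assumed to be transient.
			log.Printf("WARN transient checkin error: %v (took=%s, failed_checkins=%d, retry_after=%s)",
				err, took, failCounter, wait)
		} else {
			// Repeated failures: escalate to an error and, in the real agent,
			// mark the local state as degraded.
			log.Printf("ERROR checkin keeps failing: %v (took=%s, failed_checkins=%d, retry_after=%s)",
				err, took, failCounter, wait)
		}

		time.Sleep(wait)
		wait *= 2
	}
}

Running it prints two WARN lines, one ERROR line, and then the success message.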

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

+17 -3

@@ -21,6 +21,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
+	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
@@ -692,7 +693,7 @@ func TestRetriesOnFailures(t *testing.T) {
 	scheduler := scheduler.NewStepper()
 	client := newTestingClient()
 	dispatcher := newTestingDispatcher()
-	log, _ := logger.New("fleet_gateway", false)
+	log := newInfoLogger(t, "fleet_gateway")
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -705,8 +706,8 @@
 	queue.On("Actions").Return([]fleetapi.Action{})
 
 	localReporter := &testutils.MockReporter{}
-	localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
-	localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
+	// The local state should only be reported as degraded after two consecutive failures.
+	localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Once()
 	localReporter.On("Unregister").Maybe()
 
 	fleetReporter := &testutils.MockReporter{}
@@ -812,3 +813,16 @@ type testAgentInfo struct{}
 func (testAgentInfo) AgentID() string { return "agent-secret" }
 
 type request struct{}
+
+func newInfoLogger(t *testing.T, name string) *logger.Logger {
+	t.Helper()
+
+	loggerCfg := logger.DefaultLoggingConfig()
+	loggerCfg.Level = logp.InfoLevel
+	loggerCfg.ToFiles = false
+	loggerCfg.ToStderr = true
+
+	log, err := logger.NewFromConfig("", loggerCfg, false)
+	require.NoError(t, err)
+	return log
+}
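The test change above replaces a Times(2) expectation with Once(): the Degraded update must now happen exactly once for the simulated run. For readers unfamiliar with that testify idiom, here is a small self-contained sketch using a hypothetical Reporter mock (not the agent's own testutils.MockReporter):

package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// MockReporter is a hypothetical stand-in for the agent's status reporter.
type MockReporter struct {
	mock.Mock
}

func (m *MockReporter) Update(state int, msg string, payload map[string]interface{}) {
	m.Called(state, msg, payload)
}

func TestDegradedReportedOnce(t *testing.T) {
	const degraded = 2 // hypothetical state constant for this sketch

	r := &MockReporter{}
	// Expect exactly one Degraded update, no matter how many failures occur.
	r.On("Update", degraded, mock.Anything, mock.Anything).Once()

	// Simulate the third consecutive failure triggering the single degraded update.
	r.Update(degraded, "checkin failed: connection refused", nil)

	r.AssertExpectations(t)
}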

internal/pkg/core/backoff/backoff.go

+5

@@ -4,11 +4,16 @@
 
 package backoff
 
+import "time"
+
 // Backoff defines the interface for backoff strategies.
 type Backoff interface {
 	// Wait blocks for a duration of time governed by the backoff strategy.
 	Wait() bool
 
+	// NextWait returns the duration of the next call to Wait().
+	NextWait() time.Duration
+
 	// Reset resets the backoff duration to an initial value governed by the backoff strategy.
 	Reset()
 }
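Adding NextWait() to the interface lets callers report the upcoming delay before they block in Wait(), which is what the gateway uses for its retry_after_ns log field. A hedged sketch of such a caller, written against the interface above (the retryUntilDone helper is illustrative and not part of the package):

package backoff

import "log"

// retryUntilDone shows how a caller can surface the next delay from
// NextWait() before blocking in Wait().
func retryUntilDone(b Backoff, op func() error) error {
	b.Reset()
	for {
		err := op()
		if err == nil {
			return nil
		}
		log.Printf("operation failed: %v; retrying in %s", err, b.NextWait())
		if !b.Wait() {
			// The done channel was closed while waiting; give up.
			return err
		}
	}
}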

internal/pkg/core/backoff/backoff_test.go

+39 -11

@@ -14,14 +14,9 @@ import (
 
 type factory func(<-chan struct{}) Backoff
 
-func TestBackoff(t *testing.T) {
-	t.Run("test close channel", testCloseChannel)
-	t.Run("test unblock after some time", testUnblockAfterInit)
-}
-
-func testCloseChannel(t *testing.T) {
-	init := 2 * time.Second
-	max := 5 * time.Minute
+func TestCloseChannel(t *testing.T) {
+	init := 2 * time.Millisecond
+	max := 5 * time.Second
 
 	tests := map[string]factory{
 		"ExpBackoff": func(done <-chan struct{}) Backoff {
@@ -42,9 +37,9 @@ func testCloseChannel(t *testing.T) {
 	}
 }
 
-func testUnblockAfterInit(t *testing.T) {
-	init := 1 * time.Second
-	max := 5 * time.Minute
+func TestUnblockAfterInit(t *testing.T) {
+	init := 1 * time.Millisecond
+	max := 5 * time.Second
 
 	tests := map[string]factory{
 		"ExpBackoff": func(done <-chan struct{}) Backoff {
@@ -68,3 +63,36 @@ func testUnblockAfterInit(t *testing.T) {
 		})
 	}
 }
+
+func TestNextWait(t *testing.T) {
+	init := time.Millisecond
+	max := 5 * time.Second
+
+	tests := map[string]factory{
+		"ExpBackoff": func(done <-chan struct{}) Backoff {
+			return NewExpBackoff(done, init, max)
+		},
+		"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
+			return NewEqualJitterBackoff(done, init, max)
+		},
+	}
+
+	for name, f := range tests {
+		t.Run(name, func(t *testing.T) {
+			c := make(chan struct{})
+			b := f(c)
+
+			startWait := b.NextWait()
+			assert.Equal(t, startWait, b.NextWait(), "next wait not stable")
+
+			startedAt := time.Now()
+			b.Wait()
+			waitDuration := time.Now().Sub(startedAt)
+			nextWait := b.NextWait()
+
+			t.Logf("actualWait: %s startWait: %s nextWait: %s", waitDuration, startWait, nextWait)
+			assert.Less(t, startWait, nextWait, "wait value did not increase")
+			assert.GreaterOrEqual(t, waitDuration, startWait, "next wait duration <= actual wait duration")
+		})
+	}
+}

internal/pkg/core/backoff/equal_jitter.go

+12 -5

@@ -16,8 +16,9 @@ type EqualJitterBackoff struct {
 	duration time.Duration
 	done     <-chan struct{}
 
-	init time.Duration
-	max  time.Duration
+	init     time.Duration
+	max      time.Duration
+	nextRand time.Duration
 
 	last time.Time
 }
@@ -29,6 +30,7 @@ func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backof
 		done: done,
 		init: init,
 		max:  max,
+		nextRand: time.Duration(rand.Int63n(int64(init))), //nolint:gosec
 	}
 }
 
@@ -38,13 +40,18 @@ func (b *EqualJitterBackoff) Reset() {
 	b.duration = b.init * 2
 }
 
+func (b *EqualJitterBackoff) NextWait() time.Duration {
+	// Make sure we have always some minimal back off and jitter.
+	temp := b.duration / 2
+	return temp + b.nextRand
+}
+
 // Wait block until either the timer is completed or channel is done.
 func (b *EqualJitterBackoff) Wait() bool {
-	// Make sure we have always some minimal back off and jitter.
-	temp := int64(b.duration / 2)
-	backoff := time.Duration(temp + rand.Int63n(temp))
+	backoff := b.NextWait()
 
 	// increase duration for next wait.
+	b.nextRand = time.Duration(rand.Int63n(int64(b.duration)))
 	b.duration *= 2
 	if b.duration > b.max {
 		b.duration = b.max
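Design note: the jitter for the next wait is now drawn up front and stored in nextRand, so repeated NextWait() calls return a stable value that matches what the following Wait() will actually sleep; Wait() then draws a fresh random value for the wait after that. The underlying equal-jitter idea, half of the backoff fixed and half random, can be sketched on its own like this (a simplification for illustration, not the production code):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// equalJitter keeps half of the current backoff duration fixed and
// randomizes the other half.
func equalJitter(duration time.Duration) time.Duration {
	half := duration / 2
	return half + time.Duration(rand.Int63n(int64(half)))
}

func main() {
	d := 2 * time.Second
	for i := 0; i < 4; i++ {
		fmt.Printf("backoff %d: wait %s (base %s)\n", i, equalJitter(d), d)
		d *= 2
	}
}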

internal/pkg/core/backoff/exponential.go

+11 -6

@@ -36,18 +36,23 @@ func (b *ExpBackoff) Reset() {
 	b.duration = b.init
 }
 
+func (b *ExpBackoff) NextWait() time.Duration {
+	nextWait := b.duration
+	nextWait *= 2
+	if nextWait > b.max {
+		nextWait = b.max
+	}
+	return nextWait
+}
+
 // Wait block until either the timer is completed or channel is done.
 func (b *ExpBackoff) Wait() bool {
-	backoff := b.duration
-	b.duration *= 2
-	if b.duration > b.max {
-		b.duration = b.max
-	}
+	b.duration = b.NextWait()
 
 	select {
 	case <-b.done:
 		return false
-	case <-time.After(backoff):
+	case <-time.After(b.duration):
 		b.last = time.Now()
 		return true
 	}
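For reference, the exponential strategy simply doubles the delay and caps it at the configured maximum; a quick standalone illustration of that progression (the durations here are chosen arbitrarily):

package main

import (
	"fmt"
	"time"
)

// nextWait mirrors the capped doubling used by ExpBackoff.NextWait above.
func nextWait(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		next = max
	}
	return next
}

func main() {
	d := time.Second
	max := 30 * time.Second
	for i := 0; i < 7; i++ {
		d = nextWait(d, max)
		fmt.Printf("retry %d waits %s\n", i+1, d)
	}
	// Prints 2s, 4s, 8s, 16s, 30s, 30s, 30s.
}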

internal/pkg/fleetapi/checkin_cmd.go

+13 -10

@@ -77,50 +77,53 @@ func NewCheckinCmd(info agentInfo, client client.Sender) *CheckinCmd {
 	}
 }
 
-// Execute enroll the Agent in the Fleet Server.
-func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, error) {
+// Execute enroll the Agent in the Fleet Server. Returns the decoded check in response, a duration indicating
+// how long the request took, and an error.
+func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinResponse, time.Duration, error) {
 	if err := r.Validate(); err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 
 	b, err := json.Marshal(r)
 	if err != nil {
-		return nil, errors.New(err,
+		return nil, 0, errors.New(err,
 			"fail to encode the checkin request",
 			errors.TypeUnexpected)
 	}
 
 	cp := fmt.Sprintf(checkingPath, e.info.AgentID())
+	sendStart := time.Now()
 	resp, err := e.client.Send(ctx, "POST", cp, nil, nil, bytes.NewBuffer(b))
+	sendDuration := time.Now().Sub(sendStart)
 	if err != nil {
-		return nil, errors.New(err,
+		return nil, sendDuration, errors.New(err,
 			"fail to checkin to fleet-server",
 			errors.TypeNetwork,
 			errors.M(errors.MetaKeyURI, cp))
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		return nil, client.ExtractError(resp.Body)
+		return nil, sendDuration, client.ExtractError(resp.Body)
 	}
 
 	rs, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
-		return nil, errors.New(err, "failed to read checkin response")
+		return nil, sendDuration, errors.New(err, "failed to read checkin response")
 	}
 
 	checkinResponse := &CheckinResponse{}
 	decoder := json.NewDecoder(bytes.NewReader(rs))
 	if err := decoder.Decode(checkinResponse); err != nil {
-		return nil, errors.New(err,
+		return nil, sendDuration, errors.New(err,
 			"fail to decode checkin response",
 			errors.TypeNetwork,
 			errors.M(errors.MetaKeyURI, cp))
 	}
 
 	if err := checkinResponse.Validate(); err != nil {
-		return nil, err
+		return nil, sendDuration, err
 	}
 
-	return checkinResponse, nil
+	return checkinResponse, sendDuration, nil
 }
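The substantive change here is that Execute now measures and returns how long the check-in request took, so the gateway can log it as request_duration_ns. The measurement is a plain start/elapsed pair around the HTTP call; a minimal sketch of the same pattern against a hypothetical endpoint (not the fleet-server API):

package main

import (
	"fmt"
	"net/http"
	"time"
)

// timedGet performs a GET request and reports how long it took, mirroring the
// sendStart/sendDuration measurement added to CheckinCmd.Execute.
func timedGet(url string) (*http.Response, time.Duration, error) {
	start := time.Now()
	resp, err := http.Get(url)
	took := time.Since(start)
	return resp, took, err
}

func main() {
	resp, took, err := timedGet("https://example.com") // hypothetical endpoint
	if err != nil {
		fmt.Printf("request failed after %s: %v\n", took, err)
		return
	}
	defer resp.Body.Close()
	fmt.Printf("status %s in %s\n", resp.Status, took)
}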

0 commit comments
