Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Agent to support backpressure from server #299

Merged
merged 21 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/proto/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
- [AgentDetails](#f5-nginx-agent-sdk-AgentDetails)
- [AgentLogging](#f5-nginx-agent-sdk-AgentLogging)
- [AgentMeta](#f5-nginx-agent-sdk-AgentMeta)
- [Backoff](#f5-nginx-agent-sdk-Backoff)
- [Server](#f5-nginx-agent-sdk-Server)

- [AgentConnectStatus.StatusCode](#f5-nginx-agent-sdk-AgentConnectStatus-StatusCode)
- [AgentLogging.Level](#f5-nginx-agent-sdk-AgentLogging-Level)
Expand Down Expand Up @@ -204,6 +206,7 @@ Represents agent details. This message is sent from the management server to the
| extensions | [string](#string) | repeated | List of agent extensions that are enabled |
| tags | [string](#string) | repeated | List of tags |
| alias | [string](#string) | | Alias name for the agent |
| server | [Server](#f5-nginx-agent-sdk-Server) | | Server setting for the agent |



Expand Down Expand Up @@ -251,6 +254,45 @@ Represents agent metadata




<a name="f5-nginx-agent-sdk-Backoff"></a>

### Backoff



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| initial_interval | [int64](#int64) | | First backoff time interval in seconds |
| randomization_factor | [double](#double) | | Random value used to create range around next backoff interval |
| multiplier | [double](#double) | | Value to be multiplied with current backoff interval |
| max_interval | [int64](#int64) | | Max interval in seconds between two retries |
| max_elapsed_time | [int64](#int64) | | Elapsed time in seconds after which backoff stops. It never stops if max_elapsed_time == 0. |






<a name="f5-nginx-agent-sdk-Server"></a>

### Server



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| host | [string](#string) | | Host name or IP of the host to connect to |
| grpc_port | [int32](#int32) | | Grpc port to connect to |
| token | [string](#string) | | Shared secrect between the server and client |
| metrics | [string](#string) | | Metrics server name |
| command | [string](#string) | | Command server name |
| backoff | [Backoff](#f5-nginx-agent-sdk-Backoff) | | Backoff settings for exponential retry and backoff |








Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (

require (
github.com/bufbuild/buf v1.17.0
github.com/evilmartians/lefthook v1.3.10
github.com/evilmartians/lefthook v1.3.12
github.com/go-resty/resty/v2 v2.7.0
github.com/go-swagger/go-swagger v0.30.4
github.com/goreleaser/nfpm/v2 v2.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java h1:bV5JGEB1ouEzZa0hgVDFFiClrUEuGWRaAc/3mxR2QK0=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evilmartians/lefthook v1.3.10 h1:tGnI6JAW74/ZLXIESrdLjWQpPJEfjRmfzY90W3r+rHU=
github.com/evilmartians/lefthook v1.3.10/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/evilmartians/lefthook v1.3.12 h1:06BAk0p3pJ8TMnVl9gjPVPAPWWXJbREryvmwi4FfBfo=
github.com/evilmartians/lefthook v1.3.12/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
Expand Down
26 changes: 26 additions & 0 deletions sdk/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,29 @@ func WaitUntil(

return nil
}

func WaitUntilWithJitterAndMultiplier(
ctx context.Context,
initialInterval time.Duration,
maxInterval time.Duration,
maxElapsedTime time.Duration,
jitter float64,
multiplier float64,
operation backoff.Operation,
) error {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = initialInterval
exponentialBackoff.MaxInterval = maxInterval
exponentialBackoff.MaxElapsedTime = maxElapsedTime
exponentialBackoff.RandomizationFactor = jitter
exponentialBackoff.Multiplier = multiplier

expoBackoffWithContext := backoff.WithContext(exponentialBackoff, ctx)

err := backoff.Retry(backoff.Operation(operation), expoBackoffWithContext)
if err != nil {
return err
}

return nil
}
21 changes: 13 additions & 8 deletions sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import (

"google.golang.org/grpc"

"github.com/nginx/agent/sdk/v2"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
)

type BackoffSettings struct {
initialInterval time.Duration
maxInterval time.Duration
maxTimeout time.Duration
sendMaxTimeout time.Duration
initialInterval time.Duration
maxInterval time.Duration
maxTimeout time.Duration
sendMaxTimeout time.Duration
multiplier float64
randomization_factor float64
}

type MsgClassification int
Expand All @@ -36,10 +39,12 @@ const (

var (
DefaultBackoffSettings = BackoffSettings{
initialInterval: 10 * time.Second,
maxInterval: 60 * time.Second,
maxTimeout: 0,
sendMaxTimeout: 2 * time.Minute,
initialInterval: 10 * time.Second,
maxInterval: 60 * time.Second,
maxTimeout: 0,
sendMaxTimeout: 2 * time.Minute,
randomization_factor: sdk.BACKOFF_JITTER,
multiplier: sdk.BACKOFF_MULTIPLIER,
}
)

Expand Down
86 changes: 64 additions & 22 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"
"strconv"
"sync"
"time"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -136,15 +137,15 @@ func (c *commander) Send(ctx context.Context, message Message) error {
switch message.Classification() {
case MsgClassificationCommand:
if cmd, ok = message.Raw().(*proto.Command); !ok {
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}
default:
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
if err := c.channel.Send(cmd); err != nil {
return c.handleGrpcError("Commander Channel Send", err)
return c.handleGrpcError("Commander Channel Send", err, nil)
}

log.Tracef("Commander sent command %v", cmd)
Expand All @@ -171,14 +172,14 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
return c.handleGrpcError("Commander Downloader", err)
return c.handleGrpcError("Commander Downloader", err, nil)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
return c.handleGrpcError("Commander Downloader", err)
return c.handleGrpcError("Commander Downloader", err, nil)
}

if chunk == nil {
Expand Down Expand Up @@ -231,7 +232,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
return sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
sender, err := c.client.Upload(c.ctx)
if err != nil {
return c.handleGrpcError("Commander Upload", err)
return c.handleGrpcError("Commander Upload", err, nil)
}

err = sender.Send(&proto.DataChunk{
Expand All @@ -245,7 +246,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
return c.handleGrpcError("Commander Upload Header", err)
return c.handleGrpcError("Commander Upload Header", err, nil)
}

for id, chunk := range chunks {
Expand All @@ -259,14 +260,14 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
return c.handleGrpcError("Commander Upload"+strconv.Itoa(id), err)
return c.handleGrpcError("Commander Upload"+strconv.Itoa(id), err, nil)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
return c.handleGrpcError("Commander Upload CloseAndRecv", err, nil)
}

if status.Status != proto.UploadStatus_OK {
Expand Down Expand Up @@ -314,27 +315,28 @@ func (c *commander) createClient() error {
func (c *commander) recvLoop() {
log.Debug("Commander receive loop starting")
for {
err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.maxTimeout, func() error {
cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
return c.handleGrpcError("Commander Channel Recv", err)
}
err := sdk.WaitUntilWithJitterAndMultiplier(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval,
c.backoffSettings.maxTimeout, c.backoffSettings.randomization_factor, c.backoffSettings.multiplier, func() error {
cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
return c.handleGrpcError("Commander Channel Recv", err, cmd)
}

select {
case <-c.ctx.Done():
case c.recvChan <- MessageFromCommand(cmd):
}
select {
case <-c.ctx.Done():
case c.recvChan <- MessageFromCommand(cmd):
}

return nil
})
return nil
})
if err != nil {
log.Errorf("Error retrying to receive messages from the commander channel: %v", err)
}
}
}

func (c *commander) handleGrpcError(messagePrefix string, err error) error {
func (c *commander) handleGrpcError(messagePrefix string, err error, cmd *proto.Command) error {
if st, ok := status.FromError(err); ok {
log.Errorf("%s: error communicating with %s, code=%s, message=%v", messagePrefix, c.grpc.Target(), st.Code().String(), st.Message())
} else if err == io.EOF {
Expand All @@ -346,5 +348,45 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {
log.Infof("%s: retrying to connect to %s", messagePrefix, c.grpc.Target())
_ = c.createClient()

c.resetBackoffSettings(cmd)
return err
}

func (c *commander) resetBackoffSettings(cmd *proto.Command) {
if cmd == nil {
return
}
if cmd.GetAgentConfig() == nil {
return
}
if cmd.GetAgentConfig().GetDetails() == nil {
return
}
if cmd.GetAgentConfig().GetDetails().GetServer() == nil {
return
}
sBackoff := cmd.GetAgentConfig().GetDetails().GetServer().Backoff
if sBackoff == nil {
return
}

smultiplier := sdk.BACKOFF_MULTIPLIER
if sBackoff.GetMultiplier() != 0 {
smultiplier = sBackoff.GetMultiplier()
}

srandomization_factor := sdk.BACKOFF_JITTER
if sBackoff.GetRandomizationFactor() != 0 {
srandomization_factor = sBackoff.GetRandomizationFactor()
}

cBackoff := BackoffSettings{
initialInterval: time.Duration(sBackoff.InitialInterval),
maxInterval: time.Duration(sBackoff.MaxInterval),
sendMaxTimeout: time.Duration(sBackoff.MaxElapsedTime),
multiplier: smultiplier,
randomization_factor: srandomization_factor,
}
log.Infof("reset client backoff settings to %+v, for a pause command %+v", cBackoff, cmd)
c.WithBackoffSettings(cBackoff)
}
Loading