Skip to content

Commit

Permalink
Update Agent to support backpressure from server (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
achawla2012 authored May 10, 2023
1 parent b12b293 commit 2850ca0
Show file tree
Hide file tree
Showing 62 changed files with 4,385 additions and 577 deletions.
42 changes: 42 additions & 0 deletions docs/proto/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
- [AgentDetails](#f5-nginx-agent-sdk-AgentDetails)
- [AgentLogging](#f5-nginx-agent-sdk-AgentLogging)
- [AgentMeta](#f5-nginx-agent-sdk-AgentMeta)
- [Backoff](#f5-nginx-agent-sdk-Backoff)
- [Server](#f5-nginx-agent-sdk-Server)

- [AgentConnectStatus.StatusCode](#f5-nginx-agent-sdk-AgentConnectStatus-StatusCode)
- [AgentLogging.Level](#f5-nginx-agent-sdk-AgentLogging-Level)
Expand Down Expand Up @@ -204,6 +206,7 @@ Represents agent details. This message is sent from the management server to the
| extensions | [string](#string) | repeated | List of agent extensions that are enabled |
| tags | [string](#string) | repeated | List of tags |
| alias | [string](#string) | | Alias name for the agent |
| server | [Server](#f5-nginx-agent-sdk-Server) | | Server setting for the agent |



Expand Down Expand Up @@ -251,6 +254,45 @@ Represents agent metadata




<a name="f5-nginx-agent-sdk-Backoff"></a>

### Backoff



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| initial_interval | [int64](#int64) | | First backoff time interval in seconds |
| randomization_factor | [double](#double) | | Random value used to create range around next backoff interval |
| multiplier | [double](#double) | | Value to be multiplied with current backoff interval |
| max_interval | [int64](#int64) | | Max interval in seconds between two retries |
| max_elapsed_time | [int64](#int64) | | Elapsed time in seconds after which backoff stops. It never stops if max_elapsed_time == 0. |






<a name="f5-nginx-agent-sdk-Server"></a>

### Server



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| host | [string](#string) | | Host name or IP of the host to connect to |
| grpc_port | [int32](#int32) | | Grpc port to connect to |
| token | [string](#string) | | Shared secrect between the server and client |
| metrics | [string](#string) | | Metrics server name |
| command | [string](#string) | | Command server name |
| backoff | [Backoff](#f5-nginx-agent-sdk-Backoff) | | Backoff settings for exponential retry and backoff |








Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (

require (
github.com/bufbuild/buf v1.17.0
github.com/evilmartians/lefthook v1.3.10
github.com/evilmartians/lefthook v1.3.12
github.com/go-resty/resty/v2 v2.7.0
github.com/go-swagger/go-swagger v0.30.4
github.com/goreleaser/nfpm/v2 v2.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java h1:bV5JGEB1ouEzZa0hgVDFFiClrUEuGWRaAc/3mxR2QK0=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evilmartians/lefthook v1.3.10 h1:tGnI6JAW74/ZLXIESrdLjWQpPJEfjRmfzY90W3r+rHU=
github.com/evilmartians/lefthook v1.3.10/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/evilmartians/lefthook v1.3.12 h1:06BAk0p3pJ8TMnVl9gjPVPAPWWXJbREryvmwi4FfBfo=
github.com/evilmartians/lefthook v1.3.12/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
Expand Down
24 changes: 15 additions & 9 deletions sdk/backoff.go → sdk/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

package sdk
package backoff

import (
"context"
Expand All @@ -19,19 +19,25 @@ const (
BACKOFF_MULTIPLIER = backoff.DefaultMultiplier
)

type BackoffSettings struct {
InitialInterval time.Duration
MaxInterval time.Duration
MaxElapsedTime time.Duration
Multiplier float64
Jitter float64
}

func WaitUntil(
ctx context.Context,
initialInterval time.Duration,
maxInterval time.Duration,
maxElapsedTime time.Duration,
backoffSettings BackoffSettings,
operation backoff.Operation,
) error {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = initialInterval
exponentialBackoff.MaxInterval = maxInterval
exponentialBackoff.MaxElapsedTime = maxElapsedTime
exponentialBackoff.RandomizationFactor = BACKOFF_JITTER
exponentialBackoff.Multiplier = BACKOFF_MULTIPLIER
exponentialBackoff.InitialInterval = backoffSettings.InitialInterval
exponentialBackoff.MaxInterval = backoffSettings.MaxInterval
exponentialBackoff.MaxElapsedTime = backoffSettings.MaxElapsedTime
exponentialBackoff.RandomizationFactor = backoffSettings.Jitter
exponentialBackoff.Multiplier = backoffSettings.Multiplier

expoBackoffWithContext := backoff.WithContext(exponentialBackoff, ctx)

Expand Down
11 changes: 9 additions & 2 deletions sdk/backoff_test.go → sdk/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

package sdk
package backoff

import (
"context"
Expand Down Expand Up @@ -87,7 +87,14 @@ func TestBackOff(t *testing.T) {
}

for _, test := range tests {
result := WaitUntil(test.context, test.initialInterval, test.maxInterval, test.maxElapsedTime, test.operation)
backoff := BackoffSettings{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
Jitter: BACKOFF_JITTER,
Multiplier: BACKOFF_MULTIPLIER,
}
result := WaitUntil(test.context, backoff, test.operation)

if test.expectedError {
assert.Errorf(t, result, test.name)
Expand Down
21 changes: 8 additions & 13 deletions sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@ import (

"google.golang.org/grpc"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
)

type BackoffSettings struct {
initialInterval time.Duration
maxInterval time.Duration
maxTimeout time.Duration
sendMaxTimeout time.Duration
}

type MsgClassification int

const (
Expand All @@ -35,11 +29,12 @@ const (
)

var (
DefaultBackoffSettings = BackoffSettings{
initialInterval: 10 * time.Second,
maxInterval: 60 * time.Second,
maxTimeout: 0,
sendMaxTimeout: 2 * time.Minute,
DefaultBackoffSettings = backoff.BackoffSettings{
InitialInterval: 10 * time.Second,
MaxInterval: 60 * time.Second,
MaxElapsedTime: 2 * time.Minute,
Jitter: backoff.BACKOFF_JITTER,
Multiplier: backoff.BACKOFF_MULTIPLIER,
}
)

Expand Down Expand Up @@ -68,7 +63,7 @@ type (
WithInterceptor(interceptor interceptors.Interceptor) Client
WithClientInterceptor(interceptor interceptors.ClientInterceptor) Client

WithBackoffSettings(backoffSettings BackoffSettings) Client
WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client
}
MetricReporter interface {
Client
Expand Down
24 changes: 11 additions & 13 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/status"

"github.com/nginx/agent/sdk/v2"
"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/checksum"
sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/sdk/v2/interceptors"
Expand Down Expand Up @@ -49,7 +49,7 @@ type commander struct {
downloadChan chan *proto.DataChunk
ctx context.Context
mu sync.Mutex
backoffSettings BackoffSettings
backoffSettings backoff.BackoffSettings
}

func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client {
Expand All @@ -71,11 +71,9 @@ func (c *commander) Connect(ctx context.Context) error {
log.Debugf("Commander connecting to %s", c.server)

c.ctx = ctx
err := sdk.WaitUntil(
err := backoff.WaitUntil(
c.ctx,
c.backoffSettings.initialInterval,
c.backoffSettings.maxInterval,
c.backoffSettings.maxTimeout,
c.backoffSettings,
c.createClient,
)
if err != nil {
Expand Down Expand Up @@ -122,7 +120,7 @@ func (c *commander) ChunksSize() int {
return c.chunkSize
}

func (c *commander) WithBackoffSettings(backoffSettings BackoffSettings) Client {
func (c *commander) WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client {
c.backoffSettings = backoffSettings
return c
}
Expand All @@ -136,13 +134,13 @@ func (c *commander) Send(ctx context.Context, message Message) error {
switch message.Classification() {
case MsgClassificationCommand:
if cmd, ok = message.Raw().(*proto.Command); !ok {
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}
default:
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if err := c.channel.Send(cmd); err != nil {
return c.handleGrpcError("Commander Channel Send", err)
}
Expand All @@ -163,7 +161,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}

err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
var (
header *proto.DataChunk_Header
body []byte
Expand Down Expand Up @@ -228,7 +226,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

return sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
sender, err := c.client.Upload(c.ctx)
if err != nil {
return c.handleGrpcError("Commander Upload", err)
Expand Down Expand Up @@ -314,7 +312,7 @@ func (c *commander) createClient() error {
func (c *commander) recvLoop() {
log.Debug("Commander receive loop starting")
for {
err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.maxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion sdk/client/commander_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"time"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (m *MockCommandClient) WithConnWaitDuration(d time.Duration) Client {
return m
}

func (m *MockCommandClient) WithBackoffSettings(backoffSettings BackoffSettings) Client {
func (m *MockCommandClient) WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client {
m.Called(backoffSettings)

return m
Expand Down
Loading

0 comments on commit 2850ca0

Please sign in to comment.