Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Agent to support backpressure from server #299

Merged
merged 21 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/proto/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
- [AgentDetails](#f5-nginx-agent-sdk-AgentDetails)
- [AgentLogging](#f5-nginx-agent-sdk-AgentLogging)
- [AgentMeta](#f5-nginx-agent-sdk-AgentMeta)
- [Backoff](#f5-nginx-agent-sdk-Backoff)
- [Server](#f5-nginx-agent-sdk-Server)

- [AgentConnectStatus.StatusCode](#f5-nginx-agent-sdk-AgentConnectStatus-StatusCode)
- [AgentLogging.Level](#f5-nginx-agent-sdk-AgentLogging-Level)
Expand Down Expand Up @@ -204,6 +206,7 @@ Represents agent details. This message is sent from the management server to the
| extensions | [string](#string) | repeated | List of agent extensions that are enabled |
| tags | [string](#string) | repeated | List of tags |
| alias | [string](#string) | | Alias name for the agent |
| server | [Server](#f5-nginx-agent-sdk-Server) | | Server setting for the agent |



Expand Down Expand Up @@ -251,6 +254,45 @@ Represents agent metadata




<a name="f5-nginx-agent-sdk-Backoff"></a>

### Backoff



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| initial_interval | [int64](#int64) | | First backoff time interval in seconds |
| randomization_factor | [double](#double) | | Random value used to create range around next backoff interval |
| multiplier | [double](#double) | | Value to be multiplied with current backoff interval |
| max_interval | [int64](#int64) | | Max interval in seconds between two retries |
| max_elapsed_time | [int64](#int64) | | Elapsed time in seconds after which backoff stops. It never stops if max_elapsed_time == 0. |






<a name="f5-nginx-agent-sdk-Server"></a>

### Server



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| host | [string](#string) | | Host name or IP of the host to connect to |
| grpc_port | [int32](#int32) | | Grpc port to connect to |
| token | [string](#string) | | Shared secrect between the server and client |
| metrics | [string](#string) | | Metrics server name |
| command | [string](#string) | | Command server name |
| backoff | [Backoff](#f5-nginx-agent-sdk-Backoff) | | Backoff settings for exponential retry and backoff |








Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (

require (
github.com/bufbuild/buf v1.17.0
github.com/evilmartians/lefthook v1.3.10
github.com/evilmartians/lefthook v1.3.12
github.com/go-resty/resty/v2 v2.7.0
github.com/go-swagger/go-swagger v0.30.4
github.com/goreleaser/nfpm/v2 v2.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java h1:bV5JGEB1ouEzZa0hgVDFFiClrUEuGWRaAc/3mxR2QK0=
github.com/envoyproxy/protoc-gen-validate v0.3.0-java/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evilmartians/lefthook v1.3.10 h1:tGnI6JAW74/ZLXIESrdLjWQpPJEfjRmfzY90W3r+rHU=
github.com/evilmartians/lefthook v1.3.10/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/evilmartians/lefthook v1.3.12 h1:06BAk0p3pJ8TMnVl9gjPVPAPWWXJbREryvmwi4FfBfo=
github.com/evilmartians/lefthook v1.3.12/go.mod h1:Ybinyhjf5wdloc78pkBV2K0XZRvdnsIova48LAco5HU=
github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
Expand Down
24 changes: 15 additions & 9 deletions sdk/backoff.go → sdk/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

package sdk
package backoff

import (
"context"
Expand All @@ -19,19 +19,25 @@ const (
BACKOFF_MULTIPLIER = backoff.DefaultMultiplier
)

type BackoffSettings struct {
InitialInterval time.Duration
MaxInterval time.Duration
MaxElapsedTime time.Duration
Multiplier float64
Jitter float64
}

func WaitUntil(
ctx context.Context,
initialInterval time.Duration,
maxInterval time.Duration,
maxElapsedTime time.Duration,
backoffSettings BackoffSettings,
operation backoff.Operation,
) error {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = initialInterval
exponentialBackoff.MaxInterval = maxInterval
exponentialBackoff.MaxElapsedTime = maxElapsedTime
exponentialBackoff.RandomizationFactor = BACKOFF_JITTER
exponentialBackoff.Multiplier = BACKOFF_MULTIPLIER
exponentialBackoff.InitialInterval = backoffSettings.InitialInterval
exponentialBackoff.MaxInterval = backoffSettings.MaxInterval
exponentialBackoff.MaxElapsedTime = backoffSettings.MaxElapsedTime
exponentialBackoff.RandomizationFactor = backoffSettings.Jitter
exponentialBackoff.Multiplier = backoffSettings.Multiplier

expoBackoffWithContext := backoff.WithContext(exponentialBackoff, ctx)

Expand Down
11 changes: 9 additions & 2 deletions sdk/backoff_test.go → sdk/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

package sdk
package backoff

import (
"context"
Expand Down Expand Up @@ -87,7 +87,14 @@ func TestBackOff(t *testing.T) {
}

for _, test := range tests {
result := WaitUntil(test.context, test.initialInterval, test.maxInterval, test.maxElapsedTime, test.operation)
backoff := BackoffSettings{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
Jitter: BACKOFF_JITTER,
Multiplier: BACKOFF_MULTIPLIER,
}
result := WaitUntil(test.context, backoff, test.operation)

if test.expectedError {
assert.Errorf(t, result, test.name)
Expand Down
21 changes: 8 additions & 13 deletions sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@ import (

"google.golang.org/grpc"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
)

type BackoffSettings struct {
initialInterval time.Duration
maxInterval time.Duration
maxTimeout time.Duration
sendMaxTimeout time.Duration
}

type MsgClassification int

const (
Expand All @@ -35,11 +29,12 @@ const (
)

var (
DefaultBackoffSettings = BackoffSettings{
initialInterval: 10 * time.Second,
maxInterval: 60 * time.Second,
maxTimeout: 0,
sendMaxTimeout: 2 * time.Minute,
DefaultBackoffSettings = backoff.BackoffSettings{
InitialInterval: 10 * time.Second,
MaxInterval: 60 * time.Second,
MaxElapsedTime: 2 * time.Minute,
Jitter: backoff.BACKOFF_JITTER,
Multiplier: backoff.BACKOFF_MULTIPLIER,
}
)

Expand Down Expand Up @@ -68,7 +63,7 @@ type (
WithInterceptor(interceptor interceptors.Interceptor) Client
WithClientInterceptor(interceptor interceptors.ClientInterceptor) Client

WithBackoffSettings(backoffSettings BackoffSettings) Client
WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client
}
MetricReporter interface {
Client
Expand Down
24 changes: 11 additions & 13 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/status"

"github.com/nginx/agent/sdk/v2"
"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/checksum"
sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/sdk/v2/interceptors"
Expand Down Expand Up @@ -49,7 +49,7 @@ type commander struct {
downloadChan chan *proto.DataChunk
ctx context.Context
mu sync.Mutex
backoffSettings BackoffSettings
backoffSettings backoff.BackoffSettings
}

func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client {
Expand All @@ -71,11 +71,9 @@ func (c *commander) Connect(ctx context.Context) error {
log.Debugf("Commander connecting to %s", c.server)

c.ctx = ctx
err := sdk.WaitUntil(
err := backoff.WaitUntil(
c.ctx,
c.backoffSettings.initialInterval,
c.backoffSettings.maxInterval,
c.backoffSettings.maxTimeout,
c.backoffSettings,
c.createClient,
)
if err != nil {
Expand Down Expand Up @@ -122,7 +120,7 @@ func (c *commander) ChunksSize() int {
return c.chunkSize
}

func (c *commander) WithBackoffSettings(backoffSettings BackoffSettings) Client {
func (c *commander) WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client {
c.backoffSettings = backoffSettings
return c
}
Expand All @@ -136,13 +134,13 @@ func (c *commander) Send(ctx context.Context, message Message) error {
switch message.Classification() {
case MsgClassificationCommand:
if cmd, ok = message.Raw().(*proto.Command); !ok {
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}
default:
return fmt.Errorf("Expected a command message, but received %T", message.Data())
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if err := c.channel.Send(cmd); err != nil {
return c.handleGrpcError("Commander Channel Send", err)
}
Expand All @@ -163,7 +161,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}

err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
var (
header *proto.DataChunk_Header
body []byte
Expand Down Expand Up @@ -228,7 +226,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

return sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.sendMaxTimeout, func() error {
return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
sender, err := c.client.Upload(c.ctx)
if err != nil {
return c.handleGrpcError("Commander Upload", err)
Expand Down Expand Up @@ -314,7 +312,7 @@ func (c *commander) createClient() error {
func (c *commander) recvLoop() {
log.Debug("Commander receive loop starting")
for {
err := sdk.WaitUntil(c.ctx, c.backoffSettings.initialInterval, c.backoffSettings.maxInterval, c.backoffSettings.maxTimeout, func() error {
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion sdk/client/commander_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"time"

"github.com/nginx/agent/sdk/v2/backoff"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (m *MockCommandClient) WithConnWaitDuration(d time.Duration) Client {
return m
}

func (m *MockCommandClient) WithBackoffSettings(backoffSettings BackoffSettings) Client {
func (m *MockCommandClient) WithBackoffSettings(backoffSettings backoff.BackoffSettings) Client {
m.Called(backoffSettings)

return m
Expand Down
Loading