diff --git a/sdk/client/commander.go b/sdk/client/commander.go index d3a3f53fb0..a35375f2a0 100644 --- a/sdk/client/commander.go +++ b/sdk/client/commander.go @@ -36,6 +36,7 @@ func NewCommanderClient() Commander { connector: newConnector(), chunkSize: DefaultChunkSize, backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -47,9 +48,10 @@ type commander struct { recvChan chan Message downloadChan chan *proto.DataChunk ctx context.Context - mu sync.Mutex backoffSettings backoff.BackoffSettings cancel context.CancelFunc + isRetrying bool + retryLock sync.Mutex } func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -154,17 +156,20 @@ func (c *commander) Send(ctx context.Context, message Message) error { return fmt.Errorf("expected a command message, but received %T", message.Data()) } - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + if err := c.channel.Send(cmd); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) } @@ -183,16 +188,19 @@ func (c *commander) Recv() <-chan Message { func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) { log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId()) cfg := &proto.NginxConfig{} - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + var ( header *proto.DataChunk_Header body []byte @@ -200,7 +208,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata}) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -208,7 +216,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr for { chunk, err := downloader.Recv() if err != nil && err != io.EOF { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -259,20 +267,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI payloadChecksum := checksum.Checksum(payload) chunks := checksum.Chunk(payload, c.chunkSize) - isRetrying := false - return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() sender, err := c.client.Upload(c.ctx) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload", err) } @@ -287,7 +296,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload Header", err) } @@ -302,7 +311,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }, }); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err) } } @@ -310,7 +319,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks)) status, err := sender.CloseAndRecv() if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload CloseAndRecv", err) } @@ -324,8 +333,6 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI func (c *commander) createClient() error { log.Debug("Creating commander client") - c.mu.Lock() - defer c.mu.Unlock() // Making sure that the previous client connection is closed before creating a new one if c.grpc != nil { @@ -353,6 +360,8 @@ func (c *commander) createClient() error { } c.channel = channel + c.isRetrying = false + return nil } @@ -364,24 +373,26 @@ loop: case <-ctx.Done(): break loop default: - isRetrying := false err := backoff.WaitUntil(ctx, c.backoffSettings, func() error { select { case <-ctx.Done(): return nil default: - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() cmd, err := c.channel.Recv() log.Infof("Commander received %v, %v", cmd, err) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Recv", err) } @@ -413,3 +424,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error { return err } + +func (c *commander) setIsRetrying(value bool) { + c.retryLock.Lock() + defer c.retryLock.Unlock() + c.isRetrying = value +} diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index d3a3f53fb0..a35375f2a0 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -36,6 +36,7 @@ func NewCommanderClient() Commander { connector: newConnector(), chunkSize: DefaultChunkSize, backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -47,9 +48,10 @@ type commander struct { recvChan chan Message downloadChan chan *proto.DataChunk ctx context.Context - mu sync.Mutex backoffSettings backoff.BackoffSettings cancel context.CancelFunc + isRetrying bool + retryLock sync.Mutex } func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -154,17 +156,20 @@ func (c *commander) Send(ctx context.Context, message Message) error { return fmt.Errorf("expected a command message, but received %T", message.Data()) } - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + if err := c.channel.Send(cmd); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) } @@ -183,16 +188,19 @@ func (c *commander) Recv() <-chan Message { func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) { log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId()) cfg := &proto.NginxConfig{} - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + var ( header *proto.DataChunk_Header body []byte @@ -200,7 +208,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata}) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -208,7 +216,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr for { chunk, err := downloader.Recv() if err != nil && err != io.EOF { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -259,20 +267,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI payloadChecksum := checksum.Checksum(payload) chunks := checksum.Chunk(payload, c.chunkSize) - isRetrying := false - return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() sender, err := c.client.Upload(c.ctx) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload", err) } @@ -287,7 +296,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload Header", err) } @@ -302,7 +311,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }, }); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err) } } @@ -310,7 +319,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks)) status, err := sender.CloseAndRecv() if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload CloseAndRecv", err) } @@ -324,8 +333,6 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI func (c *commander) createClient() error { log.Debug("Creating commander client") - c.mu.Lock() - defer c.mu.Unlock() // Making sure that the previous client connection is closed before creating a new one if c.grpc != nil { @@ -353,6 +360,8 @@ func (c *commander) createClient() error { } c.channel = channel + c.isRetrying = false + return nil } @@ -364,24 +373,26 @@ loop: case <-ctx.Done(): break loop default: - isRetrying := false err := backoff.WaitUntil(ctx, c.backoffSettings, func() error { select { case <-ctx.Done(): return nil default: - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() cmd, err := c.channel.Recv() log.Infof("Commander received %v, %v", cmd, err) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Recv", err) } @@ -413,3 +424,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error { return err } + +func (c *commander) setIsRetrying(value bool) { + c.retryLock.Lock() + defer c.retryLock.Unlock() + c.isRetrying = value +} diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index d3a3f53fb0..a35375f2a0 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -36,6 +36,7 @@ func NewCommanderClient() Commander { connector: newConnector(), chunkSize: DefaultChunkSize, backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -47,9 +48,10 @@ type commander struct { recvChan chan Message downloadChan chan *proto.DataChunk ctx context.Context - mu sync.Mutex backoffSettings backoff.BackoffSettings cancel context.CancelFunc + isRetrying bool + retryLock sync.Mutex } func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -154,17 +156,20 @@ func (c *commander) Send(ctx context.Context, message Message) error { return fmt.Errorf("expected a command message, but received %T", message.Data()) } - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + if err := c.channel.Send(cmd); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) } @@ -183,16 +188,19 @@ func (c *commander) Recv() <-chan Message { func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) { log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId()) cfg := &proto.NginxConfig{} - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + var ( header *proto.DataChunk_Header body []byte @@ -200,7 +208,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata}) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -208,7 +216,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr for { chunk, err := downloader.Recv() if err != nil && err != io.EOF { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -259,20 +267,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI payloadChecksum := checksum.Checksum(payload) chunks := checksum.Chunk(payload, c.chunkSize) - isRetrying := false - return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() sender, err := c.client.Upload(c.ctx) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload", err) } @@ -287,7 +296,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload Header", err) } @@ -302,7 +311,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }, }); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err) } } @@ -310,7 +319,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks)) status, err := sender.CloseAndRecv() if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload CloseAndRecv", err) } @@ -324,8 +333,6 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI func (c *commander) createClient() error { log.Debug("Creating commander client") - c.mu.Lock() - defer c.mu.Unlock() // Making sure that the previous client connection is closed before creating a new one if c.grpc != nil { @@ -353,6 +360,8 @@ func (c *commander) createClient() error { } c.channel = channel + c.isRetrying = false + return nil } @@ -364,24 +373,26 @@ loop: case <-ctx.Done(): break loop default: - isRetrying := false err := backoff.WaitUntil(ctx, c.backoffSettings, func() error { select { case <-ctx.Done(): return nil default: - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() cmd, err := c.channel.Recv() log.Infof("Commander received %v, %v", cmd, err) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Recv", err) } @@ -413,3 +424,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error { return err } + +func (c *commander) setIsRetrying(value bool) { + c.retryLock.Lock() + defer c.retryLock.Unlock() + c.isRetrying = value +} diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index d3a3f53fb0..a35375f2a0 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -36,6 +36,7 @@ func NewCommanderClient() Commander { connector: newConnector(), chunkSize: DefaultChunkSize, backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -47,9 +48,10 @@ type commander struct { recvChan chan Message downloadChan chan *proto.DataChunk ctx context.Context - mu sync.Mutex backoffSettings backoff.BackoffSettings cancel context.CancelFunc + isRetrying bool + retryLock sync.Mutex } func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -154,17 +156,20 @@ func (c *commander) Send(ctx context.Context, message Message) error { return fmt.Errorf("expected a command message, but received %T", message.Data()) } - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + if err := c.channel.Send(cmd); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) } @@ -183,16 +188,19 @@ func (c *commander) Recv() <-chan Message { func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) { log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId()) cfg := &proto.NginxConfig{} - isRetrying := false err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() + var ( header *proto.DataChunk_Header body []byte @@ -200,7 +208,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata}) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -208,7 +216,7 @@ func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*pr for { chunk, err := downloader.Recv() if err != nil && err != io.EOF { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Downloader", err) } @@ -259,20 +267,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI payloadChecksum := checksum.Checksum(payload) chunks := checksum.Chunk(payload, c.chunkSize) - isRetrying := false - return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error { - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() sender, err := c.client.Upload(c.ctx) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload", err) } @@ -287,7 +296,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload Header", err) } @@ -302,7 +311,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI }, }, }); err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err) } } @@ -310,7 +319,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks)) status, err := sender.CloseAndRecv() if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Upload CloseAndRecv", err) } @@ -324,8 +333,6 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI func (c *commander) createClient() error { log.Debug("Creating commander client") - c.mu.Lock() - defer c.mu.Unlock() // Making sure that the previous client connection is closed before creating a new one if c.grpc != nil { @@ -353,6 +360,8 @@ func (c *commander) createClient() error { } c.channel = channel + c.isRetrying = false + return nil } @@ -364,24 +373,26 @@ loop: case <-ctx.Done(): break loop default: - isRetrying := false err := backoff.WaitUntil(ctx, c.backoffSettings, func() error { select { case <-ctx.Done(): return nil default: - if isRetrying { + c.retryLock.Lock() + if c.isRetrying { log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target()) err := c.createClient() if err != nil { + c.retryLock.Unlock() return err } } + c.retryLock.Unlock() cmd, err := c.channel.Recv() log.Infof("Commander received %v, %v", cmd, err) if err != nil { - isRetrying = true + c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Recv", err) } @@ -413,3 +424,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error { return err } + +func (c *commander) setIsRetrying(value bool) { + c.retryLock.Lock() + defer c.retryLock.Unlock() + c.isRetrying = value +}