Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add commander retry lock #502

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 48 additions & 37 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewCommanderClient() Commander {
connector: newConnector(),
chunkSize: DefaultChunkSize,
backoffSettings: DefaultBackoffSettings,
isRetrying: false,
}
}

Expand All @@ -50,6 +51,8 @@ type commander struct {
mu sync.Mutex
backoffSettings backoff.BackoffSettings
cancel context.CancelFunc
isRetrying bool
retryLock sync.Mutex
}

func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client {
Expand Down Expand Up @@ -154,17 +157,14 @@ func (c *commander) Send(ctx context.Context, message Message) error {
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

isRetrying := false
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

if err := c.channel.Send(cmd); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Send", err)
}

Expand All @@ -183,32 +183,29 @@ func (c *commander) Recv() <-chan Message {
func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) {
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}
isRetrying := false

err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

var (
header *proto.DataChunk_Header
body []byte
)

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

Expand Down Expand Up @@ -259,20 +256,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

isRetrying := false

return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

sender, err := c.client.Upload(c.ctx)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload", err)
}

Expand All @@ -287,7 +279,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload Header", err)
}

Expand All @@ -302,15 +294,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
}

Expand All @@ -322,6 +314,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
})
}

func (c *commander) checkClientConnection() error {
c.retryLock.Lock()
defer c.retryLock.Unlock()

if c.isRetrying {
log.Infof("Retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

return nil
}

func (c *commander) createClient() error {
log.Debug("Creating commander client")
c.mu.Lock()
Expand Down Expand Up @@ -353,6 +360,8 @@ func (c *commander) createClient() error {
}
c.channel = channel

c.isRetrying = false

return nil
}

Expand All @@ -364,24 +373,20 @@ loop:
case <-ctx.Done():
break loop
default:
isRetrying := false
err := backoff.WaitUntil(ctx, c.backoffSettings, func() error {
select {
case <-ctx.Done():
return nil
default:
if isRetrying {
log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Recv", err)
}

Expand Down Expand Up @@ -413,3 +418,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {

return err
}

func (c *commander) setIsRetrying(value bool) {
c.retryLock.Lock()
defer c.retryLock.Unlock()
c.isRetrying = value
}
Loading