Skip to content

Commit

Permalink
Add commander retry lock (#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley authored Nov 8, 2023
1 parent 93c1d75 commit 747b121
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 148 deletions.
85 changes: 48 additions & 37 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewCommanderClient() Commander {
connector: newConnector(),
chunkSize: DefaultChunkSize,
backoffSettings: DefaultBackoffSettings,
isRetrying: false,
}
}

Expand All @@ -50,6 +51,8 @@ type commander struct {
mu sync.Mutex
backoffSettings backoff.BackoffSettings
cancel context.CancelFunc
isRetrying bool
retryLock sync.Mutex
}

func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client {
Expand Down Expand Up @@ -154,17 +157,14 @@ func (c *commander) Send(ctx context.Context, message Message) error {
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

isRetrying := false
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

if err := c.channel.Send(cmd); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Send", err)
}

Expand All @@ -183,32 +183,29 @@ func (c *commander) Recv() <-chan Message {
func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) {
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}
isRetrying := false

err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

var (
header *proto.DataChunk_Header
body []byte
)

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

Expand Down Expand Up @@ -259,20 +256,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

isRetrying := false

return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

sender, err := c.client.Upload(c.ctx)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload", err)
}

Expand All @@ -287,7 +279,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload Header", err)
}

Expand All @@ -302,15 +294,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
}

Expand All @@ -322,6 +314,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
})
}

func (c *commander) checkClientConnection() error {
c.retryLock.Lock()
defer c.retryLock.Unlock()

if c.isRetrying {
log.Infof("Retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
}

return nil
}

func (c *commander) createClient() error {
log.Debug("Creating commander client")
c.mu.Lock()
Expand Down Expand Up @@ -353,6 +360,8 @@ func (c *commander) createClient() error {
}
c.channel = channel

c.isRetrying = false

return nil
}

Expand All @@ -364,24 +373,20 @@ loop:
case <-ctx.Done():
break loop
default:
isRetrying := false
err := backoff.WaitUntil(ctx, c.backoffSettings, func() error {
select {
case <-ctx.Done():
return nil
default:
if isRetrying {
log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
return err
}
err := c.checkClientConnection()
if err != nil {
return err
}

cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Recv", err)
}

Expand Down Expand Up @@ -413,3 +418,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {

return err
}

func (c *commander) setIsRetrying(value bool) {
c.retryLock.Lock()
defer c.retryLock.Unlock()
c.isRetrying = value
}
Loading

0 comments on commit 747b121

Please sign in to comment.