Skip to content

Commit

Permalink
Add commander retry lock
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley committed Oct 16, 2023
1 parent 918eb90 commit 7ec182b
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 80 deletions.
57 changes: 37 additions & 20 deletions sdk/client/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewCommanderClient() Commander {
connector: newConnector(),
chunkSize: DefaultChunkSize,
backoffSettings: DefaultBackoffSettings,
isRetrying: false,
}
}

Expand All @@ -47,9 +48,10 @@ type commander struct {
recvChan chan Message
downloadChan chan *proto.DataChunk
ctx context.Context
mu sync.Mutex
backoffSettings backoff.BackoffSettings
cancel context.CancelFunc
isRetrying bool
retryLock sync.Mutex
}

func (c *commander) WithInterceptor(interceptor interceptors.Interceptor) Client {
Expand Down Expand Up @@ -154,17 +156,20 @@ func (c *commander) Send(ctx context.Context, message Message) error {
return fmt.Errorf("expected a command message, but received %T", message.Data())
}

isRetrying := false
err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
c.retryLock.Lock()
if c.isRetrying {
log.Infof("Commander Channel Send: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
c.retryLock.Unlock()
return err
}
}
c.retryLock.Unlock()

if err := c.channel.Send(cmd); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Send", err)
}

Expand All @@ -183,32 +188,35 @@ func (c *commander) Recv() <-chan Message {
func (c *commander) Download(ctx context.Context, metadata *proto.Metadata) (*proto.NginxConfig, error) {
log.Debugf("Downloading config (messageId=%s)", metadata.GetMessageId())
cfg := &proto.NginxConfig{}
isRetrying := false

err := backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
c.retryLock.Lock()
if c.isRetrying {
log.Infof("Commander Downloader: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
c.retryLock.Unlock()
return err
}
}
c.retryLock.Unlock()

var (
header *proto.DataChunk_Header
body []byte
)

downloader, err := c.client.Download(c.ctx, &proto.DownloadRequest{Meta: metadata})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

LOOP:
for {
chunk, err := downloader.Recv()
if err != nil && err != io.EOF {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Downloader", err)
}

Expand Down Expand Up @@ -259,20 +267,21 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
payloadChecksum := checksum.Checksum(payload)
chunks := checksum.Chunk(payload, c.chunkSize)

isRetrying := false

return backoff.WaitUntil(c.ctx, c.backoffSettings, func() error {
if isRetrying {
c.retryLock.Lock()
if c.isRetrying {
log.Infof("Commander Upload: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
c.retryLock.Unlock()
return err
}
}
c.retryLock.Unlock()

sender, err := c.client.Upload(c.ctx)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload", err)
}

Expand All @@ -287,7 +296,7 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
})
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload Header", err)
}

Expand All @@ -302,15 +311,15 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI
},
},
}); err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError(fmt.Sprintf("Commander Upload (chunks=%d)", id), err)
}
}

log.Infof("Upload sending done %s (chunks=%d)", metadata.MessageId, len(chunks))
status, err := sender.CloseAndRecv()
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Upload CloseAndRecv", err)
}

Expand All @@ -324,8 +333,6 @@ func (c *commander) Upload(ctx context.Context, cfg *proto.NginxConfig, messageI

func (c *commander) createClient() error {
log.Debug("Creating commander client")
c.mu.Lock()
defer c.mu.Unlock()

// Making sure that the previous client connection is closed before creating a new one
if c.grpc != nil {
Expand Down Expand Up @@ -353,6 +360,8 @@ func (c *commander) createClient() error {
}
c.channel = channel

c.isRetrying = false

return nil
}

Expand All @@ -364,24 +373,26 @@ loop:
case <-ctx.Done():
break loop
default:
isRetrying := false
err := backoff.WaitUntil(ctx, c.backoffSettings, func() error {
select {
case <-ctx.Done():
return nil
default:
if isRetrying {
c.retryLock.Lock()
if c.isRetrying {
log.Infof("Commander Channel Recv: retrying to connect to %s", c.grpc.Target())
err := c.createClient()
if err != nil {
c.retryLock.Unlock()
return err
}
}
c.retryLock.Unlock()

cmd, err := c.channel.Recv()
log.Infof("Commander received %v, %v", cmd, err)
if err != nil {
isRetrying = true
c.setIsRetrying(true)
return c.handleGrpcError("Commander Channel Recv", err)
}

Expand Down Expand Up @@ -413,3 +424,9 @@ func (c *commander) handleGrpcError(messagePrefix string, err error) error {

return err
}

func (c *commander) setIsRetrying(value bool) {
c.retryLock.Lock()
defer c.retryLock.Unlock()
c.isRetrying = value
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7ec182b

Please sign in to comment.