Skip to content

Commit

Permalink
send pending acks before closing ACS connection
Browse files Browse the repository at this point in the history
  • Loading branch information
singholt committed Feb 23, 2023
1 parent 6d54c7c commit b6acc65
Showing 1 changed file with 46 additions and 3 deletions.
49 changes: 46 additions & 3 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

acsclient "github.com/aws/amazon-ecs-agent/agent/acs/client"
Expand Down Expand Up @@ -369,8 +370,10 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
}

seelog.Info("Connected to ACS endpoint")
// Start a connection timer; agent will close its ACS websocket connection after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter)
// Start a connection timer; agent will send pending acks and close its ACS websocket connection
// after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter,
refreshCredsHandler, taskManifestHandler, payloadHandler, heartbeatHandler)
defer connectionTimer.Stop()

// Start a heartbeat timer for closing the connection
Expand Down Expand Up @@ -506,9 +509,49 @@ func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitt
}

// newConnectionTimer creates a new timer, after which agent closes its ACS websocket connection
func newConnectionTimer(client wsclient.ClientServer, connectionTime time.Duration, connectionJitter time.Duration) ttime.Timer {
func newConnectionTimer(
client wsclient.ClientServer,
connectionTime time.Duration,
connectionJitter time.Duration,
refreshCredsHandler refreshCredentialsHandler,
taskManifestHandler taskManifestHandler,
payloadHandler payloadRequestHandler,
heartbeatHandler heartbeatHandler,
) ttime.Timer {
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
timer := time.AfterFunc(expiresAt, func() {
seelog.Debugf("Sending pending acks to ACS before closing the connection")
var wg sync.WaitGroup
wg.Add(4)

// send pending creds refresh acks
go func() {
refreshCredsHandler.sendAcks()
wg.Done()
}()

// send pending task manifest acks and task stop verification acks
go func() {
taskManifestHandler.sendTaskManifestMessageAck()
taskManifestHandler.handleTaskStopVerificationAck()
wg.Done()
}()

// send pending payload acks
go func() {
payloadHandler.sendAcks()
wg.Done()
}()

// send pending heartbeat acks
go func() {
heartbeatHandler.sendHeartbeatAck()
wg.Done()
}()

// wait for acks from all handlers above to be sent before closing the websocket connection
wg.Wait()

seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
// WriteCloseMessage() writes a close message using websocket control messages
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages
Expand Down

0 comments on commit b6acc65

Please sign in to comment.