Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
send pending acks before closing ACS connection
Browse files Browse the repository at this point in the history
singholt committed Feb 23, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 6d54c7c commit 799564d
Showing 1 changed file with 46 additions and 3 deletions.
49 changes: 46 additions & 3 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

acsclient "github.com/aws/amazon-ecs-agent/agent/acs/client"
@@ -369,8 +370,10 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
}

seelog.Info("Connected to ACS endpoint")
// Start a connection timer; agent will close its ACS websocket connection after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter)
// Start a connection timer; agent will send pending acks and close its ACS websocket connection
// after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter,
&refreshCredsHandler, &taskManifestHandler, &payloadHandler, &heartbeatHandler)
defer connectionTimer.Stop()

// Start a heartbeat timer for closing the connection
@@ -506,9 +509,49 @@ func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitt
}

// newConnectionTimer creates a new timer, after which agent closes its ACS websocket connection
func newConnectionTimer(client wsclient.ClientServer, connectionTime time.Duration, connectionJitter time.Duration) ttime.Timer {
func newConnectionTimer(
client wsclient.ClientServer,
connectionTime time.Duration,
connectionJitter time.Duration,
refreshCredsHandler *refreshCredentialsHandler,
taskManifestHandler *taskManifestHandler,
payloadHandler *payloadRequestHandler,
heartbeatHandler *heartbeatHandler,
) ttime.Timer {
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
timer := time.AfterFunc(expiresAt, func() {
seelog.Debugf("Sending pending acks to ACS before closing the connection")
var wg sync.WaitGroup
wg.Add(4)

// send pending creds refresh acks
go func() {
refreshCredsHandler.sendAcks()
wg.Done()
}()

// send pending task manifest acks and task stop verification acks
go func() {
taskManifestHandler.sendTaskManifestMessageAck()
taskManifestHandler.handleTaskStopVerificationAck()
wg.Done()
}()

// send pending payload acks
go func() {
payloadHandler.sendAcks()
wg.Done()
}()

// send pending heartbeat acks
go func() {
heartbeatHandler.sendHeartbeatAck()
wg.Done()
}()

// wait for acks from all handlers above to be sent before closing the websocket connection
wg.Wait()

seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
// WriteCloseMessage() writes a close message using websocket control messages
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages

0 comments on commit 799564d

Please sign in to comment.