Skip to content

Commit 029908f

Browse files
RichaGangwarRicha Gangwar
and
Richa Gangwar
authored
Move periodic timeout implementation to wsclient library. (#3883)
* wsclient: periodic timeout functionality addeed to wsclient * wsclient: edited comments * Fix static check failures * wsclient: new mock client and client_factory * wsclient: update tests using new mock wsclient and client_factory * wsclient: update the tcs handler in ecs-agent * Pass metricsfactory from handler * Update vendor folder * Fix goimports * Move heartbeathandler back to acs,tcs * Updated mocks * Updated unit tests. * Pass metricsFactory from acs session * Updated acs_handler tests to send new parameter to newsession function. * Fix goimports * Add parameter to Do() in mock as Connect() is updated with parameter as well * Update test files with new parameters * Updated tests for wsclient, tcs, acs handlers * Update metricsFactory.Done * Fixing goimports * Fix acs handler test fail when passed timer as nil in mockwsclient * Fix acs client tests * Fix acs client tests * Fix import order * Add new tests * Fix import order * Fix go imports * Fic acs handler tests * Fic acs handler tests * Remove startTime from acs handler * Update acs tests * Fix static check failiures * Fix unit test acs * Update venfor directory * Remove unnecessary metricsFactory * Update error message from connection closure due to writeclosemessage * Update test messages * Update tests * Return disconnect time in unit tests * Handle review comments * Handle race condition in the TestPeriodicDisconnect * Minor changes to handle review comments * Update vendor dir * Goimports * Fix test --------- Co-authored-by: Richa Gangwar <[email protected]>
1 parent a6b6076 commit 029908f

File tree

23 files changed

+488
-312
lines changed

23 files changed

+488
-312
lines changed

agent/acs/handler/acs_handler.go

+29-48
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ const (
5555

5656
inactiveInstanceReconnectDelay = 1 * time.Hour
5757

58-
// connectionTime is the maximum time after which agent closes its connection to ACS
59-
connectionTime = 15 * time.Minute
60-
connectionJitter = 30 * time.Minute
61-
6258
connectionBackoffMin = 250 * time.Millisecond
6359
connectionBackoffMax = 2 * time.Minute
6460
connectionBackoffJitter = 0.2
@@ -97,6 +93,7 @@ type session struct {
9793
ctx context.Context
9894
cancel context.CancelFunc
9995
backoff retry.Backoff
96+
metricsFactory metrics.EntryFactory
10097
clientFactory wsclient.ClientFactory
10198
sendCredentials bool
10299
latestSeqNumTaskManifest *int64
@@ -127,6 +124,7 @@ func NewSession(
127124
doctor *doctor.Doctor,
128125
clientFactory wsclient.ClientFactory,
129126
addUpdateRequestHandlers func(wsclient.ClientServer),
127+
metricsFactory metrics.EntryFactory,
130128
) Session {
131129
backoff := retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax,
132130
connectionBackoffJitter, connectionBackoffMultiplier)
@@ -149,13 +147,14 @@ func NewSession(
149147
backoff: backoff,
150148
latestSeqNumTaskManifest: latestSeqNumTaskManifest,
151149
doctor: doctor,
150+
metricsFactory: metricsFactory,
152151
clientFactory: clientFactory,
153152
addUpdateRequestHandlers: addUpdateRequestHandlers,
154153
sendCredentials: true,
155154
_heartbeatTimeout: heartbeatTimeout,
156155
_heartbeatJitter: heartbeatJitter,
157-
connectionTime: connectionTime,
158-
connectionJitter: connectionJitter,
156+
connectionTime: wsclient.DisconnectTimeout,
157+
connectionJitter: wsclient.DisconnectJitterMax,
159158
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
160159
}
161160
}
@@ -233,7 +232,8 @@ func (acsSession *session) startSessionOnce() error {
233232
url,
234233
acsSession.credentialsProvider,
235234
wsRWTimeout,
236-
minAgentCfg)
235+
minAgentCfg,
236+
acsSession.metricsFactory)
237237
defer client.Close()
238238

239239
return acsSession.startACSSession(client)
@@ -257,21 +257,19 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
257257

258258
taskStopper := NewTaskStopper(acsSession.taskEngine)
259259

260-
metricsFactory := metrics.NewNopEntryFactory()
261-
262260
responseSender := func(response interface{}) error {
263261
return client.MakeRequest(response)
264262
}
265263
responders := []wsclient.RequestResponder{
266264
acssession.NewPayloadResponder(payloadMsgHandler, responseSender),
267-
acssession.NewRefreshCredentialsResponder(acsSession.credentialsManager, credsMetadataSetter, metricsFactory,
265+
acssession.NewRefreshCredentialsResponder(acsSession.credentialsManager, credsMetadataSetter, acsSession.metricsFactory,
268266
responseSender),
269267
acssession.NewAttachTaskENIResponder(eniHandler, responseSender),
270268
acssession.NewAttachInstanceENIResponder(eniHandler, responseSender),
271269
acssession.NewHeartbeatResponder(acsSession.doctor, responseSender),
272270
acssession.NewTaskManifestResponder(taskComparer, sequenceNumberAccessor, manifestMessageIDAccessor,
273-
metricsFactory, responseSender),
274-
acssession.NewTaskStopVerificationACKResponder(taskStopper, manifestMessageIDAccessor, metricsFactory),
271+
acsSession.metricsFactory, responseSender),
272+
acssession.NewTaskStopVerificationACKResponder(taskStopper, manifestMessageIDAccessor, acsSession.metricsFactory),
275273
}
276274
for _, r := range responders {
277275
client.AddRequestHandler(r.HandlerFunc())
@@ -281,17 +279,17 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
281279
acsSession.addUpdateRequestHandlers(client)
282280
}
283281

284-
err := client.Connect()
282+
disconnectTimer, err := client.Connect(metrics.ACSDisconnectTimeoutMetricName,
283+
acsSession.connectionTime,
284+
acsSession.connectionJitter)
285285
if err != nil {
286286
seelog.Errorf("Error connecting to ACS: %v", err)
287287
return err
288288
}
289289

290+
defer disconnectTimer.Stop()
291+
290292
seelog.Info("Connected to ACS endpoint")
291-
// Start a connection timer; agent close its ACS websocket connection
292-
// after this timer expires
293-
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter)
294-
defer connectionTimer.Stop()
295293

296294
// Start a heartbeat timer for closing the connection
297295
heartbeatTimer := newHeartbeatTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
@@ -316,6 +314,20 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
316314
return client.Serve(acsSession.ctx)
317315
}
318316

317+
// newHeartbeatTimer creates a new time object, with a callback to
318+
// disconnect from ACS on inactivity
319+
func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
320+
timer := time.AfterFunc(retry.AddJitter(timeout, jitter), func() {
321+
seelog.Warn("ACS Connection hasn't had any activity for too long; closing connection")
322+
if err := client.Close(); err != nil {
323+
seelog.Warnf("Error disconnecting: %v", err)
324+
}
325+
seelog.Info("Disconnected from ACS")
326+
})
327+
328+
return timer
329+
}
330+
319331
func (acsSession *session) computeReconnectDelay(isInactiveInstance bool) time.Duration {
320332
if isInactiveInstance {
321333
return acsSession._inactiveInstanceReconnectDelay
@@ -366,37 +378,6 @@ func (acsSession *session) acsURL(endpoint string) string {
366378
return acsURL + "?" + query.Encode()
367379
}
368380

369-
// newHeartbeatTimer creates a new time object, with a callback to
370-
// disconnect from ACS on inactivity
371-
func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
372-
timer := time.AfterFunc(retry.AddJitter(timeout, jitter), func() {
373-
seelog.Warn("ACS Connection hasn't had any activity for too long; closing connection")
374-
if err := client.Close(); err != nil {
375-
seelog.Warnf("Error disconnecting: %v", err)
376-
}
377-
seelog.Info("Disconnected from ACS")
378-
})
379-
380-
return timer
381-
}
382-
383-
// newConnectionTimer creates a new timer, after which agent closes
384-
// its websocket connection
385-
func newConnectionTimer(client wsclient.ClientServer, connectionTime time.Duration,
386-
connectionJitter time.Duration) ttime.Timer {
387-
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
388-
timer := time.AfterFunc(expiresAt, func() {
389-
seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
390-
// WriteCloseMessage() writes a close message using websocket control messages
391-
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages
392-
err := client.WriteCloseMessage()
393-
if err != nil {
394-
seelog.Warnf("Error writing close message: %v", err)
395-
}
396-
})
397-
return timer
398-
}
399-
400381
// anyMessageHandler handles any server message. Any server message means the
401382
// connection is active and thus the heartbeat disconnect should not occur
402383
func anyMessageHandler(timer ttime.Timer, client wsclient.ClientServer) func(interface{}) {

0 commit comments

Comments
 (0)