Skip to content

Commit b3db90b

Browse files
committed
acs,tcs handlers: increase websocket rw timeout
Increase the websocket read and write timeouts as per review comments
1 parent ec4d577 commit b3db90b

File tree

4 files changed

+27
-21
lines changed

4 files changed

+27
-21
lines changed

agent/acs/handler/acs_handler.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ const (
4848
// without disconnecting
4949
heartbeatTimeout = 1 * time.Minute
5050
heartbeatJitter = 1 * time.Minute
51+
// wsRWTimeout is the duration of read and write deadline for the
52+
// websocket connection
53+
wsRWTimeout = 2*heartbeatTimeout + heartbeatJitter
5154

5255
inactiveInstanceReconnectDelay = 1 * time.Hour
5356

@@ -376,7 +379,7 @@ func (acsSession *session) heartbeatJitter() time.Duration {
376379

377380
// createACSClient creates the ACS Client using the specified URL
378381
func (acsResources *acsSessionResources) createACSClient(url string, cfg *config.Config) wsclient.ClientServer {
379-
return acsclient.New(url, cfg, acsResources.credentialsProvider, heartbeatTimeout+heartbeatJitter)
382+
return acsclient.New(url, cfg, acsResources.credentialsProvider, wsRWTimeout)
380383
}
381384

382385
// connectedToACS records a successful connection to ACS
@@ -437,11 +440,11 @@ func anyMessageHandler(timer ttime.Timer, client wsclient.ClientServer) func(int
437440
return func(interface{}) {
438441
seelog.Debug("ACS activity occurred")
439442
// Reset read deadline as there's activity on the channel
440-
if err := client.SetReadDeadline(time.Now().Add(heartbeatTimeout + heartbeatJitter)); err != nil {
441-
seelog.Warn("Unable to extend read deadline for ACS connection: %v", err)
443+
if err := client.SetReadDeadline(time.Now().Add(wsRWTimeout)); err != nil {
444+
seelog.Warnf("Unable to extend read deadline for ACS connection: %v", err)
442445
}
443446

444-
// Reset heearbeat timer
447+
// Reset heartbeat timer
445448
timer.Reset(utils.AddJitter(heartbeatTimeout, heartbeatJitter))
446449
}
447450
}

agent/acs/handler/acs_handler_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -803,9 +803,7 @@ func TestConnectionIsClosedOnIdle(t *testing.T) {
803803
_heartbeatTimeout: 20 * time.Millisecond,
804804
_heartbeatJitter: 10 * time.Millisecond,
805805
}
806-
go func() {
807-
acsSession.startACSSession(mockWsClient)
808-
}()
806+
go acsSession.startACSSession(mockWsClient)
809807

810808
// Wait for connection to be closed. If the connection is not closed
811809
// due to inactivity, the test will time out

agent/tcs/handler/handler.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@ const (
3636
defaultPublishMetricsInterval = 20 * time.Second
3737

3838
// The maximum time to wait between heartbeats without disconnecting
39-
defaultHeartbeatTimeout = 1 * time.Minute
40-
defaultHeartbeatJitter = 1 * time.Minute
39+
defaultHeartbeatTimeout = 1 * time.Minute
40+
defaultHeartbeatJitter = 1 * time.Minute
41+
// wsRWTimeout is the duration of read and write deadline for the
42+
// websocket connection
43+
wsRWTimeout = 2*defaultHeartbeatTimeout + defaultHeartbeatJitter
4144
deregisterContainerInstanceHandler = "TCSDeregisterContainerInstanceHandler"
4245
)
4346

@@ -92,7 +95,9 @@ func startTelemetrySession(params TelemetrySessionParams, statsEngine stats.Engi
9295
return err
9396
}
9497
url := formatURL(tcsEndpoint, params.Cfg.Cluster, params.ContainerInstanceArn)
95-
return startSession(url, params.Cfg, params.CredentialProvider, statsEngine, defaultHeartbeatTimeout, defaultHeartbeatJitter, defaultPublishMetricsInterval, params.DeregisterInstanceEventStream)
98+
return startSession(url, params.Cfg, params.CredentialProvider, statsEngine,
99+
defaultHeartbeatTimeout, defaultHeartbeatJitter, defaultPublishMetricsInterval,
100+
params.DeregisterInstanceEventStream)
96101
}
97102

98103
func startSession(url string,
@@ -103,7 +108,7 @@ func startSession(url string,
103108
publishMetricsInterval time.Duration,
104109
deregisterInstanceEventStream *eventstream.EventStream) error {
105110
client := tcsclient.New(url, cfg, credentialProvider, statsEngine,
106-
publishMetricsInterval, defaultHeartbeatTimeout+defaultHeartbeatJitter)
111+
publishMetricsInterval, wsRWTimeout)
107112
defer client.Close()
108113

109114
err := deregisterInstanceEventStream.Subscribe(deregisterContainerInstanceHandler, client.Disconnect)
@@ -112,24 +117,24 @@ func startSession(url string,
112117
}
113118
defer deregisterInstanceEventStream.Unsubscribe(deregisterContainerInstanceHandler)
114119

120+
err = client.Connect()
121+
if err != nil {
122+
seelog.Errorf("Error connecting to TCS: %v", err.Error())
123+
return err
124+
}
125+
seelog.Info("Connected to TCS endpoint")
115126
// Start a timer and listen for TCS heartbeats/acks. The timer is reset when
116127
// we receive a heartbeat from the server or when a publish metrics message
117128
// is acked.
118129
timer := time.AfterFunc(utils.AddJitter(heartbeatTimeout, heartbeatJitter), func() {
119130
// Close the connection if there haven't been any messages received from backend
120131
// for a long time.
121-
seelog.Info("TCS Connection hasn't had a heartbeat or an ack message in too long of a timeout; disconnecting")
132+
seelog.Info("TCS Connection hasn't had any activity for too long; disconnecting")
122133
client.Disconnect()
123134
})
124135
defer timer.Stop()
125136
client.AddRequestHandler(heartbeatHandler(timer))
126137
client.AddRequestHandler(ackPublishMetricHandler(timer))
127-
err = client.Connect()
128-
if err != nil {
129-
seelog.Errorf("Error connecting to TCS: %v", err.Error())
130-
return err
131-
}
132-
seelog.Info("Connected to TCS endpoint")
133138
client.SetAnyRequestHandler(anyMessageHandler(client))
134139
return client.Serve()
135140
}
@@ -155,9 +160,9 @@ func ackPublishMetricHandler(timer *time.Timer) func(*ecstcs.AckPublishMetric) {
155160
// connection is active
156161
func anyMessageHandler(client wsclient.ClientServer) func(interface{}) {
157162
return func(interface{}) {
158-
seelog.Trace("TCS activity occured")
163+
seelog.Trace("TCS activity occurred")
159164
// Reset read deadline as there's activity on the channel
160-
if err := client.SetReadDeadline(time.Now().Add(defaultHeartbeatTimeout + defaultHeartbeatJitter)); err != nil {
165+
if err := client.SetReadDeadline(time.Now().Add(wsRWTimeout)); err != nil {
161166
seelog.Warnf("Unable to extend read deadline for TCS connection: %v", err)
162167
}
163168
}

agent/wsclient/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func (cs *ClientServerImpl) WriteMessage(send []byte) error {
305305

306306
// This is just future proofing. Ignore the error as the gorilla websocket
307307
// library returns 'nil' anyway for SetWriteDeadline
308-
// https://github.com/gorilla/websocket/blob/master/conn.go#L761
308+
// https://github.com/gorilla/websocket/blob/4201258b820c74ac8e6922fc9e6b52f71fe46f8d/conn.go#L761
309309
if err := cs.conn.SetWriteDeadline(time.Now().Add(cs.RWTimeout)); err != nil {
310310
seelog.Warnf("Unable to set write deadline for websocket connection: %v for %s", err, cs.URL)
311311
}

0 commit comments

Comments
 (0)