Skip to content

Commit

Permalink
acs,tcs handlers: increase websocket rw timeout
Browse files Browse the repository at this point in the history
Increase the websocket read and write timeouts
as per review comments
  • Loading branch information
aaithal committed Sep 27, 2017
1 parent 7723038 commit 679cf93
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
11 changes: 7 additions & 4 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
// without disconnecting
heartbeatTimeout = 1 * time.Minute
heartbeatJitter = 1 * time.Minute
// wsRWTimeout is the duration of the read and write deadlines for the
// websocket connection
wsRWTimeout = 2*heartbeatTimeout + heartbeatJitter

inactiveInstanceReconnectDelay = 1 * time.Hour

Expand Down Expand Up @@ -376,7 +379,7 @@ func (acsSession *session) heartbeatJitter() time.Duration {

// createACSClient creates the ACS Client using the specified URL
func (acsResources *acsSessionResources) createACSClient(url string, cfg *config.Config) wsclient.ClientServer {
return acsclient.New(url, cfg, acsResources.credentialsProvider, heartbeatTimeout+heartbeatJitter)
return acsclient.New(url, cfg, acsResources.credentialsProvider, wsRWTimeout)
}

// connectedToACS records a successful connection to ACS
Expand Down Expand Up @@ -437,11 +440,11 @@ func anyMessageHandler(timer ttime.Timer, client wsclient.ClientServer) func(int
return func(interface{}) {
seelog.Debug("ACS activity occurred")
// Reset read deadline as there's activity on the channel
if err := client.SetReadDeadline(time.Now().Add(heartbeatTimeout + heartbeatJitter)); err != nil {
seelog.Warn("Unable to extend read deadline for ACS connection: %v", err)
if err := client.SetReadDeadline(time.Now().Add(wsRWTimeout)); err != nil {
seelog.Warnf("Unable to extend read deadline for ACS connection: %v", err)
}

// Reset heearbeat timer
// Reset heartbeat timer
timer.Reset(utils.AddJitter(heartbeatTimeout, heartbeatJitter))
}
}
Expand Down
4 changes: 1 addition & 3 deletions agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,7 @@ func TestConnectionIsClosedOnIdle(t *testing.T) {
_heartbeatTimeout: 20 * time.Millisecond,
_heartbeatJitter: 10 * time.Millisecond,
}
go func() {
acsSession.startACSSession(mockWsClient)
}()
go acsSession.startACSSession(mockWsClient)

// Wait for connection to be closed. If the connection is not closed
// due to inactivity, the test will time out
Expand Down
31 changes: 18 additions & 13 deletions agent/tcs/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ const (
defaultPublishMetricsInterval = 20 * time.Second

// The maximum time to wait between heartbeats without disconnecting
defaultHeartbeatTimeout = 1 * time.Minute
defaultHeartbeatJitter = 1 * time.Minute
defaultHeartbeatTimeout = 1 * time.Minute
defaultHeartbeatJitter = 1 * time.Minute
// wsRWTimeout is the duration of the read and write deadlines for the
// websocket connection
wsRWTimeout = 2*defaultHeartbeatTimeout + defaultHeartbeatJitter
deregisterContainerInstanceHandler = "TCSDeregisterContainerInstanceHandler"
)

Expand Down Expand Up @@ -92,7 +95,9 @@ func startTelemetrySession(params TelemetrySessionParams, statsEngine stats.Engi
return err
}
url := formatURL(tcsEndpoint, params.Cfg.Cluster, params.ContainerInstanceArn)
return startSession(url, params.Cfg, params.CredentialProvider, statsEngine, defaultHeartbeatTimeout, defaultHeartbeatJitter, defaultPublishMetricsInterval, params.DeregisterInstanceEventStream)
return startSession(url, params.Cfg, params.CredentialProvider, statsEngine,
defaultHeartbeatTimeout, defaultHeartbeatJitter, defaultPublishMetricsInterval,
params.DeregisterInstanceEventStream)
}

func startSession(url string,
Expand All @@ -103,7 +108,7 @@ func startSession(url string,
publishMetricsInterval time.Duration,
deregisterInstanceEventStream *eventstream.EventStream) error {
client := tcsclient.New(url, cfg, credentialProvider, statsEngine,
publishMetricsInterval, defaultHeartbeatTimeout+defaultHeartbeatJitter)
publishMetricsInterval, wsRWTimeout)
defer client.Close()

err := deregisterInstanceEventStream.Subscribe(deregisterContainerInstanceHandler, client.Disconnect)
Expand All @@ -112,24 +117,24 @@ func startSession(url string,
}
defer deregisterInstanceEventStream.Unsubscribe(deregisterContainerInstanceHandler)

err = client.Connect()
if err != nil {
seelog.Errorf("Error connecting to TCS: %v", err.Error())
return err
}
seelog.Info("Connected to TCS endpoint")
// Start a timer and listen for TCS heartbeats/acks. The timer is reset when
// we receive a heartbeat from the server or when a publish metrics message
// is acked.
timer := time.AfterFunc(utils.AddJitter(heartbeatTimeout, heartbeatJitter), func() {
// Close the connection if there haven't been any messages received from backend
// for a long time.
seelog.Info("TCS Connection hasn't had a heartbeat or an ack message in too long of a timeout; disconnecting")
seelog.Info("TCS Connection hasn't had any activity for too long; disconnecting")
client.Disconnect()
})
defer timer.Stop()
client.AddRequestHandler(heartbeatHandler(timer))
client.AddRequestHandler(ackPublishMetricHandler(timer))
err = client.Connect()
if err != nil {
seelog.Errorf("Error connecting to TCS: %v", err.Error())
return err
}
seelog.Info("Connected to TCS endpoint")
client.SetAnyRequestHandler(anyMessageHandler(client))
return client.Serve()
}
Expand All @@ -155,9 +160,9 @@ func ackPublishMetricHandler(timer *time.Timer) func(*ecstcs.AckPublishMetric) {
// connection is active
func anyMessageHandler(client wsclient.ClientServer) func(interface{}) {
return func(interface{}) {
seelog.Trace("TCS activity occured")
seelog.Trace("TCS activity occurred")
// Reset read deadline as there's activity on the channel
if err := client.SetReadDeadline(time.Now().Add(defaultHeartbeatTimeout + defaultHeartbeatJitter)); err != nil {
if err := client.SetReadDeadline(time.Now().Add(wsRWTimeout)); err != nil {
seelog.Warnf("Unable to extend read deadline for TCS connection: %v", err)
}
}
Expand Down

0 comments on commit 679cf93

Please sign in to comment.