Websocket timeout #993
The wc command prefixes spaces to the output, which corrupts the GIT_PORCELAIN variable, thus failing the build. This change removes spaces from the output.
This commit aims to improve websocket connection management by implementing the following improvements:
1. Set read and write deadlines for websocket ReadMessage and WriteMessage operations. This ensures that these methods do not hang, and instead result in an io timeout, if there are issues with the connection.
2. Reduce the scope of the lock in the Connect() method. The lock was being held for the length of the Connect() method, which meant that it wouldn't be relinquished if there was any delay in establishing the connection. The scope of the lock has now been reduced to just accessing the cs.conn variable.
3. Start the ACS heartbeat timer after the connection has been established. The timer was being started before the call to Connect, which meant that the connection could be prematurely terminated for being idle if there was a delay in establishing the connection.
These changes should improve the disconnection behavior of the websocket connection, which should help with scenarios where the Agent never reconnects to ACS because it's forever waiting in the Disconnect() method to acquire the lock (aws#985).
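The second improvement in the commit message above (holding the lock only while touching the connection variable) can be illustrated with a minimal sketch. The `client`, `fakeConn`, and `dial` names below are hypothetical stand-ins, not the agent's actual types; the point is only that a slow dial no longer blocks a concurrent Disconnect.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// fakeConn is a hypothetical stand-in for a websocket connection.
type fakeConn struct{ id int }

// client is a hypothetical stand-in for the agent's websocket client.
type client struct {
	mu   sync.RWMutex
	conn *fakeConn
}

// dial simulates a connection attempt that takes `delay` to complete.
func dial(delay time.Duration) (*fakeConn, error) {
	time.Sleep(delay)
	return &fakeConn{id: 1}, nil
}

// Connect dials without holding the lock and takes it only to publish conn.
func (c *client) Connect(delay time.Duration) error {
	conn, err := dial(delay)
	if err != nil {
		return err
	}
	c.mu.Lock()
	c.conn = conn
	c.mu.Unlock()
	return nil
}

// Disconnect holds the lock only long enough to take ownership of conn.
func (c *client) Disconnect() error {
	c.mu.Lock()
	conn := c.conn
	c.conn = nil
	c.mu.Unlock()
	if conn == nil {
		return errors.New("no connection to close")
	}
	return nil
}

func main() {
	c := &client{}
	done := make(chan error, 1)
	go func() { done <- c.Connect(200 * time.Millisecond) }()

	// Disconnect does not wait for the in-flight dial to finish.
	start := time.Now()
	err := c.Disconnect()
	fmt.Println("disconnect returned:", err, "after", time.Since(start))
	<-done
}
```

If Connect held the lock for its whole duration instead, the Disconnect call in `main` would block until the dial finished, which is the hang described in aws#985.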
agent/acs/handler/acs_handler.go
Outdated
- seelog.Debug("ACS activity occured")
+ seelog.Debug("ACS activity occurred")
+ // Reset read deadline as there's activity on the channel
+ if err := client.SetReadDeadline(time.Now().Add(heartbeatTimeout + heartbeatJitter)); err != nil {
I'd feel more comfortable if this were actually longer, like 2*heartbeatTimeout + heartbeatJitter.
Can you explain again here too? Doesn't doubling that have the net effect of doubling your pre-established heartbeatTimeout the first time there is some traffic over the websocket?
Because we use rand.Int63n() in AddJitter, heartbeatTimeout + heartbeatJitter should always be greater than the heartbeat duration that we compute. I think it should be safe to set it the way it's currently being set.
> Doesn't doubling that have the net effect of doubling your pre-established heartbeatTimeout the first time there is some traffic over the websocket?
No, the deadline is set based on time.Now(), so it's always in the future by the amount specified here.
> I think it should be safe to set it the way it's currently being set.
I'm worried, because this is a time and not a duration, and because Go 1.7 doesn't have a monotonic clock, that something unexpected can happen here. I'd feel more comfortable with a longer timeout because we don't expect this timeout to be hit anyway and I don't see any danger of making it longer.
Once we move to Go 1.9 there will be a monotonic clock and this should be safer.
> I'd feel more comfortable with a longer timeout because we don't expect this timeout to be hit anyway
Oh, you'd be surprised :) I think we'll hit this timeout when there are connection issues. However, I do buy the argument that 2*heartbeatTimeout + heartbeatJitter will work for most cases here.
agent/acs/handler/acs_handler.go
Outdated
seelog.Warn("Unable to extend read deadline for ACS connection: %v", err)
}

// Reset heearbeat timer
nit: heartbeat
agent/tcs/handler/handler.go
Outdated
@@ -35,8 +36,8 @@ const (
defaultPublishMetricsInterval = 20 * time.Second

// The maximum time to wait between heartbeats without disconnecting
- defaultHeartbeatTimeout = 5 * time.Minute
- defaultHeartbeatJitter = 3 * time.Minute
+ defaultHeartbeatTimeout = 1 * time.Minute
Why did we change these?
I assume this is just meant to mimic the work that @richardpen did for the ACS client.
@samuelkarp @jhaynes is correct. This is reflecting the change that @richardpen did.
Got it.
agent/tcs/handler/handler.go
Outdated
return func(interface{}) {
seelog.Trace("TCS activity occured")
// Reset read deadline as there's activity on the channel
if err := client.SetReadDeadline(time.Now().Add(defaultHeartbeatTimeout + defaultHeartbeatJitter)); err != nil {
As above, I think this should be longer than just timeout + jitter.
agent/acs/handler/acs_handler.go
Outdated
@@ -377,7 +376,7 @@ func (acsSession *session) heartbeatJitter() time.Duration {

// createACSClient creates the ACS Client using the specified URL
func (acsResources *acsSessionResources) createACSClient(url string, cfg *config.Config) wsclient.ClientServer {
- return acsclient.New(url, cfg, acsResources.credentialsProvider)
+ return acsclient.New(url, cfg, acsResources.credentialsProvider, heartbeatTimeout+heartbeatJitter)
I think this should be longer than timeout + jitter.
Can you expand on why? I would assume you'd want it to be smaller if anything. That way, you could retry a read/write without necessarily having to re-establish the whole connection.
Will respond on the other comment.
agent/tcs/handler/handler.go
Outdated
deregisterInstanceEventStream *eventstream.EventStream) error {
- client := tcsclient.New(url, cfg, credentialProvider, statsEngine, publishMetricsInterval)
+ client := tcsclient.New(url, cfg, credentialProvider, statsEngine,
+ publishMetricsInterval, defaultHeartbeatTimeout+defaultHeartbeatJitter)
I think this should be longer than timeout + jitter.
agent/acs/handler/acs_handler.go
Outdated
seelog.Debug("ACS activity occurred")
// Reset read deadline as there's activity on the channel
if err := client.SetReadDeadline(time.Now().Add(heartbeatTimeout + heartbeatJitter)); err != nil {
seelog.Warn("Unable to extend read deadline for ACS connection: %v", err)
nit: Warnf
@@ -112,7 +118,7 @@ func startSession(url string, cfg *config.Config, credentialProvider *credential
timer := time.AfterFunc(utils.AddJitter(heartbeatTimeout, heartbeatJitter), func() { |
Can you do the same as for ACS and start the timer after the connection is established?
Yeah, I thought I did that. But, obviously didn't. I'll modify this.
agent/tcs/handler/handler.go
Outdated
// connection is active
func anyMessageHandler(client wsclient.ClientServer) func(interface{}) {
return func(interface{}) {
seelog.Trace("TCS activity occured")
Is the Trace level here intentional?
The similar ACS log line is Debug level.
TCS channel is more chatty, which means we'll see this printed every 20 seconds if this is changed to Debug. I could get rid of this altogether if you have a strong preference here. But, somehow this Trace soothes my mind.
@@ -48,6 +49,9 @@ const (
// wsConnectTimeout specifies the default connection timeout to the backend.
wsConnectTimeout = 30 * time.Second

// wsHandshakeTimeout specifies the default handshake timeout for the websocket client
wsHandshakeTimeout = wsConnectTimeout
If we have wsHandshakeTimeout, do we still need wsConnectTimeout? If the handshake times out, the connect will fail, is that right? Or they should be the same value, e.g. wsHandshakeTimeout = 3*wsConnectTimeout
I would assume the wsHandshakeTimeout would be <= wsConnectTimeout
// SetReadDeadline sets the read deadline for the websocket connection
// A read timeout results in an io error if there are any outstanding reads
// that exceed the deadline
func (cs *ClientServerImpl) SetReadDeadline(t time.Time) error {
Would it be better to pass the duration and calculate the time right before calling SetReadDeadline? It's more accurate, as the passed-in time could already be in the past relative to time.Now() for some weird reason.
cs := &clientServer{}
cs.URL = url
cs.CredentialProvider = credentialProvider
cs.AgentConfig = cfg
cs.ServiceError = &acsError{}
cs.RequestHandlers = make(map[string]wsclient.RequestHandler)
cs.TypeDecoder = NewACSDecoder()
cs.RWTimeout = rwTimeout
nit: why not just 'timeout'?
I wanted it to convey more meaning than just timeout would convey.
- closeErr := client.Close()
- if closeErr != nil {
- seelog.Warnf("Error disconnecting: %v", closeErr)
+ if err := client.Close(); err != nil {
There is only one return value.. so is this still more idiomatic than:
if client.Close() != nil {
how is that? I don't see how err := something; if err != nil {} is more idiomatic than if err := something; err != nil {}
Right! I was asking if setting the err variable is necessary at all. On closer inspection I see that we log the error, so you should ignore this :)
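For reference, the pattern this thread settles on can be sketched in isolation. The `closer` type and `disconnect` function are hypothetical, and the standard `log` package stands in for seelog; the error is bound in the `if` statement because it is needed for the log line, and it stays scoped to that statement.

```go
package main

import (
	"errors"
	"log"
)

// closer is a hypothetical stand-in for the websocket client.
type closer struct{ fail bool }

// Close reports an error when the stand-in is configured to fail.
func (c closer) Close() error {
	if c.fail {
		return errors.New("connection already closed")
	}
	return nil
}

// disconnect closes the client and logs any error, returning whether
// the close succeeded.
func disconnect(c closer) bool {
	// err is needed for the log message, so `if c.Close() != nil` alone
	// would not be enough here.
	if err := c.Close(); err != nil {
		log.Printf("Error disconnecting: %v", err)
		return false
	}
	return true
}

func main() {
	disconnect(closer{fail: false})
	disconnect(closer{fail: true})
}
```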
return func(interface{}) {
- seelog.Debug("ACS activity occured")
+ seelog.Debug("ACS activity occurred")
Is this debug line useful?
Yeah, I think so. Shows us when the timer was reset.
timer := newDisconnectionTimer(mockWsClient, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
defer timer.Stop()
- acsSession.startACSSession(mockWsClient, timer)
+ acsSession.startACSSession(mockWsClient)
nit: remove the function wrapper
go acsSession.startACSSession(mockWsClient)
will do.
@@ -35,8 +36,8 @@ const (
defaultPublishMetricsInterval = 20 * time.Second

// The maximum time to wait between heartbeats without disconnecting
Can we go ahead and add the default rwtimeout here too? Even if expressed as:
defaultRWTimeout = defaultHeartBeatJitter + defaultHeartbeatTimeout
Yeah, I thought about it and didn't think we needed a new const for this. But, I think it makes it more explicit. I'll add that.
"github.com/aws/amazon-ecs-agent/agent/wsclient"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
👍
@@ -46,7 +46,12 @@ type clientServer struct {
// New returns a client/server to bidirectionally communicate with the backend.
// The returned struct should have both 'Connect' and 'Serve' called upon it
// before being used.
- func New(url string, cfg *config.Config, credentialProvider *credentials.Credentials, statsEngine stats.Engine, publishMetricsInterval time.Duration) wsclient.ClientServer {
+ func New(url string,
+ cfg *config.Config,
👍 I much prefer this style for functions with more than a few arguments.
@@ -67,11 +73,11 @@ func New(url string, cfg *config.Config, credentialProvider *credentials.Credent
func (cs *clientServer) Serve() error {
seelog.Debug("TCS client starting websocket poll loop")
if !cs.IsReady() {
- return fmt.Errorf("Websocket not ready for connections")
+ return fmt.Errorf("tcs client: websocket not ready for connections")
Should TCS be capitalized here? We capitalize ACS in most places.
We're sticking to the style of error strings beginning with a lower case letter.
}

if cs.statsEngine == nil {
- return fmt.Errorf("uninitialized stats engine")
+ return fmt.Errorf("tcs client: uninitialized stats engine")
As above. Probably should be TCS.
agent/tcs/handler/handler.go
Outdated
@@ -112,7 +118,7 @@ func startSession(url string, cfg *config.Config, credentialProvider *credential
timer := time.AfterFunc(utils.AddJitter(heartbeatTimeout, heartbeatJitter), func() {
// Close the connection if there haven't been any messages received from backend
// for a long time.
- seelog.Debug("TCS Connection hasn't had a heartbeat or an ack message in too long of a timeout; disconnecting")
+ seelog.Info("TCS Connection hasn't had a heartbeat or an ack message in too long of a timeout; disconnecting")
This wording is confusing. The similar ACS log message is: "ACS Connection hasn't had any activity for too long; closing connection"
I'll modify that.
9613a31 to 679cf93
@samuelkarp @jhaynes @richardpen @petderek I've modified the code to address your comments. Can you please take a look?
One minor comment
agent/wsclient/client.go
Outdated
// This is just future proofing. Ignore the error as the gorilla websocket
// library returns 'nil' anyway for SetWriteDeadline
// https://github.com/gorilla/websocket/blob/master/conn.go#L761
It would be good if you could change this link to reference a specific commit, so we won't lose the information if that file changes, like: https://github.com/gorilla/websocket/blob/4201258b820c74ac8e6922fc9e6b52f71fe46f8d/conn.go#L761
Increase the websocket read and write timeouts as per review comments
679cf93 to c140bd3
Summary
Implements some websocket connection management improvements.
Implementation details
1. Set read and write deadlines for websocket ReadMessage and WriteMessage operations. This ensures that these methods do not hang, and instead result in an io timeout, if there are issues with the connection.
2. Reduce the scope of the lock in the Connect() method. The lock was being held for the length of the Connect() method, which meant that it wouldn't be relinquished if there was any delay in establishing the connection. The scope of the lock has now been reduced to just accessing the cs.conn variable.
3. Start the ACS heartbeat timer after the connection has been established. The timer was being started before the call to Connect, which meant that the connection could be prematurely terminated for being idle if there was a delay in establishing the connection.
Testing
- make release
- go build -out amazon-ecs-agent.exe ./agent
- make test: pass
- go test -timeout=25s ./agent/...: pass
- make run-integ-tests: pass
- .\scripts\run-integ-tests.ps1: pass
- make run-functional-tests: pass
- .\scripts\run-functional-tests.ps1: pass
New tests cover the changes: Yes
Description for the changelog
Covered in changelog.md
Licensing
This contribution is under the terms of the Apache 2.0 License: Yes