diff --git a/agent/acs/handler/acs_handler.go b/agent/acs/handler/acs_handler.go index df205b6f4c2..cc09b0148cb 100644 --- a/agent/acs/handler/acs_handler.go +++ b/agent/acs/handler/acs_handler.go @@ -34,8 +34,10 @@ import ( "github.com/aws/amazon-ecs-agent/agent/eventhandler" "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/version" + acssession "github.com/aws/amazon-ecs-agent/ecs-agent/acs/session" rolecredentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime" "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" @@ -321,7 +323,15 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error { client.AddRequestHandler(payloadHandler.handlerFunc()) - client.AddRequestHandler(HeartbeatHandlerFunc(client, acsSession.doctor)) + heartbeatResponder := acssession.NewHeartbeatResponder(acsSession.doctor) + heartbeatResponder.RegisterResponder(func(response interface{}) error { + logger.Debug("Sending response to ACS", logger.Fields{ + "Name": heartbeatResponder.Name(), + "Response": response, + }) + return client.MakeRequest(response) + }) + client.AddRequestHandler(heartbeatResponder.HandlerFunc()) updater.AddAgentUpdateHandlers(client, cfg, acsSession.state, acsSession.dataClient, acsSession.taskEngine) diff --git a/agent/acs/handler/heartbeat_handler.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/heartbeat_responder.go similarity index 54% rename from agent/acs/handler/heartbeat_handler.go rename to agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/heartbeat_responder.go index c6d76789180..c50dd1fe3af 100644 --- a/agent/acs/handler/heartbeat_handler.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/heartbeat_responder.go @@ -11,40 +11,64 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package handler +package session import ( "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" "github.com/aws/aws-sdk-go/aws" - "github.com/cihub/seelog" ) -func HeartbeatHandlerFunc(acsClient wsclient.ClientServer, doctor *doctor.Doctor) func(message *ecsacs.HeartbeatMessage) { - return func(message *ecsacs.HeartbeatMessage) { - handleSingleHeartbeatMessage(acsClient, doctor, message) +// heartbeatResponder implements the wsclient.RequestResponder interface for responding +// to ecsacs.HeartbeatMessage type. +type heartbeatResponder struct { + doctor *doctor.Doctor + respond wsclient.RespondFunc +} + +// NewHeartbeatResponder returns an instance of the heartbeatResponder struct. +func NewHeartbeatResponder(doctor *doctor.Doctor) *heartbeatResponder { + return &heartbeatResponder{ + doctor: doctor, } } +func (*heartbeatResponder) Name() string { + return "heartbeat message responder" +} + +func (r *heartbeatResponder) RegisterResponder(respond wsclient.RespondFunc) { + r.respond = respond +} + +func (r *heartbeatResponder) HandlerFunc() wsclient.RequestHandler { + return r.processHeartbeatMessage +} + // To handle a Heartbeat Message the doctor health checks need to be run and // an ACK needs to be sent back to ACS. // This function is meant to be called from the ACS dispatcher and as such // should not block in any way to prevent starvation of the message handler -func handleSingleHeartbeatMessage(acsClient wsclient.ClientServer, doctor *doctor.Doctor, message *ecsacs.HeartbeatMessage) { +func (r *heartbeatResponder) processHeartbeatMessage(message *ecsacs.HeartbeatMessage) { // Agent will run healthchecks triggered by ACS heartbeat // healthcheck results will be sent on to TACS, but for now just to debug logs. - go doctor.RunHealthchecks() + go r.doctor.RunHealthchecks() // Agent will send simple ack ack := &ecsacs.HeartbeatAckRequest{ MessageId: message.MessageId, } go func() { - err := acsClient.MakeRequest(ack) + err := r.respond(ack) if err != nil { - seelog.Warnf("Error acknowledging server heartbeat, message id: %s, error: %s", aws.StringValue(ack.MessageId), err) + logger.Warn("Error acknowledging server heartbeat", logger.Fields{ + field.MessageID: aws.StringValue(ack.MessageId), + field.Error: err, + }) } }() } diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go index 8b154e1dbec..3bc7ab1833b 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go @@ -48,4 +48,5 @@ const ( ContainerName = "containerName" ContainerImage = "containerImage" ContainerExitCode = "containerExitCode" + MessageID = "messageID" ) diff --git a/ecs-agent/acs/session/heartbeat_responder.go b/ecs-agent/acs/session/heartbeat_responder.go new file mode 100644 index 00000000000..0f5c9b28ee3 --- /dev/null +++ b/ecs-agent/acs/session/heartbeat_responder.go @@ -0,0 +1,72 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package session + +import ( + "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" + "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" + "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" + "github.com/aws/aws-sdk-go/aws" +) + +// heartbeatResponder implements the wsclient.RequestResponder interface for responding +// to ecsacs.HeartbeatMessage type. +type heartbeatResponder struct { + doctor *doctor.Doctor + respond wsclient.RespondFunc +} + +// NewHeartbeatResponder returns an instance of the heartbeatResponder struct. +func NewHeartbeatResponder(doctor *doctor.Doctor) wsclient.RequestResponder { + return &heartbeatResponder{ + doctor: doctor, + } +} + +func (*heartbeatResponder) Name() string { + return "heartbeat message responder" +} + +func (r *heartbeatResponder) RegisterResponder(respond wsclient.RespondFunc) { + r.respond = respond +} + +func (r *heartbeatResponder) HandlerFunc() wsclient.RequestHandler { + return r.processHeartbeatMessage +} + +// processHeartbeatMessage processes an ACS heartbeat message. +// This function is meant to be called from the ACS dispatcher and as such +// should not block in any way to prevent starvation of the message handler. +func (r *heartbeatResponder) processHeartbeatMessage(message *ecsacs.HeartbeatMessage) { + // Agent will run container instance healthchecks. They are triggered by ACS heartbeat. + // Results of healthchecks will be sent on to TACS. + go r.doctor.RunHealthchecks() + + // Agent will send simple ack + ack := &ecsacs.HeartbeatAckRequest{ + MessageId: message.MessageId, + } + go func() { + err := r.respond(ack) + if err != nil { + logger.Warn("Error acknowledging server heartbeat", logger.Fields{ + field.MessageID: aws.StringValue(ack.MessageId), + field.Error: err, + }) + } + }() +} diff --git a/agent/acs/handler/heartbeat_handler_test.go b/ecs-agent/acs/session/heartbeat_responder_test.go similarity index 87% rename from agent/acs/handler/heartbeat_handler_test.go rename to ecs-agent/acs/session/heartbeat_responder_test.go index 8dfa86980d3..c9168ae3733 100644 --- a/agent/acs/handler/heartbeat_handler_test.go +++ b/ecs-agent/acs/session/heartbeat_responder_test.go @@ -14,15 +14,13 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package handler +package session import ( "testing" "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" - mock_wsclient "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/mock" - "github.com/aws/aws-sdk-go/aws" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -87,16 +85,16 @@ func validateHeartbeatAck(t *testing.T, heartbeatReceived *ecsacs.HeartbeatMessa ackSent := make(chan *ecsacs.HeartbeatAckRequest) - mockWsClient := mock_wsclient.NewMockClientServer(ctrl) - mockWsClient.EXPECT().MakeRequest(gomock.Any()).Do(func(message *ecsacs.HeartbeatAckRequest) { - ackSent <- message - close(ackSent) - }).Times(1) - emptyHealthchecksList := []doctor.Healthcheck{} emptyDoctor, _ := doctor.NewDoctor(emptyHealthchecksList, "testCluster", "this:is:an:instance:arn") - handleSingleHeartbeatMessage(mockWsClient, emptyDoctor, heartbeatReceived) + testHeartbeatResponder := NewHeartbeatResponder(emptyDoctor) + testHeartbeatResponder.RegisterResponder(func(response interface{}) error { + resp := response.(*ecsacs.HeartbeatAckRequest) + ackSent <- resp + return nil + }) + testHeartbeatResponder.(*heartbeatResponder).processHeartbeatMessage(heartbeatReceived) // wait till we send an heartbeatAckSent := <-ackSent diff --git a/ecs-agent/logger/field/constants.go b/ecs-agent/logger/field/constants.go index 8b154e1dbec..3bc7ab1833b 100644 --- a/ecs-agent/logger/field/constants.go +++ b/ecs-agent/logger/field/constants.go @@ -48,4 +48,5 @@ const ( ContainerName = "containerName" ContainerImage = "containerImage" ContainerExitCode = "containerExitCode" + MessageID = "messageID" )