Skip to content

Commit

Permalink
Refactor ACS heartbeat message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
danehlim committed May 30, 2023
1 parent bc8945d commit 9c5c61b
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 20 deletions.
12 changes: 11 additions & 1 deletion agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/agent/eventstream"
"github.com/aws/amazon-ecs-agent/agent/version"
acssession "github.com/aws/amazon-ecs-agent/ecs-agent/acs/session"
rolecredentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime"
"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
Expand Down Expand Up @@ -321,7 +323,15 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {

client.AddRequestHandler(payloadHandler.handlerFunc())

client.AddRequestHandler(HeartbeatHandlerFunc(client, acsSession.doctor))
heartbeatResponder := acssession.NewHeartbeatResponder(acsSession.doctor)
heartbeatResponder.RegisterResponder(func(response interface{}) error {
logger.Debug("Sending response to ACS", logger.Fields{
"Name": heartbeatResponder.Name(),
"Response": response,
})
return client.MakeRequest(response)
})
client.AddRequestHandler(heartbeatResponder.HandlerFunc())

updater.AddAgentUpdateHandlers(client, cfg, acsSession.state, acsSession.dataClient, acsSession.taskEngine)

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions ecs-agent/acs/session/heartbeat_responder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package session

import (
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
"github.com/aws/aws-sdk-go/aws"
)

// heartbeatResponder implements the wsclient.RequestResponder interface for responding
// to ecsacs.HeartbeatMessage type.
type heartbeatResponder struct {
doctor *doctor.Doctor
respond wsclient.RespondFunc
}

// NewHeartbeatResponder returns an instance of the heartbeatResponder struct.
func NewHeartbeatResponder(doctor *doctor.Doctor) wsclient.RequestResponder {
return &heartbeatResponder{
doctor: doctor,
}
}

func (*heartbeatResponder) Name() string {
return "heartbeat message responder"
}

func (r *heartbeatResponder) RegisterResponder(respond wsclient.RespondFunc) {
r.respond = respond
}

func (r *heartbeatResponder) HandlerFunc() wsclient.RequestHandler {
return r.processHeartbeatMessage
}

// processHeartbeatMessage processes an ACS heartbeat message.
// This function is meant to be called from the ACS dispatcher and as such
// should not block in any way to prevent starvation of the message handler.
func (r *heartbeatResponder) processHeartbeatMessage(message *ecsacs.HeartbeatMessage) {
// Agent will run container instance healthchecks. They are triggered by ACS heartbeat.
// Results of healthchecks will be sent on to TACS.
go r.doctor.RunHealthchecks()

// Agent will send simple ack
ack := &ecsacs.HeartbeatAckRequest{
MessageId: message.MessageId,
}
go func() {
err := r.respond(ack)
if err != nil {
logger.Warn("Error acknowledging server heartbeat", logger.Fields{
field.MessageID: aws.StringValue(ack.MessageId),
field.Error: err,
})
}
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package handler
package session

import (
"testing"

"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
mock_wsclient "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/mock"

"github.com/aws/aws-sdk-go/aws"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -87,16 +85,16 @@ func validateHeartbeatAck(t *testing.T, heartbeatReceived *ecsacs.HeartbeatMessa

ackSent := make(chan *ecsacs.HeartbeatAckRequest)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
mockWsClient.EXPECT().MakeRequest(gomock.Any()).Do(func(message *ecsacs.HeartbeatAckRequest) {
ackSent <- message
close(ackSent)
}).Times(1)

emptyHealthchecksList := []doctor.Healthcheck{}
emptyDoctor, _ := doctor.NewDoctor(emptyHealthchecksList, "testCluster", "this:is:an:instance:arn")

handleSingleHeartbeatMessage(mockWsClient, emptyDoctor, heartbeatReceived)
testHeartbeatResponder := NewHeartbeatResponder(emptyDoctor)
testHeartbeatResponder.RegisterResponder(func(response interface{}) error {
resp := response.(*ecsacs.HeartbeatAckRequest)
ackSent <- resp
return nil
})
testHeartbeatResponder.(*heartbeatResponder).processHeartbeatMessage(heartbeatReceived)

// wait till we send an
heartbeatAckSent := <-ackSent
Expand Down
1 change: 1 addition & 0 deletions ecs-agent/logger/field/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ const (
ContainerName = "containerName"
ContainerImage = "containerImage"
ContainerExitCode = "containerExitCode"
MessageID = "messageID"
)

0 comments on commit 9c5c61b

Please sign in to comment.