Skip to content

Commit 97550e8

Browse files
fierlionEC2 Default User
authored and
EC2 Default User
committed
Add Container runtime check and messaging mechanism
1 parent 49c1a34 commit 97550e8

24 files changed

+1387
-50
lines changed

agent/acs/handler/acs_handler.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/aws/amazon-ecs-agent/agent/config"
3030
rolecredentials "github.com/aws/amazon-ecs-agent/agent/credentials"
3131
"github.com/aws/amazon-ecs-agent/agent/data"
32+
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
33+
"github.com/aws/amazon-ecs-agent/agent/doctor"
3234
"github.com/aws/amazon-ecs-agent/agent/engine"
3335
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
3436
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
@@ -84,6 +86,7 @@ type session struct {
8486
agentConfig *config.Config
8587
deregisterInstanceEventStream *eventstream.EventStream
8688
taskEngine engine.TaskEngine
89+
dockerClient dockerapi.DockerClient
8790
ecsClient api.ECSClient
8891
state dockerstate.TaskEngineState
8992
dataClient data.Client
@@ -94,6 +97,7 @@ type session struct {
9497
backoff retry.Backoff
9598
resources sessionResources
9699
latestSeqNumTaskManifest *int64
100+
doctor *doctor.Doctor
97101
_heartbeatTimeout time.Duration
98102
_heartbeatJitter time.Duration
99103
_inactiveInstanceReconnectDelay time.Duration
@@ -137,17 +141,22 @@ type sessionState interface {
137141
}
138142

139143
// NewSession creates a new Session object
140-
func NewSession(ctx context.Context,
144+
func NewSession(
145+
ctx context.Context,
141146
config *config.Config,
142147
deregisterInstanceEventStream *eventstream.EventStream,
143-
containerInstanceArn string,
148+
containerInstanceARN string,
144149
credentialsProvider *credentials.Credentials,
150+
dockerClient dockerapi.DockerClient,
145151
ecsClient api.ECSClient,
146152
taskEngineState dockerstate.TaskEngineState,
147153
dataClient data.Client,
148154
taskEngine engine.TaskEngine,
149155
credentialsManager rolecredentials.Manager,
150-
taskHandler *eventhandler.TaskHandler, latestSeqNumTaskManifest *int64) Session {
156+
taskHandler *eventhandler.TaskHandler,
157+
latestSeqNumTaskManifest *int64,
158+
doctor *doctor.Doctor,
159+
) Session {
151160
resources := newSessionResources(credentialsProvider)
152161
backoff := retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax,
153162
connectionBackoffJitter, connectionBackoffMultiplier)
@@ -156,9 +165,10 @@ func NewSession(ctx context.Context,
156165
return &session{
157166
agentConfig: config,
158167
deregisterInstanceEventStream: deregisterInstanceEventStream,
159-
containerInstanceARN: containerInstanceArn,
168+
containerInstanceARN: containerInstanceARN,
160169
credentialsProvider: credentialsProvider,
161170
ecsClient: ecsClient,
171+
dockerClient: dockerClient,
162172
state: taskEngineState,
163173
dataClient: dataClient,
164174
taskEngine: taskEngine,
@@ -169,6 +179,7 @@ func NewSession(ctx context.Context,
169179
backoff: backoff,
170180
resources: resources,
171181
latestSeqNumTaskManifest: latestSeqNumTaskManifest,
182+
doctor: doctor,
172183
_heartbeatTimeout: heartbeatTimeout,
173184
_heartbeatJitter: heartbeatJitter,
174185
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
@@ -335,8 +346,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
335346

336347
client.AddRequestHandler(payloadHandler.handlerFunc())
337348

338-
// Add HeartbeatHandler to acknowledge ACS heartbeats
339-
heartbeatHandler := newHeartbeatHandler(acsSession.ctx, client)
349+
heartbeatHandler := newHeartbeatHandler(acsSession.ctx, client, acsSession.doctor)
340350
defer heartbeatHandler.clearAcks()
341351
heartbeatHandler.start()
342352
defer heartbeatHandler.stop()

agent/acs/handler/acs_handler_test.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
rolecredentials "github.com/aws/amazon-ecs-agent/agent/credentials"
3939
mock_credentials "github.com/aws/amazon-ecs-agent/agent/credentials/mocks"
4040
"github.com/aws/amazon-ecs-agent/agent/data"
41+
mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks"
42+
"github.com/aws/amazon-ecs-agent/agent/doctor"
4143
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
4244
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
4345
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
@@ -805,6 +807,7 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
805807
ctrl := gomock.NewController(t)
806808
defer ctrl.Finish()
807809
taskEngine := mock_engine.NewMockTaskEngine(ctrl)
810+
dockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
808811
ecsClient := mock_api.NewMockECSClient(ctrl)
809812
ctx, cancel := context.WithCancel(context.Background())
810813
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)
@@ -831,6 +834,10 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
831834
})
832835
taskEngine.EXPECT().Version().Return("Docker: 1.5.0", nil).AnyTimes()
833836
taskEngine.EXPECT().AddTask(gomock.Any()).AnyTimes()
837+
dockerClient.EXPECT().SystemPing(gomock.Any(), gomock.Any()).AnyTimes()
838+
839+
emptyHealthchecksList := []doctor.Healthcheck{}
840+
emptyDoctor, _ := doctor.NewDoctor(emptyHealthchecksList, "test-cluster", "this:is:an:instance:arn")
834841

835842
ended := make(chan bool, 1)
836843
go func() {
@@ -840,6 +847,7 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
840847
credentialsProvider: testCreds,
841848
agentConfig: testConfig,
842849
taskEngine: taskEngine,
850+
dockerClient: dockerClient,
843851
ecsClient: ecsClient,
844852
dataClient: data.NewNoopClient(),
845853
taskHandler: taskHandler,
@@ -849,6 +857,7 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
849857
resources: newSessionResources(testCreds),
850858
credentialsManager: rolecredentials.NewManager(),
851859
latestSeqNumTaskManifest: aws.Int64(12),
860+
doctor: emptyDoctor,
852861
}
853862
acsSession.Start()
854863
ended <- true
@@ -916,6 +925,10 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {
916925
taskEngine.EXPECT().Version().Return("Docker: 1.5.0", nil).AnyTimes()
917926

918927
credentialsManager := mock_credentials.NewMockManager(ctrl)
928+
dockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
929+
930+
emptyHealthchecksList := []doctor.Healthcheck{}
931+
emptyDoctor, _ := doctor.NewDoctor(emptyHealthchecksList, "test-cluster", "this:is:a:container:arn")
919932

920933
latestSeqNumberTaskManifest := int64(10)
921934
ended := make(chan bool, 1)
@@ -925,12 +938,15 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {
925938
nil,
926939
"myArn",
927940
testCreds,
941+
dockerClient,
928942
ecsClient,
929943
dockerstate.NewTaskEngineState(),
930944
data.NewNoopClient(),
931945
taskEngine,
932946
credentialsManager,
933-
taskHandler, &latestSeqNumberTaskManifest,
947+
taskHandler,
948+
&latestSeqNumberTaskManifest,
949+
emptyDoctor,
934950
)
935951
acsSession.Start()
936952
// StartSession should never return unless the context is canceled
@@ -1017,11 +1033,12 @@ func TestHandlerReconnectsCorrectlySetsSendCredentialsURLParameter(t *testing.T)
10171033
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)
10181034

10191035
mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
1020-
10211036
mockWsClient.EXPECT().SetAnyRequestHandler(gomock.Any()).AnyTimes()
10221037
mockWsClient.EXPECT().AddRequestHandler(gomock.Any()).AnyTimes()
10231038
mockWsClient.EXPECT().Close().Return(nil).AnyTimes()
10241039
mockWsClient.EXPECT().Serve().Return(io.EOF).AnyTimes()
1040+
1041+
dockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
10251042
resources := newSessionResources(testCreds)
10261043
gomock.InOrder(
10271044
// When the websocket client connects to ACS for the first
@@ -1041,6 +1058,7 @@ func TestHandlerReconnectsCorrectlySetsSendCredentialsURLParameter(t *testing.T)
10411058
credentialsProvider: testCreds,
10421059
agentConfig: testConfig,
10431060
taskEngine: taskEngine,
1061+
dockerClient: dockerClient,
10441062
ecsClient: ecsClient,
10451063
dataClient: data.NewNoopClient(),
10461064
taskHandler: taskHandler,

agent/acs/handler/heartbeat_handler.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818

1919
"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
20+
"github.com/aws/amazon-ecs-agent/agent/doctor"
2021
"github.com/aws/amazon-ecs-agent/agent/wsclient"
2122
"github.com/aws/aws-sdk-go/aws"
2223
"github.com/cihub/seelog"
@@ -29,12 +30,11 @@ type heartbeatHandler struct {
2930
ctx context.Context
3031
cancel context.CancelFunc
3132
acsClient wsclient.ClientServer
33+
doctor *doctor.Doctor
3234
}
3335

3436
// newHeartbeatHandler returns an instance of the heartbeatHandler struct
35-
func newHeartbeatHandler(ctx context.Context,
36-
acsClient wsclient.ClientServer) heartbeatHandler {
37-
37+
func newHeartbeatHandler(ctx context.Context, acsClient wsclient.ClientServer, heartbeatDoctor *doctor.Doctor) heartbeatHandler {
3838
// Create a cancelable context from the parent context
3939
derivedContext, cancel := context.WithCancel(ctx)
4040
return heartbeatHandler{
@@ -43,6 +43,7 @@ func newHeartbeatHandler(ctx context.Context,
4343
ctx: derivedContext,
4444
cancel: cancel,
4545
acsClient: acsClient,
46+
doctor: heartbeatDoctor,
4647
}
4748
}
4849

@@ -73,7 +74,15 @@ func (heartbeatHandler *heartbeatHandler) handleHeartbeatMessage() {
7374
}
7475

7576
func (heartbeatHandler *heartbeatHandler) handleSingleHeartbeatMessage(message *ecsacs.HeartbeatMessage) error {
76-
// Agent currently has no other action hooked to heartbeat messages, except simple ack
77+
// TestHandlerDoesntLeakGoroutines unit test is failing because of this section
78+
79+
// Agent will run healthchecks triggered by ACS heartbeat
80+
// Healthcheck results will eventually be sent on to TACS; for now they are only written to the debug logs.
81+
go func() {
82+
heartbeatHandler.doctor.RunHealthchecks()
83+
}()
84+
85+
// Agent will send simple ack to the heartbeatAckMessageBuffer
7786
go func() {
7887
response := &ecsacs.HeartbeatAckRequest{
7988
MessageId: message.MessageId,

agent/acs/handler/heartbeat_handler_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"testing"
2121

2222
"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
23+
mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks"
24+
"github.com/aws/amazon-ecs-agent/agent/doctor"
2325
mock_wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient/mock"
2426
"github.com/aws/aws-sdk-go/aws"
2527
"github.com/golang/mock/gomock"
@@ -92,7 +94,14 @@ func validateHeartbeatAck(t *testing.T, heartbeatReceived *ecsacs.HeartbeatMessa
9294
cancel()
9395
}).Times(1)
9496

95-
handler := newHeartbeatHandler(ctx, mockWsClient)
97+
dockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
98+
dockerClient.EXPECT().SystemPing(gomock.Any(), gomock.Any()).AnyTimes()
99+
100+
emptyHealthchecksList := []doctor.Healthcheck{}
101+
emptyDoctor, _ := doctor.NewDoctor(emptyHealthchecksList, "testCluster", "this:is:an:instance:arn")
102+
103+
handler := newHeartbeatHandler(ctx, mockWsClient, emptyDoctor)
104+
96105
go handler.sendHeartbeatAck()
97106

98107
handler.handleSingleHeartbeatMessage(heartbeatReceived)

agent/app/agent.go

+35-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"time"
2222

23+
"github.com/aws/amazon-ecs-agent/agent/doctor"
2324
"github.com/aws/amazon-ecs-agent/agent/eni/watcher"
2425
"github.com/aws/aws-sdk-go/aws/awserr"
2526

@@ -358,6 +359,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
358359
agent.saveMetadata(data.EC2InstanceIDKey, currentEC2InstanceID)
359360
}
360361

362+
// now that we know the container instance ARN, we can build out the doctor
363+
// and pass it on to ACS and TACS
364+
doctor, doctorCreateErr := agent.newDoctorWithHealthchecks(agent.cfg.Cluster, agent.containerInstanceARN)
365+
if doctorCreateErr != nil {
366+
seelog.Warnf("Error starting doctor, healthchecks won't be running: %v", err)
367+
} else {
368+
seelog.Debug("Doctor healthchecks set up properly.")
369+
}
370+
361371
// Begin listening to the docker daemon and saving changes
362372
taskEngine.SetDataClient(agent.dataClient)
363373
imageManager.SetDataClient(agent.dataClient)
@@ -370,11 +380,11 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
370380
taskHandler := eventhandler.NewTaskHandler(agent.ctx, agent.dataClient, state, client)
371381
attachmentEventHandler := eventhandler.NewAttachmentEventHandler(agent.ctx, agent.dataClient, client)
372382
agent.startAsyncRoutines(containerChangeEventStream, credentialsManager, imageManager,
373-
taskEngine, deregisterInstanceEventStream, client, taskHandler, attachmentEventHandler, state)
383+
taskEngine, deregisterInstanceEventStream, client, taskHandler, attachmentEventHandler, state, doctor)
374384

375385
// Start the acs session, which should block doStart
376386
return agent.startACSSession(credentialsManager, taskEngine,
377-
deregisterInstanceEventStream, client, state, taskHandler)
387+
deregisterInstanceEventStream, client, state, taskHandler, doctor)
378388
}
379389

380390
// newTaskEngine creates a new docker task engine object. It tries to load the
@@ -447,6 +457,21 @@ func (agent *ecsAgent) initMetricsEngine() {
447457
metrics.PublishMetrics()
448458
}
449459

460+
// newDoctorWithHealthchecks creates a new doctor and also configures
461+
// the healthchecks that the doctor should be running
462+
func (agent *ecsAgent) newDoctorWithHealthchecks(cluster, containerInstanceARN string) (*doctor.Doctor, error) {
463+
// configure the required healthchecks
464+
runtimeHealthCheck := doctor.NewDockerRuntimeHealthcheck(agent.dockerClient)
465+
466+
// put the healthchecks in a list
467+
healthcheckList := []doctor.Healthcheck{
468+
runtimeHealthCheck,
469+
}
470+
471+
// set up the doctor and return it
472+
return doctor.NewDoctor(healthcheckList, cluster, containerInstanceARN)
473+
}
474+
450475
// setClusterInConfig sets the cluster name in the config object based on
451476
// previous state. It returns an error if there's a mismatch between the
452477
// the current cluster name with what's restored from the cluster state
@@ -648,7 +673,9 @@ func (agent *ecsAgent) startAsyncRoutines(
648673
client api.ECSClient,
649674
taskHandler *eventhandler.TaskHandler,
650675
attachmentEventHandler *eventhandler.AttachmentEventHandler,
651-
state dockerstate.TaskEngineState) {
676+
state dockerstate.TaskEngineState,
677+
doctor *doctor.Doctor,
678+
) {
652679

653680
// Start of the periodic image cleanup process
654681
if !agent.cfg.ImageCleanupDisabled.Enabled() {
@@ -687,6 +714,7 @@ func (agent *ecsAgent) startAsyncRoutines(
687714
ECSClient: client,
688715
TaskEngine: taskEngine,
689716
StatsEngine: statsEngine,
717+
Doctor: doctor,
690718
}
691719

692720
// Start metrics session in a go routine
@@ -748,21 +776,24 @@ func (agent *ecsAgent) startACSSession(
748776
deregisterInstanceEventStream *eventstream.EventStream,
749777
client api.ECSClient,
750778
state dockerstate.TaskEngineState,
751-
taskHandler *eventhandler.TaskHandler) int {
779+
taskHandler *eventhandler.TaskHandler,
780+
doctor *doctor.Doctor) int {
752781

753782
acsSession := acshandler.NewSession(
754783
agent.ctx,
755784
agent.cfg,
756785
deregisterInstanceEventStream,
757786
agent.containerInstanceARN,
758787
agent.credentialProvider,
788+
agent.dockerClient,
759789
client,
760790
state,
761791
agent.dataClient,
762792
taskEngine,
763793
credentialsManager,
764794
taskHandler,
765795
agent.latestSeqNumberTaskManifest,
796+
doctor,
766797
)
767798
seelog.Info("Beginning Polling for updates")
768799
err := acsSession.Start()

agent/dockerclient/dockerapi/docker_client.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ func (dg *dockerGoClient) createContainer(ctx context.Context,
569569
config *dockercontainer.Config,
570570
hostConfig *dockercontainer.HostConfig,
571571
name string) DockerContainerMetadata {
572+
572573
client, err := dg.sdkDockerClient()
573574
if err != nil {
574575
return DockerContainerMetadata{Error: CannotGetDockerClientError{version: dg.version, err: err}}
@@ -610,8 +611,8 @@ func (dg *dockerGoClient) startContainer(ctx context.Context, id string) DockerC
610611
if err != nil {
611612
return DockerContainerMetadata{Error: CannotGetDockerClientError{version: dg.version, err: err}}
612613
}
613-
614614
err = client.ContainerStart(ctx, id, types.ContainerStartOptions{})
615+
615616
metadata := dg.containerMetadata(ctx, id)
616617
if err != nil {
617618
metadata.Error = CannotStartContainerError{err}

0 commit comments

Comments
 (0)