Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load Managed Daemon images in background #3984

Merged
merged 6 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,19 +433,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
return exitcodes.ExitTerminal
}

// Load Managed Daemon images asynchronously
agent.loadManagedDaemonImagesAsync(imageManager)

scManager := agent.serviceconnectManager
scManager.SetECSClient(client, agent.containerInstanceARN)
if loaded, _ := scManager.IsLoaded(agent.dockerClient); loaded {
imageManager.AddImageToCleanUpExclusionList(agent.serviceconnectManager.GetLoadedImageName())
}

// exclude all daemon images from cleanup
for _, csiDM := range agent.daemonManagers {
if loaded, _ := csiDM.IsLoaded(agent.dockerClient); loaded {
imageManager.AddImageToCleanUpExclusionList(csiDM.GetManagedDaemon().GetLoadedDaemonImageRef())
}
}

// Add container instance ARN to metadata manager
if agent.cfg.ContainerMetadataEnabled.Enabled() {
agent.metadataManager.SetContainerInstanceARN(agent.containerInstanceARN)
Expand Down Expand Up @@ -485,7 +481,7 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
agent.startAsyncRoutines(containerChangeEventStream, credentialsManager, imageManager,
taskEngine, deregisterInstanceEventStream, client, taskHandler, attachmentEventHandler, state, doctor)
// TODO add EBS watcher to async routines
agent.startEBSWatcher(state, taskEngine)
agent.startEBSWatcher(state, taskEngine, agent.dockerClient)
// Start the acs session, which should block doStart
return agent.startACSSession(credentialsManager, taskEngine,
deregisterInstanceEventStream, client, state, taskHandler, doctor)
Expand Down Expand Up @@ -751,6 +747,38 @@ func (agent *ecsAgent) constructVPCSubnetAttributes() []*ecs.Attribute {
}
}

// loadManagedDaemonImagesAsync starts one background goroutine per Managed Daemon
// registered on the Agent; each goroutine runs loadManagedDaemonImage for its
// daemon manager with the given imageManager. The function itself only logs how
// many daemons will be processed and returns without waiting for the loads.
func (agent *ecsAgent) loadManagedDaemonImagesAsync(imageManager engine.ImageManager) {
	managers := agent.getDaemonManagers()
	logger.Debug(fmt.Sprintf("Will load images for %d Managed Daemons", len(managers)))
	for key := range managers {
		// The manager is looked up here, in the calling goroutine, before the
		// new goroutine is started.
		go agent.loadManagedDaemonImage(managers[key], imageManager)
	}
}

// loadManagedDaemonImage loads the image of a single Managed Daemon through the
// given daemon manager. On success the daemon's image reference is added to
// imageManager's cleanup exclusion list; on failure the error is logged and the
// exclusion list is left untouched.
func (agent *ecsAgent) loadManagedDaemonImage(dm dm.DaemonManager, imageManager engine.ImageManager) {
	ref := dm.GetManagedDaemon().GetImageRef()
	logger.Info("Starting to load Managed Daemon image", logger.Fields{field.ImageRef: ref})

	inspected, loadErr := dm.LoadImage(agent.ctx, agent.dockerClient)
	if loadErr != nil {
		logger.Error("Failed to load Managed Daemon image", logger.Fields{
			field.ImageRef: ref,
			field.Error:    loadErr,
		})
		return
	}

	logger.Info("Successfully loaded Managed Daemon image", logger.Fields{
		field.ImageRef: ref,
		field.ImageID:  inspected.ID,
	})
	imageManager.AddImageToCleanUpExclusionList(ref)
}

// registerContainerInstance registers the container instance ID for the ECS Agent
func (agent *ecsAgent) registerContainerInstance(
client api.ECSClient,
Expand Down Expand Up @@ -1140,6 +1168,11 @@ func (agent *ecsAgent) setDaemonManager(key string, val dm.DaemonManager) {
agent.daemonManagers[key] = val
}

// getDaemonManagers returns the agent's daemon managers map itself (not a copy),
// keyed by string. It performs no locking and is not designed to be thread-safe.
func (agent *ecsAgent) getDaemonManagers() map[string]dm.DaemonManager {
	managers := agent.daemonManagers
	return managers
}

// setVPCSubnet sets the vpc and subnet ids for the agent by querying the
// instance metadata service
func (agent *ecsAgent) setVPCSubnet() (error, bool) {
Expand Down
14 changes: 9 additions & 5 deletions agent/app/agent_capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,16 @@ func (agent *ecsAgent) appendEBSTaskAttachCapabilities(capabilities []*ecs.Attri
if daemonDef.GetImageName() == md.EbsCsiDriver {
csiDaemonManager := dm.NewDaemonManager(daemonDef)
agent.setDaemonManager(md.EbsCsiDriver, csiDaemonManager)
if _, err := csiDaemonManager.LoadImage(agent.ctx, agent.dockerClient); err != nil {
logger.Error("Failed to load the EBS CSI Driver. This container instance will not be able to support EBS Task Attach",
imageExists, err := csiDaemonManager.ImageExists()
if !imageExists {
logger.Error(
"Either EBS Daemon image does not exist or failed to check its existence."+
" This container instance will not advertise EBS Task Attach capability.",
logger.Fields{
field.Error: err,
},
)
field.ImageName: csiDaemonManager.GetManagedDaemon().GetImageName(),
field.ImageTARPath: csiDaemonManager.GetManagedDaemon().GetImageTarPath(),
field.Error: err,
})
return capabilities
}
}
Expand Down
47 changes: 47 additions & 0 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
aws_credentials "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/docker/docker/api/types"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1859,3 +1860,49 @@ func TestWaitUntilInstanceInServicePolling(t *testing.T) {
})
}
}

// TestLoadManagedDaemonImage verifies that loadManagedDaemonImage adds the daemon
// image to the image cleanup exclusion list only when loading the image succeeds.
func TestLoadManagedDaemonImage(t *testing.T) {
	tcs := []struct {
		name                         string
		setDaemonManagerExpectations func(*mock_daemonmanager.MockDaemonManager)
		setImageManagerExpectations  func(*mock_engine.MockImageManager)
	}{
		{
			name: "no exclusion list update if image load fails",
			setDaemonManagerExpectations: func(mdm *mock_daemonmanager.MockDaemonManager) {
				// How often the getter is consulted (e.g. for log fields) is an
				// implementation detail, so only require that it is used.
				mdm.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")).MinTimes(1)
				mdm.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, errors.New("error"))
			},
		},
		{
			name: "exclusion list is updated if image load succeeds",
			setDaemonManagerExpectations: func(mdm *mock_daemonmanager.MockDaemonManager) {
				mdm.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")).MinTimes(1)
				mdm.EXPECT().
					LoadImage(gomock.Any(), gomock.Any()).
					Return(&types.ImageInspect{ID: "image-id"}, nil)
			},
			setImageManagerExpectations: func(mim *mock_engine.MockImageManager) {
				mim.EXPECT().AddImageToCleanUpExclusionList("name:tag")
			},
		},
	}
	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			// Finish checks that every expected call was made, so it must run
			// after the code under test. Calling it eagerly here would finish the
			// controller before any expectation is registered and a missing
			// AddImageToCleanUpExclusionList call would go unnoticed.
			defer ctrl.Finish()

			dockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
			daemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
			imageManager := mock_engine.NewMockImageManager(ctrl)

			tc.setDaemonManagerExpectations(daemonManager)
			if tc.setImageManagerExpectations != nil {
				tc.setImageManagerExpectations(imageManager)
			}

			agent := &ecsAgent{ctx: context.Background(), dockerClient: dockerClient}
			agent.loadManagedDaemonImage(daemonManager, imageManager)
		})
	}
}
9 changes: 7 additions & 2 deletions agent/app/agent_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
ebs "github.com/aws/amazon-ecs-agent/agent/ebs"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
Expand Down Expand Up @@ -151,10 +152,14 @@ func (agent *ecsAgent) startENIWatcher(state dockerstate.TaskEngineState, stateC
return nil
}

// startEBSWatcher creates the agent's EBS watcher while agent.ebsWatcher is still
// nil and runs its Start method in a new goroutine. If a watcher has already been
// set, the call does nothing.
func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, taskEngine engine.TaskEngine) {
func (agent *ecsAgent) startEBSWatcher(
state dockerstate.TaskEngineState,
taskEngine engine.TaskEngine,
dockerClient dockerapi.DockerClient,
) {
if agent.ebsWatcher == nil {
seelog.Debug("Creating new EBS watcher...")
agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, taskEngine)
agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, taskEngine, dockerClient)
// Start is launched in its own goroutine, so this call does not wait for it.
go agent.ebsWatcher.Start()
}
}
Expand Down
8 changes: 6 additions & 2 deletions agent/app/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
Expand Down Expand Up @@ -99,9 +100,12 @@ func (agent *ecsAgent) startWindowsService() int {
return 0
}

// startEBSWatcher is a no-op on Windows: it only emits a debug log stating that
// the EBS watcher is not implemented. None of its parameters are used.
func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, taskEngine engine.TaskEngine) error {
func (agent *ecsAgent) startEBSWatcher(
state dockerstate.TaskEngineState,
taskEngine engine.TaskEngine,
dockerClient dockerapi.DockerClient,
) {
seelog.Debug("Windows EBS Watcher not implemented: No Op")
return nil
}

// handler implements https://godoc.org/golang.org/x/sys/windows/svc#Handler
Expand Down
130 changes: 97 additions & 33 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import (
"time"

ecsapi "github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
ecsengine "github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource"
"github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
csi "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
md "github.com/aws/amazon-ecs-agent/ecs-agent/manageddaemon"
log "github.com/cihub/seelog"

Expand All @@ -47,13 +51,15 @@ type EBSWatcher struct {
csiClient csi.CSIClient
scanTicker *time.Ticker
// TODO: The dockerTaskEngine.stateChangeEvent will be used to send over the state change event for EBS attachments once it's been found and mounted/resize/format.
taskEngine ecsengine.TaskEngine
taskEngine ecsengine.TaskEngine
dockerClient dockerapi.DockerClient
}

// NewWatcher is used to return a new instance of the EBSWatcher struct
func NewWatcher(ctx context.Context,
state dockerstate.TaskEngineState,
taskEngine ecsengine.TaskEngine) *EBSWatcher {
taskEngine ecsengine.TaskEngine,
dockerClient dockerapi.DockerClient) *EBSWatcher {
derivedContext, cancel := context.WithCancel(ctx)
discoveryClient := apiebs.NewDiscoveryClient(derivedContext)
// TODO pull this socket out into config
Expand All @@ -65,6 +71,7 @@ func NewWatcher(ctx context.Context,
discoveryClient: discoveryClient,
csiClient: &csiClient,
taskEngine: taskEngine,
dockerClient: dockerClient,
}
}

Expand All @@ -77,13 +84,7 @@ func (w *EBSWatcher) Start() {
for {
select {
case <-w.scanTicker.C:
pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey()
if len(pendingEBS) > 0 {
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.overrideDeviceName(foundVolumes)
w.StageAll(foundVolumes)
w.NotifyAttached(foundVolumes)
}
w.tick()
case <-w.ctx.Done():
w.scanTicker.Stop()
log.Info("EBS Watcher Stopped due to agent stop")
Expand All @@ -92,6 +93,93 @@ func (w *EBSWatcher) Start() {
}
}

// tick handles a single tick of the watcher.
// When agent state holds no pending EBS volume attachments it returns immediately.
// Otherwise it asks daemonRunning whether the EBS Managed Daemon is running; if it
// is not, the tick is skipped. If it is, the host is scanned for the pending
// volumes and the ones found have their device names overridden, are staged, and
// are then reported as attached.
func (w *EBSWatcher) tick() {
	pending := w.agentState.GetAllPendingEBSAttachmentsWithKey()
	if len(pending) == 0 {
		return
	}

	if running := w.daemonRunning(); !running {
		log.Info("EBS Managed Daemon is not currently running. Skipping EBS Watcher tick.")
		return
	}

	volumes := apiebs.ScanEBSVolumes(pending, w.discoveryClient)
	w.overrideDeviceName(volumes)
	w.StageAll(volumes)
	w.NotifyAttached(volumes)
}

// daemonRunning reports whether the EBS Managed Daemon task is known to be running.
//
// It returns true only when the task engine already tracks an EBS daemon task whose
// known status is TaskRunning. In every other case it returns false:
//   - the task exists but has not reached TaskRunning yet (nothing else is done);
//   - no task exists, or the existing task is past TaskRunning: a new daemon task is
//     created and handed to the task engine, provided the EBS daemon manager is
//     registered and its image is loaded. Because the new task is not confirmed to
//     be running yet, false is still returned and the caller is expected to check
//     again later.
func (w *EBSWatcher) daemonRunning() bool {
	csiTask := w.taskEngine.GetDaemonTask(md.EbsCsiDriver)

	if csiTask == nil {
		logger.Info("EBS Managed Daemon task has not been initialized. Will start a new one.")
	} else {
		// Read the known status exactly once. Re-reading it for each comparison
		// would allow a status change in between (e.g. pending -> running) to make
		// both checks fail and wrongly fall through to starting a second task.
		knownStatus := csiTask.GetKnownStatus()
		switch {
		case knownStatus == status.TaskRunning:
			logger.Debug("EBS Managed Daemon is running", logger.Fields{field.TaskID: csiTask.GetID()})
			return true
		case knownStatus < status.TaskRunning:
			logger.Debug("EBS Managed Daemon task is pending transitioning to running", logger.Fields{
				field.TaskID:      csiTask.GetID(),
				field.KnownStatus: knownStatus,
			})
			return false
		}
		logger.Info("EBS Managed Daemon task is beyond running state. Will start a new one.", logger.Fields{
			field.TaskID:      csiTask.GetID(),
			field.KnownStatus: knownStatus,
		})
	}

	// Task is neither running nor about to run. We need to start a new one.
	ebsCsiDaemonManager, ok := w.taskEngine.GetDaemonManagers()[md.EbsCsiDriver]
	if !ok {
		log.Errorf("EBS Daemon Manager is not Initialized. EBS Task Attach is not supported.")
		return false
	}

	// A daemon task can only be started once its image has been loaded.
	imageLoaded, err := ebsCsiDaemonManager.IsLoaded(w.dockerClient)
	if !imageLoaded {
		logger.Info("Image is not loaded yet so can't start a Managed Daemon task.", logger.Fields{
			"ImageRef":  ebsCsiDaemonManager.GetManagedDaemon().GetImageRef(),
			field.Error: err,
		})
		return false
	}
	logger.Debug("Managed Daemon image has been loaded", logger.Fields{
		"ImageRef": ebsCsiDaemonManager.GetManagedDaemon().GetImageRef(),
	})

	// Create a new Managed Daemon task.
	csiTask, err = ebsCsiDaemonManager.CreateDaemonTask()
	if err != nil {
		// Failed to create the task. There is nothing that the watcher can do at this time
		// so swallow the error and try again later.
		logger.Error("Failed to create EBS Managed Daemon task.", logger.Fields{field.Error: err})
		return false
	}

	// Add the new task to task engine.
	w.taskEngine.SetDaemonTask(md.EbsCsiDriver, csiTask)
	w.taskEngine.AddTask(csiTask)
	logger.Info("Added EBS Managed Daemon task to task engine", logger.Fields{
		field.TaskID: csiTask.GetID(),
	})

	// Task is not confirmed to be running yet, so return false.
	return false
}

// Stop will stop the EBS watcher
func (w *EBSWatcher) Stop() {
log.Info("Stopping EBS watcher.")
Expand All @@ -107,7 +195,6 @@ func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) {

// HandleEBSResourceAttachment processes the resource attachment message. It will:
// 1. Check whether we already have this attachment in state and if so it's a noop.
// 2. Start the EBS CSI driver if it's not already running
// 3. Otherwise add the attachment to state, start its ack timer, and save to the agent state.
func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment) error {
attachmentType := ebs.GetAttachmentType()
Expand All @@ -125,29 +212,6 @@ func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment)
})
}

// start EBS CSI Driver Managed Daemon
// We want to avoid creating a new CSI driver task if there's already one that's not been stopped.
if runningCsiTask := w.taskEngine.GetDaemonTask(md.EbsCsiDriver); runningCsiTask != nil && !runningCsiTask.GetKnownStatus().Terminal() {
log.Debugf("engine ebs CSI driver is running with taskID: %v", runningCsiTask.GetID())
} else {
if ebsCsiDaemonManager, ok := w.taskEngine.GetDaemonManagers()[md.EbsCsiDriver]; ok {
if csiTask, err := ebsCsiDaemonManager.CreateDaemonTask(); err != nil {
// fail attachment and return
log.Errorf("Unable to start ebsCsiDaemon in the engine: error: %s", err)
if csiTask != nil {
log.Errorf("CSI task Error task ID: %s", csiTask.GetID())
}
return err
} else {
w.taskEngine.SetDaemonTask(md.EbsCsiDriver, csiTask)
w.taskEngine.AddTask(csiTask)
log.Infof("task_engine: Added EBS CSI task to engine")
}
} else {
log.Errorf("CSI Driver is not Initialized")
}
}

if err := w.addEBSAttachmentToState(ebs); err != nil {
return fmt.Errorf("%w; attach %v message handler: unable to add ebs attachment to engine state: %v",
err, attachmentType, ebs.EBSToString())
Expand Down
Loading