Skip to content

Commit

Permalink
Add file watcher for Appnet agent image update
Browse files Browse the repository at this point in the history
  • Loading branch information
mythri-garaga committed Oct 19, 2022
1 parent b2bed73 commit d0ca83b
Show file tree
Hide file tree
Showing 34 changed files with 2,690 additions and 22 deletions.
3 changes: 3 additions & 0 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ const (
stopContainerBackoffJitter = 0.2
stopContainerBackoffMultiplier = 1.3
stopContainerMaxRetryCount = 5

serviceConnectAppnetAgenTarballDir = "/var/lib/ecs/deps/serviceconnect/"
)

var newExponentialBackoff = retry.NewExponentialBackoff
Expand Down Expand Up @@ -281,6 +283,7 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
go engine.handleDockerEvents(derivedCtx)
engine.initialized = true
go engine.startPeriodicExecAgentsMonitoring(derivedCtx)
go engine.watchAppNetImage(engine.ctx)
return nil
}

Expand Down
81 changes: 81 additions & 0 deletions agent/engine/docker_task_engine_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package engine

import (
"context"
"fmt"
"time"

"github.com/fsnotify/fsnotify"

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status"
"github.com/aws/amazon-ecs-agent/agent/logger"
)

const (
Expand All @@ -39,3 +45,78 @@ func (engine *DockerTaskEngine) updateTaskENIDependencies(task *apitask.Task) {
func (engine *DockerTaskEngine) invokePluginsForContainer(task *apitask.Task, container *apicontainer.Container) error {
return nil
}

// watchAppNetImage watches the Appnet agent tarball directory for updates.
// When a tarball in that directory is written or created, it reloads the
// Appnet agent image and restarts the internal instance relay task so the
// relay runs with the updated image. The watcher runs until ctx is canceled.
func (engine *DockerTaskEngine) watchAppNetImage(ctx context.Context) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		// Without a watcher there is nothing to listen on; returning here
		// avoids dereferencing a nil watcher below.
		logger.Error(fmt.Sprintf("failed to initialize fsnotify NewWatcher, error: %v", err))
		return
	}
	defer watcher.Close()

	appnetContainerTarballDir := engine.serviceconnectManager.GetAppnetContainerTarballDir()
	if err := watcher.Add(appnetContainerTarballDir); err != nil {
		// With no directory registered the watcher would never deliver
		// events, so there is no point entering the event loop.
		logger.Error(fmt.Sprintf("error adding %s to fsnotify watcher, error: %v", appnetContainerTarballDir, err))
		return
	}

	// Start listening for events.
	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				logger.Warn("fsnotify event watcher channel is closed")
				return
			}
			// check if the event file operation is write or create
			const writeOrCreateMask = fsnotify.Write | fsnotify.Create
			if event.Op&writeOrCreateMask != 0 {
				logger.Debug(fmt.Sprintf("new fsnotify watcher event: %s", event.Name))
				// reload the updated Appnet Agent image
				if err := engine.reloadAppNetImage(); err == nil {
					// restart the internal instance relay task with
					// updated Appnet Agent image
					engine.restartInstanceTask()
				}
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				logger.Warn("fsnotify event watcher channel is closed")
				return
			}
			logger.Error(fmt.Sprintf("fsnotify watcher error: %v", err))
		case <-ctx.Done():
			return
		}
	}
}

// reloadAppNetImage loads the current Appnet agent image from disk via the
// service connect manager. It returns a non-nil error (after logging it)
// when the load fails, so callers can skip the relay-task restart.
func (engine *DockerTaskEngine) reloadAppNetImage() error {
	if _, err := engine.serviceconnectManager.LoadImage(engine.ctx, engine.cfg, engine.client); err != nil {
		logger.Error(fmt.Sprintf("engine: Failed to reload appnet Agent container, error: %v", err))
		return err
	}
	return nil
}

// restartInstanceTask replaces the running service connect relay task with a
// freshly created one (built from the current Appnet agent image). It is a
// no-op when no relay task exists. The new task definition is created first;
// if creation fails, the existing relay is left running untouched. Only then
// is the old relay stopped, swept, and deleted before the replacement is
// registered with the engine.
func (engine *DockerTaskEngine) restartInstanceTask() {
	if engine.serviceconnectRelay != nil {
		serviceconnectRelayTask, err := engine.serviceconnectManager.CreateInstanceTask(engine.cfg)
		if err != nil {
			logger.Error(fmt.Sprintf("Unable to start relay for task in the engine: %v", err))
			return
		}
		// clean up instance relay task
		// Stop only the relay-type containers of the old task; other
		// container types (if any) are handled by the sweep below.
		for _, container := range engine.serviceconnectRelay.Containers {
			if container.Type == apicontainer.ContainerServiceConnectRelay {
				engine.stopContainer(engine.serviceconnectRelay, container)
			}
		}
		// Mark the old task stopped, then sweep and delete it so its
		// resources and bookkeeping are fully released before the swap.
		engine.serviceconnectRelay.SetDesiredStatus(apitaskstatus.TaskStopped)
		engine.sweepTask(engine.serviceconnectRelay)
		engine.deleteTask(engine.serviceconnectRelay)

		// Swap in the new relay task and hand it to the engine to run.
		engine.serviceconnectRelay = serviceconnectRelayTask
		engine.AddTask(engine.serviceconnectRelay)
		logger.Info("engine: Restarted AppNet Relay task")
	}
}
38 changes: 33 additions & 5 deletions agent/engine/docker_task_engine_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -81,7 +82,7 @@ func init() {
func TestResourceContainerProgression(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

sleepTask := testdata.LoadTask("sleep5")
Expand All @@ -105,6 +106,7 @@ func TestResourceContainerProgression(t *testing.T) {
// events are processed
containerEventsWG := sync.WaitGroup{}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
gomock.InOrder(
// Ensure that the resource is created first
mockControl.EXPECT().Exists(gomock.Any()).Return(false),
Expand Down Expand Up @@ -250,7 +252,7 @@ func TestDeleteTaskBranchENIEnabled(t *testing.T) {
func TestResourceContainerProgressionFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, _, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, _, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()
sleepTask := testdata.LoadTask("sleep5")
sleepContainer := sleepTask.Containers[0]
Expand All @@ -267,6 +269,7 @@ func TestResourceContainerProgressionFailure(t *testing.T) {
sleepTask.AddResource("cgroup", cgroupResource)
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
gomock.InOrder(
// resource creation failure
mockControl.EXPECT().Exists(gomock.Any()).Return(false),
Expand Down Expand Up @@ -307,7 +310,7 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
metadataConfig.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, credentialsManager, imageManager, metadataManager, _ := mocks(
ctrl, client, mockTime, taskEngine, credentialsManager, imageManager, metadataManager, serviceConnectManager := mocks(
t, ctx, &metadataConfig)
defer ctrl.Finish()

Expand All @@ -328,6 +331,7 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
containerEventsWG := sync.WaitGroup{}

client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
containerName := make(chan string)
go func() {
name := <-containerName
Expand Down Expand Up @@ -590,7 +594,7 @@ func TestBuildCNIConfigFromTaskContainer(t *testing.T) {
func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

mockCNIClient := mock_ecscni.NewMockCNIClient(ctrl)
Expand All @@ -616,6 +620,7 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
containerEventsWG := sync.WaitGroup{}

client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
// We cannot rely on the order of pulls between images as they can still be downloaded in
// parallel. The dependency graph enforcement comes into effect for CREATED transitions.
// Hence, do not enforce the order of invocation of these calls
Expand Down Expand Up @@ -728,7 +733,7 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
func TestPauseContainerHappyPath(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, dockerClient, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, dockerClient, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

cniClient := mock_ecscni.NewMockCNIClient(ctrl)
Expand Down Expand Up @@ -758,6 +763,7 @@ func TestPauseContainerHappyPath(t *testing.T) {
})

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID1 := containerID + "1"
sleepContainerID2 := containerID + "2"
Expand Down Expand Up @@ -985,6 +991,7 @@ func TestContainersWithServiceConnect(t *testing.T) {
sleepTask.AddTaskENI(mockENI)

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID1 := containerID + "1"
sleepContainerID2 := containerID + "2"
Expand Down Expand Up @@ -1136,6 +1143,7 @@ func TestContainersWithServiceConnect_BridgeMode(t *testing.T) {
})

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID := containerID + "1"
scContainerID := "serviceConnectID"
Expand Down Expand Up @@ -1364,3 +1372,23 @@ func TestProvisionContainerResourcesBridgeModeWithServiceConnect(t *testing.T) {
require.Nil(t, taskEngine.(*DockerTaskEngine).provisionContainerResources(testTask, cont).Error)
}
}

// TestWatchAppNetImage verifies that the fsnotify-based Appnet image watcher
// observes a file created in the tarball directory without panicking, and
// that the watcher goroutine terminates when its context is done.
func TestWatchAppNetImage(t *testing.T) {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	ctrl, _, _, taskEngine, _, _, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
	defer ctrl.Finish()

	// t.TempDir is cleaned up automatically when the test finishes.
	tempAppNetTarballDir := t.TempDir()

	serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().Return(tempAppNetTarballDir).AnyTimes()
	serviceConnectManager.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

	watcherCtx, watcherCancel := context.WithTimeout(context.Background(), time.Second)
	defer watcherCancel()
	go taskEngine.(*DockerTaskEngine).watchAppNetImage(watcherCtx)

	// Create a file in the watched directory to trigger a Create event.
	f, err := os.CreateTemp(tempAppNetTarballDir, "agent.tar")
	assert.NoError(t, err)
	if f != nil {
		// Close the handle so the test does not leak an open file.
		assert.NoError(t, f.Close())
	}

	// Block until the watcher's context times out, giving the goroutine
	// time to process the event and then exit.
	<-watcherCtx.Done()
}
Loading

0 comments on commit d0ca83b

Please sign in to comment.