Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Remove task serialization and use host resource manager for task reso…
Browse files Browse the repository at this point in the history
…urces (#3723)
prateekchaudhry authored Jun 1, 2023
1 parent 3348d1d commit e00484f
Showing 19 changed files with 272 additions and 396 deletions.
1 change: 0 additions & 1 deletion agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
@@ -1313,7 +1313,6 @@ func validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error
Family: addedTask.Family,
Version: addedTask.Version,
DesiredStatusUnsafe: addedTask.GetDesiredStatus(),
StartSequenceNumber: addedTask.StartSequenceNumber,
}

if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) {
4 changes: 4 additions & 0 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
@@ -315,6 +315,10 @@ func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error)
}
resourceMap := make(map[string]*ecs.Resource)
for _, resource := range resources {
if *resource.Name == "PORTS" {
// Except for RCI, TCP Ports are named as PORTS_TCP in agent for Host Resources purpose
resource.Name = utils.Strptr("PORTS_TCP")
}
resourceMap[*resource.Name] = resource
}
return resourceMap, nil
38 changes: 10 additions & 28 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
@@ -234,9 +234,6 @@ type Task struct {
// is handled properly so that the state storage continues to work.
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`

StartSequenceNumber int64
StopSequenceNumber int64

// ExecutionCredentialsID is the ID of credentials that are used by agent to
// perform some action at the task level, such as pulling image from ECR
ExecutionCredentialsID string `json:"executionCredentialsID"`
@@ -312,11 +309,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
if err := json.Unmarshal(data, task); err != nil {
return nil, err
}
if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil {
task.StartSequenceNumber = *envelope.SeqNum
} else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil {
task.StopSequenceNumber = *envelope.SeqNum
}

// Overrides the container command if it's set
for _, container := range task.Containers {
@@ -2831,22 +2823,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh {
return task.AppMesh
}

// GetStopSequenceNumber returns the stop sequence number of a task
func (task *Task) GetStopSequenceNumber() int64 {
task.lock.RLock()
defer task.lock.RUnlock()

return task.StopSequenceNumber
}

// SetStopSequenceNumber sets the stop seqence number of a task
func (task *Task) SetStopSequenceNumber(seqnum int64) {
task.lock.Lock()
defer task.lock.Unlock()

task.StopSequenceNumber = seqnum
}

// SetPullStartedAt sets the task pullstartedat timestamp and returns whether
// this field was updated or not
func (task *Task) SetPullStartedAt(timestamp time.Time) bool {
@@ -3522,10 +3498,6 @@ func (task *Task) IsServiceConnectConnectionDraining() bool {
//
// * GPU
// - Return num of gpus requested (len of GPUIDs field)
//
// TODO remove this once ToHostResources is used
//
//lint:file-ignore U1000 Ignore all unused code
func (task *Task) ToHostResources() map[string]*ecs.Resource {
resources := make(map[string]*ecs.Resource)
// CPU
@@ -3639,3 +3611,13 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource {
})
return resources
}

// HasActiveContainers returns true if any container in the task has a
// known status in the inclusive range [ContainerPulled,
// ContainerResourcesProvisioned] — i.e. the container has made progress
// toward running but has not yet stopped.
func (task *Task) HasActiveContainers() bool {
	isActive := func(s apicontainerstatus.ContainerStatus) bool {
		return s >= apicontainerstatus.ContainerPulled &&
			s <= apicontainerstatus.ContainerResourcesProvisioned
	}
	for _, c := range task.Containers {
		if isActive(c.GetKnownStatus()) {
			return true
		}
	}
	return false
}
7 changes: 3 additions & 4 deletions agent/api/task/task_test.go
Original file line number Diff line number Diff line change
@@ -1860,10 +1860,9 @@ func TestTaskFromACS(t *testing.T) {
Type: "elastic-inference",
},
},
StartSequenceNumber: 42,
CPU: 2.0,
Memory: 512,
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
CPU: 2.0,
Memory: 512,
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
}

seqNum := int64(42)
4 changes: 0 additions & 4 deletions agent/api/task/taskvolume_test.go
Original file line number Diff line number Diff line change
@@ -119,8 +119,6 @@ func TestMarshalTaskVolumesEFS(t *testing.T) {
"PullStoppedAt": "0001-01-01T00:00:00Z",
"ExecutionStoppedAt": "0001-01-01T00:00:00Z",
"SentStatus": "NONE",
"StartSequenceNumber": 0,
"StopSequenceNumber": 0,
"executionCredentialsID": "",
"ENI": null,
"AppMesh": null,
@@ -168,8 +166,6 @@ func TestUnmarshalTaskVolumesEFS(t *testing.T) {
"PullStoppedAt": "0001-01-01T00:00:00Z",
"ExecutionStoppedAt": "0001-01-01T00:00:00Z",
"SentStatus": "NONE",
"StartSequenceNumber": 0,
"StopSequenceNumber": 0,
"executionCredentialsID": "",
"ENI": null,
"AppMesh": null,
4 changes: 0 additions & 4 deletions agent/api/task/taskvolume_windows_test.go
Original file line number Diff line number Diff line change
@@ -77,8 +77,6 @@ func TestMarshalTaskVolumeFSxWindowsFileServer(t *testing.T) {
"PullStoppedAt": "0001-01-01T00:00:00Z",
"ExecutionStoppedAt": "0001-01-01T00:00:00Z",
"SentStatus": "NONE",
"StartSequenceNumber": 0,
"StopSequenceNumber": 0,
"executionCredentialsID": "",
"ENI": null,
"AppMesh": null,
@@ -118,8 +116,6 @@ func TestUnmarshalTaskVolumeFSxWindowsFileServer(t *testing.T) {
"PullStoppedAt": "0001-01-01T00:00:00Z",
"ExecutionStoppedAt": "0001-01-01T00:00:00Z",
"SentStatus": "NONE",
"StartSequenceNumber": 0,
"StopSequenceNumber": 0,
"executionCredentialsID": "",
"ENI": null,
"AppMesh": null,
2 changes: 2 additions & 0 deletions agent/app/agent_unix_test.go
Original file line number Diff line number Diff line change
@@ -478,6 +478,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) {
state.EXPECT().AllImageStates().Return(nil),
state.EXPECT().AllENIAttachments().Return(nil),
state.EXPECT().AllTasks().Return(nil),
state.EXPECT().AllTasks().Return(nil),
client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) {
// Ensures that the test waits until acs session has been started
discoverEndpointsInvoked.Done()
@@ -646,6 +647,7 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) {
state.EXPECT().AllImageStates().Return(nil),
state.EXPECT().AllENIAttachments().Return(nil),
state.EXPECT().AllTasks().Return(nil),
state.EXPECT().AllTasks().Return(nil),
client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) {
// Ensures that the test waits until acs session has been started
discoverEndpointsInvoked.Done()
4 changes: 3 additions & 1 deletion agent/engine/common_integ_test.go
Original file line number Diff line number Diff line change
@@ -225,14 +225,16 @@ func skipIntegTestIfApplicable(t *testing.T) {
}
}

// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned
// accordingly
func createTestContainerWithImageAndName(image string, name string) *apicontainer.Container {
return &apicontainer.Container{
Name: name,
Image: image,
Command: []string{},
Essential: true,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 1024,
CPU: 256,
Memory: 128,
}
}
12 changes: 6 additions & 6 deletions agent/engine/docker_image_manager_integ_test.go
Original file line number Diff line number Diff line change
@@ -568,23 +568,23 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task {
Image: test1Image1Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
{
Name: "test2",
Image: test1Image2Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
{
Name: "test3",
Image: test1Image3Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
},
@@ -603,23 +603,23 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task {
Image: test2Image1Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
{
Name: "test2",
Image: test2Image2Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
{
Name: "test3",
Image: test2Image3Name,
Essential: false,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
},
},
161 changes: 145 additions & 16 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
@@ -53,7 +53,6 @@ import (
"github.com/aws/amazon-ecs-agent/agent/taskresource/firelens"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/utils/retry"
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
"github.com/aws/aws-sdk-go/aws"
"github.com/docker/docker/api/types"
@@ -135,10 +134,12 @@ type DockerTaskEngine struct {
state dockerstate.TaskEngineState
managedTasks map[string]*managedTask

taskStopGroup *utilsync.SequentialWaitGroup
// waitingTaskQueue is a FIFO queue of tasks waiting to acquire host resources
waitingTaskQueue []*managedTask

events <-chan dockerapi.DockerContainerChangeEvent
stateChangeEvents chan statechange.Event
events <-chan dockerapi.DockerContainerChangeEvent
monitorQueuedTaskEvent chan struct{}
stateChangeEvents chan statechange.Event

client dockerapi.DockerClient
dataClient data.Client
@@ -154,6 +155,8 @@ type DockerTaskEngine struct {
// all tasks, it must not acquire it for any significant duration
// The write mutex should be taken when adding and removing tasks from managedTasks.
tasksLock sync.RWMutex
// waitingTasksLock is a mutex for operations on waitingTaskQueue
waitingTasksLock sync.RWMutex

credentialsManager credentials.Manager
_time ttime.Time
@@ -207,10 +210,10 @@ func NewDockerTaskEngine(cfg *config.Config,
client: client,
dataClient: data.NewNoopClient(),

state: state,
managedTasks: make(map[string]*managedTask),
taskStopGroup: utilsync.NewSequentialWaitGroup(),
stateChangeEvents: make(chan statechange.Event),
state: state,
managedTasks: make(map[string]*managedTask),
stateChangeEvents: make(chan statechange.Event),
monitorQueuedTaskEvent: make(chan struct{}, 1),

credentialsManager: credentialsManager,

@@ -238,6 +241,37 @@ func NewDockerTaskEngine(cfg *config.Config,
return dockerTaskEngine
}

// Reconcile state of host resource manager with task status in managedTasks Slice
// Done on agent restarts
// Reconcile state of host resource manager with task status in managedTasks Slice
// Done on agent restarts
//
// For each task known to the engine state:
//   - terminal (stopped) tasks: release their host resources; release is
//     idempotent, so this succeeds whether or not anything was consumed.
//   - non-internal tasks with at least one active container: re-consume
//     their host resources so the manager's accounting matches the tasks
//     that were already progressing before the restart.
func (engine *DockerTaskEngine) reconcileHostResources() {
	logger.Info("Reconciling host resources")
	for _, task := range engine.state.AllTasks() {
		taskStatus := task.GetKnownStatus()
		resources := task.ToHostResources()

		// Release stopped tasks host resources
		// Call to release here for stopped tasks should always succeed
		// Idempotent release call
		if taskStatus.Terminal() {
			err := engine.hostResourceManager.release(task.Arn, resources)
			if err != nil {
				logger.Critical("Failed to release resources during reconciliation", logger.Fields{field.TaskARN: task.Arn})
			}
			continue
		}

		// Consume host resources if task has progressed (check if any container has progressed)
		// Call to consume here should always succeed
		// Idempotent consume call
		if !task.IsInternal && task.HasActiveContainers() {
			consumed, err := engine.hostResourceManager.consume(task.Arn, resources)
			if err != nil || !consumed {
				logger.Critical("Failed to consume resources for created/running tasks during reconciliation", logger.Fields{field.TaskARN: task.Arn})
			}
		}
	}
}

func (engine *DockerTaskEngine) initializeContainerStatusToTransitionFunction() {
containerStatusToTransitionFunction := map[apicontainerstatus.ContainerStatus]transitionApplyFunc{
apicontainerstatus.ContainerPulled: engine.pullContainer,
@@ -280,6 +314,7 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return err
}
engine.synchronizeState()
go engine.monitorQueuedTasks(derivedCtx)
// Now catch up and start processing new events per normal
go engine.handleDockerEvents(derivedCtx)
engine.initialized = true
@@ -288,6 +323,96 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return nil
}

// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
// but does not block if monitorQueuedTasks is already processing queued tasks.
//
// monitorQueuedTaskEvent has capacity 1, so at most one wake-up signal can
// be pending; any additional signal is dropped by the default case. This
// is best-effort by design — monitorQueuedTasks drains the whole queue per
// signal, so dropped signals are harmless.
func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
	select {
	case engine.monitorQueuedTaskEvent <- struct{}{}:
	default:
		// A wake-up is already pending; do nothing.
	}
}

// topTask returns the task at the head of the waitingTaskQueue without
// removing it, or an error if the queue is empty.
//
// This is a read-only operation, so it takes the read side of the
// RWMutex (the original took the write lock unnecessarily).
func (engine *DockerTaskEngine) topTask() (*managedTask, error) {
	engine.waitingTasksLock.RLock()
	defer engine.waitingTasksLock.RUnlock()
	if len(engine.waitingTaskQueue) > 0 {
		return engine.waitingTaskQueue[0], nil
	}
	return nil, fmt.Errorf("no tasks in waiting queue")
}

// enqueueTask appends the task to the tail of the FIFO waitingTaskQueue
// and signals the queue monitor so the task can be considered for
// scheduling. The lock is deliberately released before logging and
// signaling; the wake-up itself is a non-blocking send.
func (engine *DockerTaskEngine) enqueueTask(task *managedTask) {
	engine.waitingTasksLock.Lock()
	engine.waitingTaskQueue = append(engine.waitingTaskQueue, task)
	engine.waitingTasksLock.Unlock()
	logger.Debug("Enqueued task in Waiting Task Queue", logger.Fields{field.TaskARN: task.Arn})
	engine.wakeUpTaskQueueMonitor()
}

// dequeueTask removes and returns the task at the head of the
// waitingTaskQueue, or an error if the queue is empty.
func (engine *DockerTaskEngine) dequeueTask() (*managedTask, error) {
	engine.waitingTasksLock.Lock()
	defer engine.waitingTasksLock.Unlock()

	if len(engine.waitingTaskQueue) == 0 {
		return nil, fmt.Errorf("no tasks in waiting queue")
	}

	head := engine.waitingTaskQueue[0]
	engine.waitingTaskQueue = engine.waitingTaskQueue[1:]
	logger.Debug("Dequeued task from Waiting Task Queue", logger.Fields{field.TaskARN: head.Arn})
	return head, nil
}

// monitorQueuedTasks starts as many tasks as possible based on FIFO order of waitingTaskQueue
// and availability of host resources. When no more tasks can be started, it will wait on
// monitorQueuedTaskEvent channel. This channel receives (best effort) messages when
// - a task stops
// - a new task is queued up
// It does not need to receive all messages, as if the routine is going through the queue, it
// may schedule more than one task for a single 'event' received
func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
	logger.Info("Monitoring Task Queue started")
	for {
		select {
		case <-ctx.Done():
			return
		case <-engine.monitorQueuedTaskEvent:
			// Dequeue as many tasks as possible and wake up their goroutines
			for {
				task, err := engine.topTask()
				if err != nil {
					// Queue is empty; nothing to schedule.
					break
				}
				taskHostResources := task.ToHostResources()
				consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
				if err != nil {
					// Invalid resource map; fail the task rather than block the queue.
					engine.failWaitingTask(err)
				}
				if consumed {
					engine.startWaitingTask()
				} else {
					// not consumed, go to wait
					break
				}
			}
			logger.Debug("No more tasks could be started at this moment, waiting")
		}
	}
}

// failWaitingTask removes the task at the head of the waiting queue and
// fails it: consuming host resources returned an error, meaning the task's
// resource configuration is invalid and it can never be scheduled. The
// task's desired status is set to STOPPED and its goroutine is woken via
// consumedHostResourceEvent so it can progress toward termination.
//
// The original ignored dequeueTask's error and would nil-deref if the
// queue were drained concurrently; guard against that explicitly.
func (engine *DockerTaskEngine) failWaitingTask(err error) {
	task, dequeueErr := engine.dequeueTask()
	if dequeueErr != nil {
		logger.Error("No waiting task to fail: " + dequeueErr.Error())
		return
	}
	logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
	task.SetDesiredStatus(apitaskstatus.TaskStopped)
	task.consumedHostResourceEvent <- struct{}{}
}

// startWaitingTask removes the task at the head of the waiting queue
// (whose host resources have just been consumed successfully) and wakes
// its goroutine via consumedHostResourceEvent so it can progress.
//
// The original ignored dequeueTask's error and would nil-deref if the
// queue were drained concurrently; guard against that explicitly.
func (engine *DockerTaskEngine) startWaitingTask() {
	task, err := engine.dequeueTask()
	if err != nil {
		logger.Error("No waiting task to start: " + err.Error())
		return
	}
	logger.Info("Host resources consumed, progressing task", logger.Fields{field.TaskARN: task.Arn})
	task.consumedHostResourceEvent <- struct{}{}
}

func (engine *DockerTaskEngine) startPeriodicExecAgentsMonitoring(ctx context.Context) {
engine.monitorExecAgentsTicker = time.NewTicker(engine.monitorExecAgentsInterval)
for {
@@ -469,6 +594,9 @@ func (engine *DockerTaskEngine) synchronizeState() {
engine.saveTaskData(task)
}

// Before starting managedTask goroutines, pre-allocate resources for already running
// tasks in host resource manager
engine.reconcileHostResources()
for _, task := range tasksToStart {
engine.startTask(task)
}
@@ -493,11 +621,6 @@ func (engine *DockerTaskEngine) filterTasksToStartUnsafe(tasks []*apitask.Task)
}

tasksToStart = append(tasksToStart, task)

// Put tasks that are stopped by acs but hasn't been stopped in wait group
if task.GetDesiredStatus().Terminal() && task.GetStopSequenceNumber() != 0 {
engine.taskStopGroup.Add(task.GetStopSequenceNumber(), 1)
}
}

return tasksToStart
@@ -785,6 +908,15 @@ func (engine *DockerTaskEngine) deleteTask(task *apitask.Task) {
}

func (engine *DockerTaskEngine) emitTaskEvent(task *apitask.Task, reason string) {
if task.GetKnownStatus().Terminal() {
// Always do (idempotent) release host resources whenever state change with
// known status == STOPPED is done to ensure sync between tasks and host resource manager
resourcesToRelease := task.ToHostResources()
err := engine.hostResourceManager.release(task.Arn, resourcesToRelease)
if err != nil {
logger.Critical("Failed to release resources after task stopped", logger.Fields{field.TaskARN: task.Arn})
}
}
event, err := api.NewTaskStateChangeEvent(task, reason)
if err != nil {
if _, ok := err.(api.ErrShouldNotSendEvent); ok {
@@ -2124,16 +2256,13 @@ func (engine *DockerTaskEngine) updateTaskUnsafe(task *apitask.Task, update *api
logger.Debug("Putting update on the acs channel", logger.Fields{
field.TaskID: task.GetID(),
field.DesiredStatus: updateDesiredStatus.String(),
field.Sequence: update.StopSequenceNumber,
})
managedTask.emitACSTransition(acsTransition{
desiredStatus: updateDesiredStatus,
seqnum: update.StopSequenceNumber,
})
logger.Debug("Update taken off the acs channel", logger.Fields{
field.TaskID: task.GetID(),
field.DesiredStatus: updateDesiredStatus.String(),
field.Sequence: update.StopSequenceNumber,
})
}

82 changes: 2 additions & 80 deletions agent/engine/docker_task_engine_test.go
Original file line number Diff line number Diff line change
@@ -566,7 +566,6 @@ func TestStopWithPendingStops(t *testing.T) {
testTime.EXPECT().After(gomock.Any()).AnyTimes()

sleepTask1 := testdata.LoadTask("sleep5")
sleepTask1.StartSequenceNumber = 5
sleepTask2 := testdata.LoadTask("sleep5")
sleepTask2.Arn = "arn2"
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
@@ -594,13 +593,11 @@ func TestStopWithPendingStops(t *testing.T) {
stopSleep2 := testdata.LoadTask("sleep5")
stopSleep2.Arn = "arn2"
stopSleep2.SetDesiredStatus(apitaskstatus.TaskStopped)
stopSleep2.StopSequenceNumber = 4
taskEngine.AddTask(stopSleep2)

taskEngine.AddTask(sleepTask1)
stopSleep1 := testdata.LoadTask("sleep5")
stopSleep1.SetDesiredStatus(apitaskstatus.TaskStopped)
stopSleep1.StopSequenceNumber = 5
taskEngine.AddTask(stopSleep1)
pullDone <- true
// this means the PullImage is only called once due to the task is stopped before it
@@ -1639,11 +1636,11 @@ func TestPullAndUpdateContainerReference(t *testing.T) {
// agent starts, container created, metadata file created, agent restarted, container recovered
// during task engine init, metadata file updated
func TestMetadataFileUpdatedAgentRestart(t *testing.T) {
conf := &defaultConfig
conf := defaultConfig
conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, conf)
ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, &conf)
defer ctrl.Finish()

var metadataUpdateWG sync.WaitGroup
@@ -1869,81 +1866,6 @@ func TestNewTaskTransitionOnRestart(t *testing.T) {
assert.True(t, ok, "task wasnot started")
}

// TestTaskWaitForHostResourceOnRestart tests task stopped by acs but hasn't
// reached stopped should block the later task to start
func TestTaskWaitForHostResourceOnRestart(t *testing.T) {
// Task 1 stopped by backend
taskStoppedByACS := testdata.LoadTask("sleep5")
taskStoppedByACS.SetDesiredStatus(apitaskstatus.TaskStopped)
taskStoppedByACS.SetStopSequenceNumber(1)
taskStoppedByACS.SetKnownStatus(apitaskstatus.TaskRunning)
// Task 2 has essential container stopped
taskEssentialContainerStopped := testdata.LoadTask("sleep5")
taskEssentialContainerStopped.Arn = "task_Essential_Container_Stopped"
taskEssentialContainerStopped.SetDesiredStatus(apitaskstatus.TaskStopped)
taskEssentialContainerStopped.SetKnownStatus(apitaskstatus.TaskRunning)
// Normal task 3 needs to be started
taskNotStarted := testdata.LoadTask("sleep5")
taskNotStarted.Arn = "task_Not_started"

conf := &defaultConfig
conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, _, privateTaskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, conf)
defer ctrl.Finish()

client.EXPECT().Version(gomock.Any(), gomock.Any()).MaxTimes(1)
client.EXPECT().ContainerEvents(gomock.Any()).MaxTimes(1)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

err := privateTaskEngine.Init(ctx)
assert.NoError(t, err)

taskEngine := privateTaskEngine.(*DockerTaskEngine)
taskEngine.State().AddTask(taskStoppedByACS)
taskEngine.State().AddTask(taskNotStarted)
taskEngine.State().AddTask(taskEssentialContainerStopped)

taskEngine.State().AddContainer(&apicontainer.DockerContainer{
Container: taskStoppedByACS.Containers[0],
DockerID: containerID + "1",
DockerName: dockerContainerName + "1",
}, taskStoppedByACS)
taskEngine.State().AddContainer(&apicontainer.DockerContainer{
Container: taskNotStarted.Containers[0],
DockerID: containerID + "2",
DockerName: dockerContainerName + "2",
}, taskNotStarted)
taskEngine.State().AddContainer(&apicontainer.DockerContainer{
Container: taskEssentialContainerStopped.Containers[0],
DockerID: containerID + "3",
DockerName: dockerContainerName + "3",
}, taskEssentialContainerStopped)

// these are performed in synchronizeState on restart
client.EXPECT().DescribeContainer(gomock.Any(), gomock.Any()).Return(apicontainerstatus.ContainerRunning, dockerapi.DockerContainerMetadata{
DockerID: containerID,
}).Times(3)
imageManager.EXPECT().RecordContainerReference(gomock.Any()).Times(3)
// start the two tasks
taskEngine.synchronizeState()

var waitStopWG sync.WaitGroup
waitStopWG.Add(1)
go func() {
// This is to confirm the other task is waiting
time.Sleep(1 * time.Second)
// Remove the task sequence number 1 from waitgroup
taskEngine.taskStopGroup.Done(1)
waitStopWG.Done()
}()

// task with sequence number 2 should wait until 1 is removed from the waitgroup
taskEngine.taskStopGroup.Wait(2)
waitStopWG.Wait()
}

// TestPullStartedStoppedAtWasSetCorrectly tests the PullStartedAt and PullStoppedAt
// was set correctly
func TestPullStartedStoppedAtWasSetCorrectly(t *testing.T) {
2 changes: 0 additions & 2 deletions agent/engine/dockerstate/json_test.go
Original file line number Diff line number Diff line change
@@ -124,8 +124,6 @@ const (
"KnownStatus": "RUNNING",
"KnownTime": "2017-11-01T20:24:21.449897483Z",
"SentStatus": "RUNNING",
"StartSequenceNumber": 9,
"StopSequenceNumber": 0,
"ENI": {
"ec2Id": "eni-abcd",
"IPV4Addresses": [
4 changes: 3 additions & 1 deletion agent/engine/engine_windows_integ_test.go
Original file line number Diff line number Diff line change
@@ -75,13 +75,15 @@ var endpoint = utils.DefaultIfBlank(os.Getenv(DockerEndpointEnvVariable), docker
// TODO implement this
func isDockerRunning() bool { return true }

// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned
// accordingly
func createTestContainer() *apicontainer.Container {
return &apicontainer.Container{
Name: "windows",
Image: testBaseImage,
Essential: true,
DesiredStatusUnsafe: apicontainerstatus.ContainerRunning,
CPU: 512,
CPU: 256,
Memory: 256,
}
}
15 changes: 11 additions & 4 deletions agent/engine/host_resource_manager.go
Original file line number Diff line number Diff line change
@@ -25,9 +25,6 @@ import (
"github.com/aws/amazon-ecs-agent/agent/utils"
)

// TODO remove this once resource, consume are used
//lint:file-ignore U1000 Ignore all unused code

const (
CPU = "CPU"
GPU = "GPU"
@@ -92,6 +89,13 @@ func (h *HostResourceManager) consumeStringSetType(resourceType string, resource
}
}

// checkTaskConsumed returns whether host resources have already been
// accounted (consumed) for the task identified by taskArn.
//
// This only reads the taskConsumed map, so it takes the read side of the
// RWMutex (the original took the write lock unnecessarily, serializing
// concurrent readers).
func (h *HostResourceManager) checkTaskConsumed(taskArn string) bool {
	h.hostResourceManagerRWLock.RLock()
	defer h.hostResourceManagerRWLock.RUnlock()
	_, ok := h.taskConsumed[taskArn]
	return ok
}

// Returns if resources consumed or not and error status
// false, nil -> did not consume, task should stay pending
// false, err -> resources map has errors, task should fail as cannot schedule with 'wrong' resource map (this basically never happens)
@@ -195,7 +199,7 @@ func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Res
for resourceKey, resourceVal := range resources {
_, ok := h.initialHostResource[resourceKey]
if !ok {
logger.Error(fmt.Sprintf("resource %s not found in ", resourceKey))
logger.Error(fmt.Sprintf("resource %s not found in host resources", resourceKey))
return &InvalidHostResource{resourceKey}
}

@@ -256,6 +260,9 @@ func (h *HostResourceManager) consumable(resources map[string]*ecs.Resource) (bo
func removeSubSlice(s1 []*string, s2 []*string) []*string {
begin := 0
end := len(s1) - 1
if len(s2) == 0 {
return s1
}
for ; begin < len(s1); begin++ {
if *s1[begin] == *s2[0] {
break
80 changes: 25 additions & 55 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@ import (
"github.com/aws/amazon-ecs-agent/agent/taskresource"
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
"github.com/aws/amazon-ecs-agent/agent/utils/retry"
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
)

@@ -135,12 +134,12 @@ type managedTask struct {
credentialsManager credentials.Manager
cniClient ecscni.CNIClient
dockerClient dockerapi.DockerClient
taskStopWG *utilsync.SequentialWaitGroup

acsMessages chan acsTransition
dockerMessages chan dockerContainerChange
resourceStateChangeEvent chan resourceStateChange
stateChangeEvents chan statechange.Event
consumedHostResourceEvent chan struct{}
containerChangeEventStream *eventstream.EventStream

// unexpectedStart is a once that controls stopping a container that
@@ -177,14 +176,14 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask
acsMessages: make(chan acsTransition),
dockerMessages: make(chan dockerContainerChange),
resourceStateChangeEvent: make(chan resourceStateChange),
consumedHostResourceEvent: make(chan struct{}, 1),
engine: engine,
cfg: engine.cfg,
stateChangeEvents: engine.stateChangeEvents,
containerChangeEventStream: engine.containerChangeEventStream,
credentialsManager: engine.credentialsManager,
cniClient: engine.cniClient,
dockerClient: engine.client,
taskStopWG: engine.taskStopGroup,
steadyStatePollInterval: engine.taskSteadyStatePollInterval,
steadyStatePollIntervalJitter: engine.taskSteadyStatePollIntervalJitter,
}
@@ -243,13 +242,8 @@ func (mtask *managedTask) overseeTask() {
mtask.engine.checkTearDownPauseContainer(mtask.Task)
// TODO [SC]: We need to also tear down pause containets in bridge mode for SC-enabled tasks
mtask.cleanupCredentials()
if mtask.StopSequenceNumber != 0 {
logger.Debug("Marking done for this sequence", logger.Fields{
field.TaskID: mtask.GetID(),
field.Sequence: mtask.StopSequenceNumber,
})
mtask.taskStopWG.Done(mtask.StopSequenceNumber)
}
// Send event to monitor queue task routine to check for any pending tasks to progress
mtask.engine.wakeUpTaskQueueMonitor()
// TODO: make this idempotent on agent restart
go mtask.releaseIPInIPAM()
mtask.cleanupTask(retry.AddJitter(mtask.cfg.TaskCleanupWaitDuration, mtask.cfg.TaskCleanupWaitDurationJitter))
@@ -275,43 +269,21 @@ func (mtask *managedTask) emitCurrentStatus() {
}

// waitForHostResources waits for host resources to become available to start
// the task. This involves waiting for previous stops to complete so the
// resources become free.
// the task. It will wait for event on this task's consumedHostResourceEvent
// channel from monitorQueuedTasks routine to wake up
func (mtask *managedTask) waitForHostResources() {
if mtask.StartSequenceNumber == 0 {
// This is the first transition on this host. No need to wait
return
}
if mtask.GetDesiredStatus().Terminal() {
// Task's desired status is STOPPED. No need to wait in this case either
return
}

logger.Info("Waiting for any previous stops to complete", logger.Fields{
field.TaskID: mtask.GetID(),
field.Sequence: mtask.StartSequenceNumber,
})

othersStoppedCtx, cancel := context.WithCancel(mtask.ctx)
defer cancel()

go func() {
mtask.taskStopWG.Wait(mtask.StartSequenceNumber)
cancel()
}()

for !mtask.waitEvent(othersStoppedCtx.Done()) {
if mtask.GetDesiredStatus().Terminal() {
// If we end up here, that means we received a start then stop for this
// task before a task that was expected to stop before it could
// actually stop
break
if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) {
// Internal tasks are started right away as their resources are not accounted for
mtask.engine.enqueueTask(mtask)
for !mtask.waitEvent(mtask.consumedHostResourceEvent) {
if mtask.GetDesiredStatus().Terminal() {
// If we end up here, that means we received a start then stop for this
// task before a task that was expected to stop before it could
// actually stop
break
}
}
}
logger.Info("Wait over; ready to move towards desired status", logger.Fields{
field.TaskID: mtask.GetID(),
field.DesiredStatus: mtask.GetDesiredStatus().String(),
})
}

// waitSteady waits for a task to leave steady-state by waiting for a new
@@ -406,26 +378,15 @@ func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.
field.TaskID: mtask.GetID(),
field.DesiredStatus: desiredStatus.String(),
field.Sequence: seqnum,
"StopNumber": mtask.StopSequenceNumber,
})
if desiredStatus <= mtask.GetDesiredStatus() {
logger.Debug("Redundant task transition; ignoring", logger.Fields{
field.TaskID: mtask.GetID(),
field.DesiredStatus: desiredStatus.String(),
field.Sequence: seqnum,
"StopNumber": mtask.StopSequenceNumber,
})
return
}
if desiredStatus == apitaskstatus.TaskStopped && seqnum != 0 && mtask.GetStopSequenceNumber() == 0 {
logger.Info("Managed task moving to stopped, adding to stopgroup with sequence number",
logger.Fields{
field.TaskID: mtask.GetID(),
field.Sequence: seqnum,
})
mtask.SetStopSequenceNumber(seqnum)
mtask.taskStopWG.Add(seqnum, 1)
}
mtask.SetDesiredStatus(desiredStatus)
mtask.UpdateDesiredStatus()
mtask.engine.saveTaskData(mtask.Task)
@@ -606,6 +567,15 @@ func getContainerEventLogFields(c api.ContainerStateChange) logger.Fields {

func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
taskKnownStatus := task.GetKnownStatus()
// Always do (idempotent) release host resources whenever state change with
// known status == STOPPED is done to ensure sync between tasks and host resource manager
if taskKnownStatus.Terminal() {
resourcesToRelease := mtask.ToHostResources()
err := mtask.engine.hostResourceManager.release(mtask.Arn, resourcesToRelease)
if err != nil {
logger.Critical("Failed to release resources after task stopped", logger.Fields{field.TaskARN: mtask.Arn})
}
}
if !taskKnownStatus.BackendRecognized() {
logger.Debug("Skipping event emission for task", logger.Fields{
field.TaskID: mtask.GetID(),
2 changes: 0 additions & 2 deletions agent/engine/task_manager_data_test.go
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@ import (
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types"
"github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@@ -84,7 +83,6 @@ func TestHandleDesiredStatusChangeSaveData(t *testing.T) {
engine: &DockerTaskEngine{
dataClient: dataClient,
},
taskStopWG: utilsync.NewSequentialWaitGroup(),
}
mtask.handleDesiredStatusChange(tc.targetDesiredStatus, int64(1))
tasks, err := dataClient.GetTasks()
85 changes: 58 additions & 27 deletions agent/engine/task_manager_test.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ import (
"github.com/aws/amazon-ecs-agent/agent/taskresource"
mock_taskresource "github.com/aws/amazon-ecs-agent/agent/taskresource/mocks"
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"

"github.com/aws/amazon-ecs-agent/agent/api"
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
@@ -831,6 +830,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) {
containerChangeEventStream := eventstream.NewEventStream(eventStreamName, context.Background())
containerChangeEventStream.StartListening()

hostResourceManager := NewHostResourceManager(getTestHostResources())
stateChangeEvents := make(chan statechange.Event)

task := &managedTask{
@@ -844,6 +844,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) {
containerChangeEventStream: containerChangeEventStream,
stateChangeEvents: stateChangeEvents,
dataClient: data.NewNoopClient(),
hostResourceManager: &hostResourceManager,
},
stateChangeEvents: stateChangeEvents,
containerChangeEventStream: containerChangeEventStream,
@@ -964,13 +965,15 @@ func TestWaitForContainerTransitionsForTerminalTask(t *testing.T) {

func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T) {
stateChangeEvents := make(chan statechange.Event)
hostResourceManager := NewHostResourceManager(getTestHostResources())
task := &managedTask{
Task: &apitask.Task{
Containers: []*apicontainer.Container{},
DesiredStatusUnsafe: apitaskstatus.TaskStopped,
},
engine: &DockerTaskEngine{
stateChangeEvents: stateChangeEvents,
stateChangeEvents: stateChangeEvents,
hostResourceManager: &hostResourceManager,
},
stateChangeEvents: stateChangeEvents,
ctx: context.TODO(),
@@ -1780,31 +1783,6 @@ func TestHandleContainerChangeUpdateMetadataRedundant(t *testing.T) {
assert.Equal(t, timeNow, containerCreateTime)
}

// TestWaitForHostResources verifies that a managed task blocked in
// waitForHostResources is released once the sequential wait group for
// its start sequence number has been fully drained.
func TestWaitForHostResources(t *testing.T) {
	stopWG := utilsync.NewSequentialWaitGroup()
	stopWG.Add(1, 1)

	ctx, cancel := context.WithCancel(context.Background())
	mtask := &managedTask{
		ctx:        ctx,
		cancel:     cancel,
		taskStopWG: stopWG,
		Task:       &apitask.Task{StartSequenceNumber: 1},
	}

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		mtask.waitForHostResources()
	}()

	// Draining the group for sequence number 1 must unblock the goroutine above.
	stopWG.Done(1)
	<-finished
}

func TestWaitForResourceTransition(t *testing.T) {
task := &managedTask{
Task: &apitask.Task{
@@ -2195,3 +2173,56 @@ func TestContainerNextStateDependsStoppedContainer(t *testing.T) {
})
}
}

// TestTaskWaitForHostResources tests task queuing behavior based on available
// host resources: with 1 vCPU available, three 0.5-vCPU tasks acquire in order,
// so the third must wait until a running task releases its resources.
func TestTaskWaitForHostResources(t *testing.T) {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	// 1 vCPU available on host
	hostResourceManager := NewHostResourceManager(getTestHostResources())
	taskEngine := &DockerTaskEngine{
		managedTasks:           make(map[string]*managedTask),
		monitorQueuedTaskEvent: make(chan struct{}, 1),
		hostResourceManager:    &hostResourceManager,
	}
	go taskEngine.monitorQueuedTasks(ctx)

	// 3 tasks requesting 0.5 vCPUs each; only the first two can fit at once.
	for i := 0; i < 3; i++ {
		task := testdata.LoadTask("sleep5")
		task.Arn = fmt.Sprintf("arn%d", i)
		task.CPU = float64(0.5)
		mtask := &managedTask{
			Task:                      task,
			engine:                    taskEngine,
			consumedHostResourceEvent: make(chan struct{}, 1),
		}
		taskEngine.managedTasks[task.Arn] = mtask
	}

	// Acquire host resources in order arn0, arn1, arn2.
	go func() {
		taskEngine.managedTasks["arn0"].waitForHostResources()
		taskEngine.managedTasks["arn1"].waitForHostResources()
		taskEngine.managedTasks["arn2"].waitForHostResources()
	}()
	time.Sleep(500 * time.Millisecond)

	// Verify the waiting queue is blocked at arn2 (arn0 and arn1 consumed the vCPU).
	topTask, err := taskEngine.topTask()
	assert.NoError(t, err)
	assert.Equal(t, "arn2", topTask.Arn)

	// Release arn0's resources and wake the monitor so arn2 can be dequeued.
	taskResources := taskEngine.managedTasks["arn0"].ToHostResources()
	taskEngine.hostResourceManager.release("arn0", taskResources)
	taskEngine.wakeUpTaskQueueMonitor()

	time.Sleep(500 * time.Millisecond)

	// Verify arn2 got dequeued; the waiting queue is now empty.
	_, err = taskEngine.topTask()
	assert.Error(t, err)
}
91 changes: 0 additions & 91 deletions agent/utils/sync/sequential_waitgroup.go

This file was deleted.

70 changes: 0 additions & 70 deletions agent/utils/sync/sequential_waitgroup_test.go

This file was deleted.

0 comments on commit e00484f

Please sign in to comment.