Skip to content

Commit

Permalink
Revert "Dont consume host resources for tasks getting STOPPED while w…
Browse files Browse the repository at this point in the history
…aiting in waitingTasksQueue (aws#3750)"

This reverts commit 8b96a11.
  • Loading branch information
sparrc committed Jun 30, 2023
1 parent 98722bd commit da9adc0
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 146 deletions.
51 changes: 10 additions & 41 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,9 @@ type DockerTaskEngine struct {
// all tasks, it must not acquire it for any significant duration
// The write mutex should be taken when adding and removing tasks from managedTasks.
tasksLock sync.RWMutex
// waitingTasksLock is a mutex for operations on waitingTasksQueue
// waitingTasksLock is a mutext for operations on waitingTasksQueue
waitingTasksLock sync.RWMutex

// monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which
// allocate host resources and wakes up waiting host resources. This should be used
// for synchronizing task desired status updates and queue operations
monitorQueuedTasksLock sync.RWMutex

credentialsManager credentials.Manager
_time ttime.Time
_timeOnce sync.Once
Expand Down Expand Up @@ -397,8 +392,15 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
if err != nil {
break
}
dequeuedTask := engine.tryDequeueWaitingTasks(task)
if !dequeuedTask {
taskHostResources := task.ToHostResources()
consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
}
if consumed {
engine.startWaitingTask()
} else {
// not consumed, go to wait
break
}
}
Expand All @@ -407,39 +409,6 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
}
}

func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool {
// Isolate monitorQueuedTasks processing from changes of desired status updates to prevent
// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
// could be processing
engine.monitorQueuedTasksLock.Lock()
defer engine.monitorQueuedTasksLock.Unlock()
taskDesiredStatus := task.GetDesiredStatus()
if taskDesiredStatus.Terminal() {
logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn})
engine.returnWaitingTask()
return true
}
taskHostResources := task.ToHostResources()
consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
return true
}
if consumed {
engine.startWaitingTask()
return true
}
return false
// not consumed, go to wait
}

// To be called when resources are not to be consumed by host resource manager, just dequeues and returns
func (engine *DockerTaskEngine) returnWaitingTask() {
task, _ := engine.dequeueTask()
task.consumedHostResourceEvent <- struct{}{}
}

func (engine *DockerTaskEngine) failWaitingTask(err error) {
task, _ := engine.dequeueTask()
logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
Expand Down
95 changes: 1 addition & 94 deletions agent/engine/engine_unix_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
testTask := createTestTask(taskArn)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
A := createTestContainerWithImageAndName(baseImageForOS, "A")
A.EntryPoint = &entryPointForOS
A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
A.Essential = true
Expand Down Expand Up @@ -1288,96 +1288,3 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {

waitFinished(t, finished, testTimeout)
}

// This task verifies resources are properly released for all tasks for the case where
// stopTask is received from ACS for a task which is queued up in waitingTasksQueue
func TestHostResourceManagerStopTaskNotBlockWaitingTasks(t *testing.T) {
testTimeout := 1 * time.Minute
taskEngine, done, _ := setupWithDefaultConfig(t)
defer done()

stateChangeEvents := taskEngine.StateChangeEvents()

tasks := []*apitask.Task{}
stopTasks := []*apitask.Task{}
for i := 0; i < 2; i++ {
taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
testTask := createTestTask(taskArn)
testTask.Memory = int64(768)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
A.EntryPoint = &entryPointForOS
A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
A.Essential = true
A.StopTimeout = uint(6)
testTask.Containers = []*apicontainer.Container{
A,
}

tasks = append(tasks, testTask)

// Stop task payloads from ACS for the tasks
stopTask := createTestTask(fmt.Sprintf("IntegTaskArn-%d", i))
stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
stopTask.Containers = []*apicontainer.Container{}
stopTasks = append(stopTasks, stopTask)
}

// goroutine to schedule tasks
go func() {
taskEngine.AddTask(tasks[0])
time.Sleep(2 * time.Second)

// single managedTask which should have started
assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")

// stopTask[0] - stop running task[0], this task will go to STOPPING due to trap handler defined and STOPPED after 6s
taskEngine.AddTask(stopTasks[0])

time.Sleep(2 * time.Second)

// this task (task[1]) goes in waitingTasksQueue because not enough memory available
taskEngine.AddTask(tasks[1])

time.Sleep(2 * time.Second)

// stopTask[1] - stop waiting task - task[1]
taskEngine.AddTask(stopTasks[1])
}()

finished := make(chan interface{})

// goroutine to verify task running order and verify assertions
go func() {
// 1st task goes to RUNNING
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[0])

time.Sleep(2500 * time.Millisecond)

// At this time, task[0] stopTask is received, and SIGTERM sent to task
// but the task[0] is still RUNNING due to trap handler
assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED")

time.Sleep(2 * time.Second)

// task[1] stops while in waitingTasksQueue while task[0] is in progress
// This is because it is still waiting to progress, has no containers created
// and does not need to wait for stopTimeout, can immediately STSC out
verifyTaskIsStopped(stateChangeEvents, tasks[1])

// task[0] stops
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[0])

// Verify resources are properly released in host resource manager
assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[0].Arn), "task 0 resources not released")
assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[1].Arn), "task 1 resources not released")

close(finished)
}()

waitFinished(t, finished, testTimeout)
}
9 changes: 0 additions & 9 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (mtask *managedTask) overseeTask() {
// - Waits until host resource manager succesfully 'consume's task resources and returns
// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
// - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources
// (resources are later 'release'd on Stopped task emitTaskEvent call)
mtask.waitForHostResources()

Expand Down Expand Up @@ -387,14 +386,6 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool {
func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) {
// Handle acs message changes this task's desired status to whatever
// acs says it should be if it is compatible

// Isolate change of desired status updates from monitorQueuedTasks processing to prevent
// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
// could be processing
mtask.engine.monitorQueuedTasksLock.Lock()
defer mtask.engine.monitorQueuedTasksLock.Unlock()

logger.Info("New acs transition", logger.Fields{
field.TaskID: mtask.GetID(),
field.DesiredStatus: desiredStatus.String(),
Expand Down
3 changes: 1 addition & 2 deletions agent/engine/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,7 @@ func TestTaskWaitForExecutionCredentials(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
task := &managedTask{
ctx: ctx,
engine: &DockerTaskEngine{},
ctx: ctx,
Task: &apitask.Task{
KnownStatusUnsafe: apitaskstatus.TaskRunning,
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Expand Down

0 comments on commit da9adc0

Please sign in to comment.