Commit 8b96a11

Don't consume host resources for tasks getting STOPPED while waiting in waitingTasksQueue (aws#3750)

* don't consume resources for ACS-stopped tasks
* add integ test for the stopTask-in-waitingTasksQueue case
* remove discardConsumedHostResourceEvents
1 parent 9e77f6f commit 8b96a11
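In essence, the race being closed here is between the ACS-driven desired-status update and monitorQueuedTasks consuming host resources for a task still sitting in waitingTasksQueue. The sketch below is a deliberately simplified, hypothetical model of that locking pattern (the names echo the real ones, but it is not the agent's code): the shared mutex makes "check desired status, then consume" atomic with respect to desired-status changes.

// Hypothetical, simplified model of the race this commit closes; not the agent's code.
package main

import (
    "fmt"
    "sync"
)

type task struct {
    arn           string
    desiredStatus string // "RUNNING" or "STOPPED"
}

type engine struct {
    monitorQueuedTasksLock sync.Mutex // serializes status updates vs. queue processing
    availableMemory        int64
}

// handleDesiredStatusChange mirrors the ACS path: it takes the same lock before
// flipping desired status, so it cannot interleave with tryDequeueWaitingTask.
func (e *engine) handleDesiredStatusChange(t *task, status string) {
    e.monitorQueuedTasksLock.Lock()
    defer e.monitorQueuedTasksLock.Unlock()
    t.desiredStatus = status
}

// tryDequeueWaitingTask mirrors the queue-monitor path: under the lock it first checks
// whether the task was stopped while waiting, and only otherwise consumes resources.
func (e *engine) tryDequeueWaitingTask(t *task, memory int64) bool {
    e.monitorQueuedTasksLock.Lock()
    defer e.monitorQueuedTasksLock.Unlock()
    if t.desiredStatus == "STOPPED" {
        fmt.Println(t.arn, "stopped while waiting; progressing without consuming resources")
        return true
    }
    if memory > e.availableMemory {
        return false // not enough room, keep waiting
    }
    e.availableMemory -= memory
    fmt.Println(t.arn, "consumed", memory, "MiB; remaining", e.availableMemory)
    return true
}

func main() {
    e := &engine{availableMemory: 1024}
    t := &task{arn: "arn:example:task/1", desiredStatus: "RUNNING"}
    e.handleDesiredStatusChange(t, "STOPPED") // ACS StopTask arrives first
    e.tryDequeueWaitingTask(t, 768)           // the monitor then skips the consume step
}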

4 files changed: +146 −12 lines changed

agent/engine/docker_task_engine.go (+41 −10)
@@ -155,9 +155,14 @@ type DockerTaskEngine struct {
     // all tasks, it must not acquire it for any significant duration
     // The write mutex should be taken when adding and removing tasks from managedTasks.
     tasksLock sync.RWMutex
-    // waitingTasksLock is a mutext for operations on waitingTasksQueue
+    // waitingTasksLock is a mutex for operations on waitingTasksQueue
    waitingTasksLock sync.RWMutex
 
+    // monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which
+    // allocate host resources and wakes up waiting host resources. This should be used
+    // for synchronizing task desired status updates and queue operations
+    monitorQueuedTasksLock sync.RWMutex
+
     credentialsManager credentials.Manager
     _time ttime.Time
     _timeOnce sync.Once
@@ -392,15 +397,8 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
                 if err != nil {
                     break
                 }
-                taskHostResources := task.ToHostResources()
-                consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
-                if err != nil {
-                    engine.failWaitingTask(err)
-                }
-                if consumed {
-                    engine.startWaitingTask()
-                } else {
-                    // not consumed, go to wait
+                dequeuedTask := engine.tryDequeueWaitingTasks(task)
+                if !dequeuedTask {
                     break
                 }
             }
@@ -409,6 +407,39 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
     }
 }
 
+func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool {
+    // Isolate monitorQueuedTasks processing from changes of desired status updates to prevent
+    // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
+    // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
+    // could be processing
+    engine.monitorQueuedTasksLock.Lock()
+    defer engine.monitorQueuedTasksLock.Unlock()
+    taskDesiredStatus := task.GetDesiredStatus()
+    if taskDesiredStatus.Terminal() {
+        logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn})
+        engine.returnWaitingTask()
+        return true
+    }
+    taskHostResources := task.ToHostResources()
+    consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
+    if err != nil {
+        engine.failWaitingTask(err)
+        return true
+    }
+    if consumed {
+        engine.startWaitingTask()
+        return true
+    }
+    // not consumed, go to wait
+    return false
+}
+
+// To be called when resources are not to be consumed by host resource manager, just dequeues and returns
+func (engine *DockerTaskEngine) returnWaitingTask() {
+    task, _ := engine.dequeueTask()
+    task.consumedHostResourceEvent <- struct{}{}
+}
+
 func (engine *DockerTaskEngine) failWaitingTask(err error) {
     task, _ := engine.dequeueTask()
     logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
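The new returnWaitingTask helper above only dequeues and signals task.consumedHostResourceEvent; the waiting side lives in task_manager.go, where overseeTask blocks in waitForHostResources. Assuming waitForHostResources is essentially a receive on that channel (the diff signals the channel but does not show the receiver), the hand-off looks roughly like this minimal, illustrative sketch, not the agent's implementation:

// Toy model of the dequeue-and-signal hand-off; names mirror the agent's but the code is illustrative.
package main

import "fmt"

type managedTask struct {
    arn                       string
    consumedHostResourceEvent chan struct{}
}

// waitForHostResources stands in for the blocking call in overseeTask: the task goroutine
// parks here until the queue monitor signals that it may progress.
func (t *managedTask) waitForHostResources() {
    <-t.consumedHostResourceEvent
    fmt.Println(t.arn, "unblocked; progressing (to STOPPED if that is the desired status)")
}

// returnWaitingTask mirrors the new helper: signal the waiting task without consuming
// anything from the host resource manager.
func returnWaitingTask(t *managedTask) {
    t.consumedHostResourceEvent <- struct{}{}
}

func main() {
    t := &managedTask{arn: "arn:example:task/1", consumedHostResourceEvent: make(chan struct{})}
    done := make(chan struct{})
    go func() {
        t.waitForHostResources()
        close(done)
    }()
    returnWaitingTask(t)
    <-done
}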

agent/engine/engine_unix_integ_test.go (+94 −1)
@@ -1227,7 +1227,7 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
         testTask := createTestTask(taskArn)
 
         // create container
-        A := createTestContainerWithImageAndName(baseImageForOS, "A")
+        A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
         A.EntryPoint = &entryPointForOS
         A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
         A.Essential = true
@@ -1288,3 +1288,96 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
 
     waitFinished(t, finished, testTimeout)
 }
+
+// This task verifies resources are properly released for all tasks for the case where
+// stopTask is received from ACS for a task which is queued up in waitingTasksQueue
+func TestHostResourceManagerStopTaskNotBlockWaitingTasks(t *testing.T) {
+    testTimeout := 1 * time.Minute
+    taskEngine, done, _ := setupWithDefaultConfig(t)
+    defer done()
+
+    stateChangeEvents := taskEngine.StateChangeEvents()
+
+    tasks := []*apitask.Task{}
+    stopTasks := []*apitask.Task{}
+    for i := 0; i < 2; i++ {
+        taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
+        testTask := createTestTask(taskArn)
+        testTask.Memory = int64(768)
+
+        // create container
+        A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
+        A.EntryPoint = &entryPointForOS
+        A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
+        A.Essential = true
+        A.StopTimeout = uint(6)
+        testTask.Containers = []*apicontainer.Container{
+            A,
+        }
+
+        tasks = append(tasks, testTask)
+
+        // Stop task payloads from ACS for the tasks
+        stopTask := createTestTask(fmt.Sprintf("IntegTaskArn-%d", i))
+        stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
+        stopTask.Containers = []*apicontainer.Container{}
+        stopTasks = append(stopTasks, stopTask)
+    }
+
+    // goroutine to schedule tasks
+    go func() {
+        taskEngine.AddTask(tasks[0])
+        time.Sleep(2 * time.Second)
+
+        // single managedTask which should have started
+        assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")
+
+        // stopTask[0] - stop running task[0], this task will go to STOPPING due to trap handler defined and STOPPED after 6s
+        taskEngine.AddTask(stopTasks[0])
+
+        time.Sleep(2 * time.Second)
+
+        // this task (task[1]) goes in waitingTasksQueue because not enough memory available
+        taskEngine.AddTask(tasks[1])
+
+        time.Sleep(2 * time.Second)
+
+        // stopTask[1] - stop waiting task - task[1]
+        taskEngine.AddTask(stopTasks[1])
+    }()
+
+    finished := make(chan interface{})
+
+    // goroutine to verify task running order and verify assertions
+    go func() {
+        // 1st task goes to RUNNING
+        verifyContainerRunningStateChange(t, taskEngine)
+        verifyTaskIsRunning(stateChangeEvents, tasks[0])
+
+        time.Sleep(2500 * time.Millisecond)
+
+        // At this time, task[0] stopTask is received, and SIGTERM sent to task
+        // but the task[0] is still RUNNING due to trap handler
+        assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
+        assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED")
+
+        time.Sleep(2 * time.Second)
+
+        // task[1] stops while in waitingTasksQueue while task[0] is in progress
+        // This is because it is still waiting to progress, has no containers created
+        // and does not need to wait for stopTimeout, can immediately STSC out
+        verifyTaskIsStopped(stateChangeEvents, tasks[1])
+
+        // task[0] stops
+        verifyContainerStoppedStateChange(t, taskEngine)
+        verifyTaskIsStopped(stateChangeEvents, tasks[0])
+
+        // Verify resources are properly released in host resource manager
+        assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[0].Arn), "task 0 resources not released")
+        assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[1].Arn), "task 1 resources not released")
+
+        close(finished)
+    }()
+
+    waitFinished(t, finished, testTimeout)
+}
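Both the original and the new test bound their runtime with the pre-existing waitFinished helper, whose implementation is not part of this diff. A helper of the assumed shape below (a sketch, not necessarily the repo's version) gives the behavior the tests rely on: return as soon as the verification goroutine closes finished, and fail the test if testTimeout elapses first.

// Sketch of a waitFinished-style helper (assumed shape; the repo's actual helper may differ).
package enginetest

import (
    "testing"
    "time"
)

// waitFinished passes once the verification goroutine closes `finished`,
// and fails the test if `timeout` elapses first.
func waitFinished(t *testing.T, finished <-chan interface{}, timeout time.Duration) {
    select {
    case <-finished:
        // verification goroutine completed all assertions and closed the channel
    case <-time.After(timeout):
        t.Error("test timed out waiting for verification goroutine")
    }
}

func TestWaitFinishedSketch(t *testing.T) {
    finished := make(chan interface{})
    go func() {
        // stand-in for the assertions in the real test
        time.Sleep(10 * time.Millisecond)
        close(finished)
    }()
    waitFinished(t, finished, time.Second)
}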

agent/engine/task_manager.go (+9)
@@ -204,6 +204,7 @@ func (mtask *managedTask) overseeTask() {
     // - Waits until host resource manager succesfully 'consume's task resources and returns
     // - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
     // - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
+    // - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources
     // (resources are later 'release'd on Stopped task emitTaskEvent call)
     mtask.waitForHostResources()
 
@@ -386,6 +387,14 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool {
 func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) {
     // Handle acs message changes this task's desired status to whatever
     // acs says it should be if it is compatible
+
+    // Isolate change of desired status updates from monitorQueuedTasks processing to prevent
+    // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
+    // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
+    // could be processing
+    mtask.engine.monitorQueuedTasksLock.Lock()
+    defer mtask.engine.monitorQueuedTasksLock.Unlock()
+
     logger.Info("New acs transition", logger.Fields{
         field.TaskID: mtask.GetID(),
         field.DesiredStatus: desiredStatus.String(),

agent/engine/task_manager_test.go (+2 −1)
@@ -1420,7 +1420,8 @@ func TestTaskWaitForExecutionCredentials(t *testing.T) {
     ctx, cancel := context.WithCancel(context.TODO())
     defer cancel()
     task := &managedTask{
-        ctx: ctx,
+        ctx:    ctx,
+        engine: &DockerTaskEngine{},
         Task: &apitask.Task{
             KnownStatusUnsafe: apitaskstatus.TaskRunning,
             DesiredStatusUnsafe: apitaskstatus.TaskRunning,
