Skip to content

Commit bb70e50

Browse files
add integ tests for task accounting
1 parent 48887fc commit bb70e50

File tree

5 files changed

+173
-6
lines changed

5 files changed

+173
-6
lines changed

agent/api/task/task_windows_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ func TestPostUnmarshalWindowsCanonicalPaths(t *testing.T) {
109109
},
110110
},
111111
},
112-
StartSequenceNumber: 42,
113112
}
114113

115114
seqNum := int64(42)

agent/engine/common_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ import (
3636
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
3737
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
3838
"github.com/aws/amazon-ecs-agent/agent/statechange"
39+
"github.com/aws/amazon-ecs-agent/agent/utils"
3940
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
4041
mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks"
41-
"github.com/aws/amazon-ecs-agent/agent/utils"
4242
"github.com/cihub/seelog"
4343
dockercontainer "github.com/docker/docker/api/types/container"
4444
"github.com/golang/mock/gomock"

agent/engine/docker_task_engine.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,17 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
323323
return nil
324324
}
325325

326-
// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
326+
// Method to wake up 'monitorQueuedTasks' goroutine, called when
327+
// - a new task enqueues in waitingTaskQueue
328+
// - a task stops (overseeTask)
329+
// as these are the events when resources change/can change on the host
330+
// Always wakes up when at least one event arrives on buffered channel (size 1) 'monitorQueuedTaskEvent'
327331
// but does not block if monitorQueuedTasks is already processing queued tasks
332+
// Buffered channel of size 1 is sufficient because we only want to go through the queue
333+
// once at any point and schedule as many tasks as possible (as resources allow)
334+
// Calls to 'wakeUpTaskQueueMonitor' while 'monitorQueuedTasks' is doing work are redundant
335+
// as new tasks are enqueued at the end and will be taken into account in the continued loop
336+
// if permitted by design
328337
func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
329338
select {
330339
case engine.monitorQueuedTaskEvent <- struct{}{}:
@@ -594,8 +603,8 @@ func (engine *DockerTaskEngine) synchronizeState() {
594603
engine.saveTaskData(task)
595604
}
596605

597-
// Before starting managedTask goroutines, pre-allocate resources for already running
598-
// tasks in host resource manager
606+
// Before starting managedTask goroutines, pre-allocate resources for tasks which
607+
// have progressed beyond the resource check (waitingTaskQueue) stage
599608
engine.reconcileHostResources()
600609
for _, task := range tasksToStart {
601610
engine.startTask(task)

agent/engine/engine_unix_integ_test.go

+159
Original file line numberDiff line numberDiff line change
@@ -1131,3 +1131,162 @@ func TestDockerExecAPI(t *testing.T) {
11311131

11321132
waitFinished(t, finished, testTimeout)
11331133
}
1134+
1135+
// This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager.
1136+
// First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait
1137+
until resources get freed up (i.e. a running task stops and frees enough resources) before it can start progressing.
1138+
func TestHostResourceManagerTrickleQueue(t *testing.T) {
1139+
testTimeout := 1 * time.Minute
1140+
taskEngine, done, _ := setupWithDefaultConfig(t)
1141+
defer done()
1142+
1143+
stateChangeEvents := taskEngine.StateChangeEvents()
1144+
1145+
tasks := []*apitask.Task{}
1146+
for i := 0; i < 3; i++ {
1147+
taskArn := fmt.Sprintf("taskArn-%d", i)
1148+
testTask := createTestTask(taskArn)
1149+
1150+
// create container
1151+
A := createTestContainerWithImageAndName(baseImageForOS, "A")
1152+
A.EntryPoint = &entryPointForOS
1153+
A.Command = []string{"sleep 10"}
1154+
A.Essential = true
1155+
testTask.Containers = []*apicontainer.Container{
1156+
A,
1157+
}
1158+
1159+
// task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
1160+
testTask.Memory = int64(512)
1161+
1162+
tasks = append(tasks, testTask)
1163+
}
1164+
1165+
// goroutine to trickle tasks to enforce queueing order
1166+
go func() {
1167+
taskEngine.AddTask(tasks[0])
1168+
time.Sleep(2 * time.Second)
1169+
taskEngine.AddTask(tasks[1])
1170+
time.Sleep(2 * time.Second)
1171+
taskEngine.AddTask(tasks[2])
1172+
}()
1173+
1174+
finished := make(chan interface{})
1175+
1176+
// goroutine to verify task running order
1177+
go func() {
1178+
// Tasks go RUNNING in order
1179+
verifyContainerRunningStateChange(t, taskEngine)
1180+
verifyTaskIsRunning(stateChangeEvents, tasks[0])
1181+
1182+
verifyContainerRunningStateChange(t, taskEngine)
1183+
verifyTaskIsRunning(stateChangeEvents, tasks[1])
1184+
1185+
// First task should stop before 3rd task goes RUNNING
1186+
verifyContainerStoppedStateChange(t, taskEngine)
1187+
verifyTaskIsStopped(stateChangeEvents, tasks[0])
1188+
1189+
verifyContainerRunningStateChange(t, taskEngine)
1190+
verifyTaskIsRunning(stateChangeEvents, tasks[2])
1191+
1192+
verifyContainerStoppedStateChange(t, taskEngine)
1193+
verifyTaskIsStopped(stateChangeEvents, tasks[1])
1194+
1195+
verifyContainerStoppedStateChange(t, taskEngine)
1196+
verifyTaskIsStopped(stateChangeEvents, tasks[2])
1197+
close(finished)
1198+
}()
1199+
1200+
// goroutine to verify task accounting
1201+
// After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. until 1st task stops and gets dequeued
1202+
go func() {
1203+
time.Sleep(6 * time.Second)
1204+
task, err := taskEngine.(*DockerTaskEngine).topTask()
1205+
assert.NoError(t, err, "one task should be queued up after 6s")
1206+
assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue")
1207+
1208+
time.Sleep(6 * time.Second)
1209+
_, err = taskEngine.(*DockerTaskEngine).topTask()
1210+
assert.Error(t, err, "no task should be queued up after 12s")
1211+
}()
1212+
waitFinished(t, finished, testTimeout)
1213+
}
1214+
1215+
// This test verifies if a task which is STOPPING does not block other new tasks
1216+
// from starting if resources for them are available
1217+
func TestHostResourceManagerResourceUtilization(t *testing.T) {
1218+
testTimeout := 1 * time.Minute
1219+
taskEngine, done, _ := setupWithDefaultConfig(t)
1220+
defer done()
1221+
1222+
stateChangeEvents := taskEngine.StateChangeEvents()
1223+
1224+
tasks := []*apitask.Task{}
1225+
for i := 0; i < 2; i++ {
1226+
taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
1227+
testTask := createTestTask(taskArn)
1228+
1229+
// create container
1230+
A := createTestContainerWithImageAndName(baseImageForOS, "A")
1231+
A.EntryPoint = &entryPointForOS
1232+
A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 10; exit 1; }; sleep 30"}
1233+
A.Essential = true
1234+
testTask.Containers = []*apicontainer.Container{
1235+
A,
1236+
}
1237+
1238+
// task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
1239+
testTask.Memory = int64(512)
1240+
1241+
tasks = append(tasks, testTask)
1242+
}
1243+
1244+
// Stop task payload from ACS for 1st task
1245+
stopTask := createTestTask("IntegTaskArn-0")
1246+
stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
1247+
stopTask.Containers = []*apicontainer.Container{}
1248+
1249+
go func() {
1250+
taskEngine.AddTask(tasks[0])
1251+
time.Sleep(2 * time.Second)
1252+
1253+
// single managedTask which should have started
1254+
assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")
1255+
1256+
// stopTask
1257+
taskEngine.AddTask(stopTask)
1258+
time.Sleep(2 * time.Second)
1259+
1260+
taskEngine.AddTask(tasks[1])
1261+
}()
1262+
1263+
finished := make(chan interface{})
1264+
1265+
// goroutine to verify task running order
1266+
go func() {
1267+
// Tasks go RUNNING in order, 2nd task doesn't wait for 1st task
1268+
// to transition to STOPPED as resources are available
1269+
verifyContainerRunningStateChange(t, taskEngine)
1270+
verifyTaskIsRunning(stateChangeEvents, tasks[0])
1271+
1272+
verifyContainerRunningStateChange(t, taskEngine)
1273+
verifyTaskIsRunning(stateChangeEvents, tasks[1])
1274+
1275+
// At this time, task[0] stopTask is received, and SIGTERM sent to task
1276+
// but the task[0] is still RUNNING due to trap handler
1277+
assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
1278+
assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED")
1279+
1280+
// task[0] stops after SIGTERM trap handler finishes
1281+
verifyContainerStoppedStateChange(t, taskEngine)
1282+
verifyTaskIsStopped(stateChangeEvents, tasks[0])
1283+
1284+
// task[1] stops after normal execution
1285+
verifyContainerStoppedStateChange(t, taskEngine)
1286+
verifyTaskIsStopped(stateChangeEvents, tasks[1])
1287+
1288+
close(finished)
1289+
}()
1290+
1291+
waitFinished(t, finished, testTimeout)
1292+
}

agent/statemanager/state_manager_win_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestLoadsDataForGMSATask(t *testing.T) {
3636
defer cleanup()
3737

3838
cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v26", "gmsa")}
39-
taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
39+
taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
4040
var containerInstanceArn, cluster, savedInstanceID string
4141
var sequenceNumber int64
4242

0 commit comments

Comments
 (0)