Skip to content

Commit

Permalink
add integ tests for task accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
prateekchaudhry committed Jun 8, 2023
1 parent 48887fc commit aba476f
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 4 deletions.
2 changes: 1 addition & 1 deletion agent/engine/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/cihub/seelog"
dockercontainer "github.com/docker/docker/api/types/container"
"github.com/golang/mock/gomock"
Expand Down
15 changes: 12 additions & 3 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,17 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return nil
}

// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
// Method to wake up 'monitorQueuedTasks' goroutine, called when
// - a new task enqueues in waitingTaskQueue
// - a task stops (overseeTask)
// as these are the events when resources change/can change on the host
// Always wakes up when at least one event arrives on buffered channel (size 1) 'monitorQueuedTaskEvent'
// but does not block if monitorQueuedTasks is already processing queued tasks
// Buffered channel of size 1 is sufficient because we only want to go through the queue
// once at any point and schedule as many tasks as possible (as many resources are available)
// Calls on 'wakeUpTaskQueueMonitor' when 'monitorQueuedTasks' is doing work are redundant
// as new tasks are enqueued at the end and will be taken into account in the continued loop
// if permitted by design
func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
select {
case engine.monitorQueuedTaskEvent <- struct{}{}:
Expand Down Expand Up @@ -594,8 +603,8 @@ func (engine *DockerTaskEngine) synchronizeState() {
engine.saveTaskData(task)
}

// Before starting managedTask goroutines, pre-allocate resources for already running
// tasks in host resource manager
// Before starting managedTask goroutines, pre-allocate resources for tasks which
// which have progressed beyond resource check (waitingTaskQueue) stage
engine.reconcileHostResources()
for _, task := range tasksToStart {
engine.startTask(task)
Expand Down
159 changes: 159 additions & 0 deletions agent/engine/engine_unix_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,3 +1131,162 @@ func TestDockerExecAPI(t *testing.T) {

waitFinished(t, finished, testTimeout)
}

// This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager.
// First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait
// until resources gets freed up (i.e. any running tasks stops and frees enough resources) before it can start progressing.
func TestHostResourceManagerTrickleQueue(t *testing.T) {
testTimeout := 1 * time.Minute
taskEngine, done, _ := setupWithDefaultConfig(t)
defer done()

stateChangeEvents := taskEngine.StateChangeEvents()

tasks := []*apitask.Task{}
for i := 0; i < 3; i++ {
taskArn := fmt.Sprintf("taskArn-%d", i)
testTask := createTestTask(taskArn)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, "A")
A.EntryPoint = &entryPointForOS
A.Command = []string{"sleep 10"}
A.Essential = true
testTask.Containers = []*apicontainer.Container{
A,
}

// task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
testTask.Memory = int64(512)

tasks = append(tasks, testTask)
}

// goroutine to trickle tasks to enforce queueing order
go func() {
taskEngine.AddTask(tasks[0])
time.Sleep(2 * time.Second)
taskEngine.AddTask(tasks[1])
time.Sleep(2 * time.Second)
taskEngine.AddTask(tasks[2])
}()

finished := make(chan interface{})

// goroutine to verify task running order
go func() {
// Tasks go RUNNING in order
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[1])

// First task should stop before 3rd task goes RUNNING
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[2])

verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[1])

verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[2])
close(finished)
}()

// goroutine to verify task accounting
// After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. until 1st task stops and gets dequeued
go func() {
time.Sleep(6 * time.Second)
task, err := taskEngine.(*DockerTaskEngine).topTask()
assert.NoError(t, err, "one task should be queued up after 6s")
assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue")

time.Sleep(6 * time.Second)
_, err = taskEngine.(*DockerTaskEngine).topTask()
assert.Error(t, err, "no task should be queued up after 12s")
}()
waitFinished(t, finished, testTimeout)
}

// This test verifies if a task which is STOPPING does not block other new tasks
// from starting if resources for them are available
func TestHostResourceManagerResourceUtilization(t *testing.T) {
testTimeout := 1 * time.Minute
taskEngine, done, _ := setupWithDefaultConfig(t)
defer done()

stateChangeEvents := taskEngine.StateChangeEvents()

tasks := []*apitask.Task{}
for i := 0; i < 2; i++ {
taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
testTask := createTestTask(taskArn)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, "A")
A.EntryPoint = &entryPointForOS
A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 10; exit 1; }; sleep 30"}
A.Essential = true
testTask.Containers = []*apicontainer.Container{
A,
}

// task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
testTask.Memory = int64(512)

tasks = append(tasks, testTask)
}

// Stop task payload from ACS for 1st task
stopTask := createTestTask("IntegTaskArn-0")
stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
stopTask.Containers = []*apicontainer.Container{}

go func() {
taskEngine.AddTask(tasks[0])
time.Sleep(2 * time.Second)

// single managedTask which should have started
assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")

// stopTask
taskEngine.AddTask(stopTask)
time.Sleep(2 * time.Second)

taskEngine.AddTask(tasks[1])
}()

finished := make(chan interface{})

// goroutine to verify task running order
go func() {
// Tasks go RUNNING in order, 2nd task doesn't wait for 1st task
// to transition to STOPPED as resources are available
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[1])

// At this time, task[0] stopTask is received, and SIGTERM sent to task
// but the task[0] is still RUNNING due to trap handler
assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED")

// task[0] stops after SIGTERM trap handler finishes
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[0])

// task[1] stops after normal execution
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[1])

close(finished)
}()

waitFinished(t, finished, testTimeout)
}

0 comments on commit aba476f

Please sign in to comment.