Skip to content

Commit

Permalink
TaskStopVerificationACKResponder integration tests (#4282)
Browse files Browse the repository at this point in the history
  • Loading branch information
tinnywang authored Aug 16, 2024
1 parent 6c7758d commit d6fbc04
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 22 deletions.
227 changes: 227 additions & 0 deletions agent/acs/session/task_stop_verification_ack_responder_integ_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
//go:build integration
// +build integration

package session_test

import (
"context"
"fmt"
"testing"

"github.com/aws/amazon-ecs-agent/agent/acs/session"
"github.com/aws/amazon-ecs-agent/agent/api/container"
apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
"github.com/aws/amazon-ecs-agent/agent/taskresource/envFiles"
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
acssession "github.com/aws/amazon-ecs-agent/ecs-agent/acs/session"
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/testconst"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/metrics"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/require"
)

// Tests that a task, its containers, and its resources are all stopped when a task stop verification ACK message is received.
func TestTaskStopVerificationACKResponder_StopsTaskContainersAndResources(t *testing.T) {
taskEngine, done, dockerClient, _ := engine.SetupIntegTestTaskEngine(engine.DefaultTestConfigIntegTest(), nil, t)
defer done()

task := engine.CreateTestTask("test_task")
createEnvironmentFileResources(task, 3)
createLongRunningContainers(task, 3)
go taskEngine.AddTask(task)

for i := 0; i < len(task.Containers); i++ {
engine.VerifyContainerManifestPulledStateChange(t, taskEngine)
}
engine.VerifyTaskManifestPulledStateChange(t, taskEngine)
for i := 0; i < len(task.Containers); i++ {
engine.VerifyContainerRunningStateChange(t, taskEngine)
}
engine.VerifyTaskRunningStateChange(t, taskEngine)

manifestMessageIDAccessor := session.NewManifestMessageIDAccessor()
require.NoError(t, manifestMessageIDAccessor.SetMessageID("manifest_message_id"))

taskStopper := session.NewTaskStopper(taskEngine, data.NewNoopClient())
responder := acssession.NewTaskStopVerificationACKResponder(taskStopper, manifestMessageIDAccessor, metrics.NewNopEntryFactory())

handler := responder.HandlerFunc().(func(*ecsacs.TaskStopVerificationAck))
handler(&ecsacs.TaskStopVerificationAck{
GeneratedAt: aws.Int64(testconst.DummyInt),
MessageId: aws.String(manifestMessageIDAccessor.GetMessageID()),
StopTasks: []*ecsacs.TaskIdentifier{{TaskArn: aws.String(task.Arn)}},
})

// Wait for all state changes before verifying container, resource, and task statuses.
for i := 0; i < len(task.Containers); i++ {
engine.VerifyContainerStoppedStateChange(t, taskEngine)
}
engine.VerifyTaskStoppedStateChange(t, taskEngine)

// Verify that all the task's containers have stopped.
for _, container := range task.Containers {
status, _ := dockerClient.DescribeContainer(context.Background(), container.RuntimeID)
require.Equal(t, apicontainerstatus.ContainerStopped, status)
}
// Verify that all the tasks's resources have been removed.
for _, resource := range task.GetResources() {
require.Equal(t, resourcestatus.ResourceRemoved, resource.GetKnownStatus())
}
// Verify that the task has stopped.
require.Equal(t, apitaskstatus.TaskStopped, task.GetKnownStatus())
}

// Tests that only the tasks specified in the task stop verification ACK message are stopped.
func TestTaskStopVerificationACKResponder_StopsSpecificTasks(t *testing.T) {
taskEngine, done, dockerClient, _ := engine.SetupIntegTestTaskEngine(engine.DefaultTestConfigIntegTest(), nil, t)
defer done()

var tasks []*apitask.Task
for i := 0; i < 3; i++ {
task := engine.CreateTestTask(fmt.Sprintf("test_task_%d", i))
createLongRunningContainers(task, 1)
go taskEngine.AddTask(task)

engine.VerifyContainerManifestPulledStateChange(t, taskEngine)
engine.VerifyTaskManifestPulledStateChange(t, taskEngine)
engine.VerifyContainerRunningStateChange(t, taskEngine)
engine.VerifyTaskRunningStateChange(t, taskEngine)
tasks = append(tasks, task)
}

manifestMessageIDAccessor := session.NewManifestMessageIDAccessor()
require.NoError(t, manifestMessageIDAccessor.SetMessageID("manifest_message_id"))

taskStopper := session.NewTaskStopper(taskEngine, data.NewNoopClient())
responder := acssession.NewTaskStopVerificationACKResponder(taskStopper, manifestMessageIDAccessor, metrics.NewNopEntryFactory())

// Stop the last 2 tasks.
handler := responder.HandlerFunc().(func(*ecsacs.TaskStopVerificationAck))
handler(&ecsacs.TaskStopVerificationAck{
GeneratedAt: aws.Int64(testconst.DummyInt),
MessageId: aws.String(manifestMessageIDAccessor.GetMessageID()),
StopTasks: []*ecsacs.TaskIdentifier{
{TaskArn: aws.String(tasks[1].Arn)},
{TaskArn: aws.String(tasks[2].Arn)},
},
})

// Wait for all state changes before verifying container and task statuses.
for i := 0; i < 2; i++ {
engine.VerifyContainerStoppedStateChange(t, taskEngine)
engine.VerifyTaskStoppedStateChange(t, taskEngine)
}

// Verify that the last 2 tasks and their containers have stopped.
for _, task := range tasks[1:] {
status, _ := dockerClient.DescribeContainer(context.Background(), task.Containers[0].RuntimeID)
require.Equal(t, apicontainerstatus.ContainerStopped, status)
require.Equal(t, apitaskstatus.TaskStopped, task.GetKnownStatus())
}

// Verify that the first task and its container are still running.
status, _ := dockerClient.DescribeContainer(context.Background(), tasks[0].Containers[0].RuntimeID)
require.Equal(t, apicontainerstatus.ContainerRunning, status)
require.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus())
}

// Tests simple test cases, such as the happy path for 1 task with 1 container and edge cases where no tasks are stopped.
func TestTaskStopVerificationACKResponder(t *testing.T) {
testCases := []struct {
description string
messageID string
taskArn string
stopTaskArn string
shouldStop bool
}{
{
description: "stops a task",
messageID: "manifest_message_id",
taskArn: "test_task",
stopTaskArn: "test_task",
shouldStop: true,
},
{
description: "task not found",
messageID: "manifest_message_id",
taskArn: "test_task",
stopTaskArn: "not_found_task",
},
{
description: "invalid message id",
taskArn: "test_task",
stopTaskArn: "test_task",
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
taskEngine, done, dockerClient, _ := engine.SetupIntegTestTaskEngine(engine.DefaultTestConfigIntegTest(), nil, t)
defer done()

task := engine.CreateTestTask(tc.taskArn)
createLongRunningContainers(task, 1)
go taskEngine.AddTask(task)

engine.VerifyContainerManifestPulledStateChange(t, taskEngine)
engine.VerifyTaskManifestPulledStateChange(t, taskEngine)
engine.VerifyContainerRunningStateChange(t, taskEngine)
engine.VerifyTaskRunningStateChange(t, taskEngine)

manifestMessageIDAccessor := session.NewManifestMessageIDAccessor()
manifestMessageIDAccessor.SetMessageID(tc.messageID)

taskStopper := session.NewTaskStopper(taskEngine, data.NewNoopClient())
responder := acssession.NewTaskStopVerificationACKResponder(taskStopper, manifestMessageIDAccessor, metrics.NewNopEntryFactory())

handler := responder.HandlerFunc().(func(*ecsacs.TaskStopVerificationAck))
handler(&ecsacs.TaskStopVerificationAck{
GeneratedAt: aws.Int64(testconst.DummyInt),
MessageId: aws.String(manifestMessageIDAccessor.GetMessageID()),
StopTasks: []*ecsacs.TaskIdentifier{
{TaskArn: aws.String(tc.stopTaskArn)},
},
})

if tc.shouldStop {
engine.VerifyContainerStoppedStateChange(t, taskEngine)
engine.VerifyTaskStoppedStateChange(t, taskEngine)

status, _ := dockerClient.DescribeContainer(context.Background(), task.Containers[0].RuntimeID)
require.Equal(t, apicontainerstatus.ContainerStopped, status)
require.Equal(t, apitaskstatus.TaskStopped, task.GetKnownStatus())
} else {
status, _ := dockerClient.DescribeContainer(context.Background(), task.Containers[0].RuntimeID)
require.Equal(t, apicontainerstatus.ContainerRunning, status)
require.Equal(t, apitaskstatus.TaskRunning, task.GetKnownStatus())
}
})
}
}

func createEnvironmentFileResources(task *apitask.Task, n int) {
task.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
for i := 0; i < n; i++ {
envFile := &envFiles.EnvironmentFileResource{}
// Set known status to ResourceCreated to avoid downloading files from S3.
envFile.SetKnownStatus(resourcestatus.ResourceCreated)
task.AddResource(envFiles.ResourceName, envFile)
}
}

func createLongRunningContainers(task *apitask.Task, n int) {
var containers []*container.Container
for i := 0; i < n; i++ {
container := engine.CreateTestContainer()
container.Command = engine.GetLongRunningCommand()
container.Name = fmt.Sprintf("%s-%d", container.Name, i)
containers = append(containers, container)
}
task.Containers = containers
}
22 changes: 18 additions & 4 deletions agent/engine/common_integ_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ var (
sdkClientFactory sdkclientfactory.Factory
)

const (
taskSteadyStatePollInterval = 100 * time.Millisecond
taskSteadyStatePollIntervalJitter = 10 * time.Millisecond
)

func init() {
sdkClientFactory = sdkclientfactory.NewFactory(context.TODO(), dockerEndpoint)
}
Expand All @@ -69,7 +74,7 @@ func CreateTestTask(arn string) *apitask.Task {
Family: "family",
Version: "1",
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Containers: []*apicontainer.Container{createTestContainer()},
Containers: []*apicontainer.Container{CreateTestContainer()},
}
}

Expand Down Expand Up @@ -119,6 +124,10 @@ func setupGMSALinux(cfg *config.Config, state dockerstate.TaskEngineState, t *te
taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager,
eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager,
resourceFields, execcmd.NewManager(), engineserviceconnect.NewManager(), daemonManagers)
// Set the steady state poll interval to a low value so that tasks transition from their current state to their
// desired state faster. This prevents tests from appearing to hang while waiting for state change events.
taskEngine.taskSteadyStatePollInterval = taskSteadyStatePollInterval
taskEngine.taskSteadyStatePollIntervalJitter = taskSteadyStatePollIntervalJitter
taskEngine.MustInit(context.TODO())
return taskEngine, func() {
taskEngine.Shutdown()
Expand Down Expand Up @@ -159,7 +168,7 @@ func VerifyTaskManifestPulledStateChange(t *testing.T, taskEngine TaskEngine) {
func VerifyContainerRunningStateChange(t *testing.T, taskEngine TaskEngine) {
stateChangeEvents := taskEngine.StateChangeEvents()
event := <-stateChangeEvents
assert.Equal(t, event.(api.ContainerStateChange).Status, apicontainerstatus.ContainerRunning,
assert.Equal(t, apicontainerstatus.ContainerRunning, event.(api.ContainerStateChange).Status,
"Expected container to be RUNNING")
}

Expand All @@ -173,7 +182,7 @@ func VerifyTaskRunningStateChange(t *testing.T, taskEngine TaskEngine) {
func verifyContainerRunningStateChangeWithRuntimeID(t *testing.T, taskEngine TaskEngine) {
stateChangeEvents := taskEngine.StateChangeEvents()
event := <-stateChangeEvents
assert.Equal(t, event.(api.ContainerStateChange).Status, apicontainerstatus.ContainerRunning,
assert.Equal(t, apicontainerstatus.ContainerRunning, event.(api.ContainerStateChange).Status,
"Expected container to be RUNNING")
assert.NotEqual(t, "", event.(api.ContainerStateChange).RuntimeID,
"Expected container runtimeID should not empty")
Expand All @@ -196,8 +205,9 @@ func verifyExecAgentStateChange(t *testing.T, taskEngine TaskEngine,
func VerifyContainerStoppedStateChange(t *testing.T, taskEngine TaskEngine) {
stateChangeEvents := taskEngine.StateChangeEvents()
event := <-stateChangeEvents
sc := event.(api.ContainerStateChange)
assert.Equal(t, event.(api.ContainerStateChange).Status, apicontainerstatus.ContainerStopped,
"Expected container to be STOPPED")
"Expected container %s from task %s to be STOPPED", sc.RuntimeID, sc.TaskArn)
}

func verifyContainerStoppedStateChangeWithReason(t *testing.T, taskEngine TaskEngine, reason string) {
Expand Down Expand Up @@ -259,6 +269,10 @@ func SetupIntegTestTaskEngine(cfg *config.Config, state dockerstate.TaskEngineSt
taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager,
eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager,
nil, execcmd.NewManager(), engineserviceconnect.NewManager(), daemonManagers)
// Set the steady state poll interval to a low value so that tasks transition from their current state to their
// desired state faster. This prevents tests from appearing to hang while waiting for state change events.
taskEngine.taskSteadyStatePollInterval = taskSteadyStatePollInterval
taskEngine.taskSteadyStatePollIntervalJitter = taskSteadyStatePollIntervalJitter
taskEngine.MustInit(context.TODO())
return taskEngine, func() {
taskEngine.Shutdown()
Expand Down
6 changes: 3 additions & 3 deletions agent/engine/common_unix_integ_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ const (
dockerEndpoint = "unix:///var/run/docker.sock"
)

func createTestContainer() *apicontainer.Container {
func CreateTestContainer() *apicontainer.Container {
return createTestContainerWithImageAndName(testRegistryImage, "netcat")
}

// getLongRunningCommand returns the command that keeps the container running for the container
// GetLongRunningCommand returns the command that keeps the container running for the container
// that uses the default integ test image (amazon/amazon-ecs-netkitten for unix)
func getLongRunningCommand() []string {
func GetLongRunningCommand() []string {
return []string{"-loop=true"}
}

Expand Down
2 changes: 1 addition & 1 deletion agent/engine/engine_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestDockerStateToContainerState(t *testing.T) {

// let the container keep running to prevent the edge case where it's already stopped when we check whether
// it's running
container.Command = getLongRunningCommand()
container.Command = GetLongRunningCommand()

client, err := sdkClient.NewClientWithOpts(sdkClient.WithHost(endpoint), sdkClient.WithVersion(sdkclientfactory.GetDefaultVersion().String()))
require.NoError(t, err, "Creating go docker client failed")
Expand Down
Loading

0 comments on commit d6fbc04

Please sign in to comment.