From 96a64ef7463c0f8376ec96e68ac03403c3c53274 Mon Sep 17 00:00:00 2001 From: Prateek Chaudhry Date: Wed, 12 Jul 2023 09:53:24 -0700 Subject: [PATCH] Revert reverted changes for task resource accounting (#3796) * Revert "Revert "host resource manager initialization"" This reverts commit dafb96731a04e160176714c0524feb67edc29fbe. * Revert "Revert "Add method to get host resources reserved for a task (#3706)"" This reverts commit 8d824db3666af7995aef00f8baf4e56acba5f921. * Revert "Revert "Add host resource manager methods (#3700)"" This reverts commit bec130321f6c3bdf8235789b624a84eea2e3eefe. * Revert "Revert "Remove task serialization and use host resource manager for task resources (#3723)"" This reverts commit cb54139470bbd0d69a4c7e5c045427642286854b. * Revert "Revert "add integ tests for task accounting (#3741)"" This reverts commit 61ad01020178254bedf49cd26e80a5b2fd359784. * Revert "Revert "Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order (#3747)"" This reverts commit 60a3f4228616d80660d43c9f3fed706ba9f5016d. * Revert "Revert "Dont consume host resources for tasks getting STOPPED while waiting in waitingTasksQueue (#3750)"" This reverts commit 8943792fe4766c5f77525a01e80f078ec9ed6847. --- agent/acs/handler/acs_handler_test.go | 1 - agent/acs/update_handler/updater_test.go | 12 +- agent/api/ecsclient/client.go | 18 + agent/api/interface.go | 2 + agent/api/mocks/api_mocks.go | 15 + agent/api/task/task.go | 167 ++++++-- agent/api/task/task_test.go | 201 ++++++++- agent/api/task/task_windows_test.go | 1 - agent/api/task/taskvolume_test.go | 4 - agent/api/task/taskvolume_windows_test.go | 4 - agent/app/agent.go | 28 +- agent/app/agent_compatibility_linux_test.go | 9 +- agent/app/agent_test.go | 84 +++- agent/app/agent_unix_test.go | 10 +- agent/app/data.go | 5 +- agent/app/data_test.go | 9 +- agent/engine/common_integ_test.go | 12 +- agent/engine/common_test.go | 42 ++ agent/engine/default.go | 6 +- .../engine/docker_image_manager_integ_test.go | 12 +- agent/engine/docker_task_engine.go | 209 +++++++++- agent/engine/docker_task_engine_test.go | 85 +--- agent/engine/dockerstate/json_test.go | 2 - agent/engine/engine_sudo_linux_integ_test.go | 4 +- agent/engine/engine_unix_integ_test.go | 250 ++++++++++++ agent/engine/engine_windows_integ_test.go | 12 +- agent/engine/host_resource_manager.go | 380 ++++++++++++++++++ agent/engine/host_resource_manager_test.go | 220 ++++++++++ agent/engine/task_manager.go | 101 +++-- agent/engine/task_manager_data_test.go | 2 - agent/engine/task_manager_test.go | 88 ++-- agent/sighandlers/termination_handler_test.go | 2 +- agent/statemanager/state_manager_test.go | 26 +- agent/statemanager/state_manager_unix_test.go | 16 +- agent/statemanager/state_manager_win_test.go | 2 +- agent/stats/engine_integ_test.go | 18 +- agent/stats/engine_unix_integ_test.go | 2 +- agent/utils/sync/sequential_waitgroup.go | 91 ----- agent/utils/sync/sequential_waitgroup_test.go | 70 ---- 39 files changed, 1767 insertions(+), 455 deletions(-) create mode 100644 agent/engine/host_resource_manager.go create mode 100644 agent/engine/host_resource_manager_test.go delete mode 100644 agent/utils/sync/sequential_waitgroup.go delete mode 100644 agent/utils/sync/sequential_waitgroup_test.go diff --git a/agent/acs/handler/acs_handler_test.go b/agent/acs/handler/acs_handler_test.go index c5bcc5c147c..cd9f35f924e 100644 --- a/agent/acs/handler/acs_handler_test.go +++ b/agent/acs/handler/acs_handler_test.go @@ -1418,7 +1418,6 @@ func 
validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error Family: addedTask.Family, Version: addedTask.Version, DesiredStatusUnsafe: addedTask.GetDesiredStatus(), - StartSequenceNumber: addedTask.StartSequenceNumber, } if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) { diff --git a/agent/acs/update_handler/updater_test.go b/agent/acs/update_handler/updater_test.go index b93a2d74cff..739a7674b93 100644 --- a/agent/acs/update_handler/updater_test.go +++ b/agent/acs/update_handler/updater_test.go @@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) { Reason: ptr("Updates are disabled").(*string), }}) - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), @@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) { require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written") - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), @@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) { MessageId: ptr("mid").(*string), }}) - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), @@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) { require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written") - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), @@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) { require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written") - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), @@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) { require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written") - taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) msg := &ecsacs.PerformUpdateMessage{ ClusterArn: ptr("cluster").(*string), ContainerInstanceArn: ptr("containerInstance").(*string), diff --git a/agent/api/ecsclient/client.go b/agent/api/ecsclient/client.go index dc808ef756c..7ad6877b4a4 100644 --- a/agent/api/ecsclient/client.go +++ b/agent/api/ecsclient/client.go @@ -306,6 +306,24 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) { return 
[]*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
 }
 
+// GetHostResources calls getResources to get a list of CPU, MEMORY, PORTS and PORTS_UDP resources
+// and returns a resourceMap that maps each resource name to the corresponding resource
+func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
+	resources, err := client.getResources()
+	if err != nil {
+		return nil, err
+	}
+	resourceMap := make(map[string]*ecs.Resource)
+	for _, resource := range resources {
+		if *resource.Name == "PORTS" {
+			// Except for RCI, TCP ports are named PORTS_TCP in the agent for host resource accounting purposes
+			resource.Name = utils.Strptr("PORTS_TCP")
+		}
+		resourceMap[*resource.Name] = resource
+	}
+	return resourceMap, nil
+}
+
 func getCpuAndMemory() (int64, int64) {
 	memInfo, err := system.ReadMemInfo()
 	mem := int64(0)
diff --git a/agent/api/interface.go b/agent/api/interface.go
index b591a44e1a0..bc5d61f08d9 100644
--- a/agent/api/interface.go
+++ b/agent/api/interface.go
@@ -55,6 +55,8 @@ type ECSClient interface {
 	// UpdateContainerInstancesState updates the given container Instance ID with
 	// the given status. Only valid statuses are ACTIVE and DRAINING.
 	UpdateContainerInstancesState(instanceARN, status string) error
+	// GetHostResources retrieves a map that maps each resource name to the corresponding resource
+	GetHostResources() (map[string]*ecs.Resource, error)
 }
 
 // ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS
diff --git a/agent/api/mocks/api_mocks.go b/agent/api/mocks/api_mocks.go
index 33e1bf7872b..5eedc9be661 100644
--- a/agent/api/mocks/api_mocks.go
+++ b/agent/api/mocks/api_mocks.go
@@ -261,6 +261,21 @@ func (mr *MockECSClientMockRecorder) DiscoverTelemetryEndpoint(arg0 interface{})
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscoverTelemetryEndpoint", reflect.TypeOf((*MockECSClient)(nil).DiscoverTelemetryEndpoint), arg0)
 }
 
+// GetHostResources mocks base method.
+func (m *MockECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetHostResources")
+	ret0, _ := ret[0].(map[string]*ecs.Resource)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetHostResources indicates an expected call of GetHostResources.
+func (mr *MockECSClientMockRecorder) GetHostResources() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHostResources", reflect.TypeOf((*MockECSClient)(nil).GetHostResources))
+}
+
 // GetResourceTags mocks base method.
func (m *MockECSClient) GetResourceTags(arg0 string) ([]*ecs.Tag, error) { m.ctrl.T.Helper() diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 10f2c9d5670..16131294f3b 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -25,6 +25,7 @@ import ( "time" apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh" + "github.com/aws/amazon-ecs-agent/agent/api/container" apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status" "github.com/aws/amazon-ecs-agent/agent/api/serviceconnect" @@ -47,6 +48,7 @@ import ( apieni "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/arn" @@ -233,9 +235,6 @@ type Task struct { // is handled properly so that the state storage continues to work. SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"` - StartSequenceNumber int64 - StopSequenceNumber int64 - // ExecutionCredentialsID is the ID of credentials that are used by agent to // perform some action at the task level, such as pulling image from ECR ExecutionCredentialsID string `json:"executionCredentialsID"` @@ -311,11 +310,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, if err := json.Unmarshal(data, task); err != nil { return nil, err } - if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil { - task.StartSequenceNumber = *envelope.SeqNum - } else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil { - task.StopSequenceNumber = *envelope.SeqNum - } // Overrides the container command if it's set for _, container := range task.Containers { @@ -2824,22 +2818,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh { return task.AppMesh } -// GetStopSequenceNumber returns the stop sequence number of a task -func (task *Task) GetStopSequenceNumber() int64 { - task.lock.RLock() - defer task.lock.RUnlock() - - return task.StopSequenceNumber -} - -// SetStopSequenceNumber sets the stop seqence number of a task -func (task *Task) SetStopSequenceNumber(seqnum int64) { - task.lock.Lock() - defer task.lock.Unlock() - - task.StopSequenceNumber = seqnum -} - // SetPullStartedAt sets the task pullstartedat timestamp and returns whether // this field was updated or not func (task *Task) SetPullStartedAt(timestamp time.Time) bool { @@ -3525,3 +3503,144 @@ func (task *Task) IsServiceConnectConnectionDraining() bool { defer task.lock.RUnlock() return task.ServiceConnectConnectionDrainingUnsafe } + +// ToHostResources will convert a task to a map of resources which ECS takes into account when scheduling tasks on instances +// * CPU +// - If task level CPU is set, use that +// - Else add up container CPUs +// +// * Memory +// - If task level memory is set, use that +// - Else add up container level +// - If memoryReservation field is set, use that +// - Else use memory field +// +// * Ports (TCP/UDP) +// - Only account for hostPort +// - Don't need to account for awsvpc mode, each task gets its own namespace +// +// * GPU +// - Return num of gpus requested (len of GPUIDs field) +func (task *Task) ToHostResources() map[string]*ecs.Resource { + resources := 
make(map[string]*ecs.Resource) + // CPU + if task.CPU > 0 { + // cpu unit is vcpu at task level + // convert to cpushares + taskCPUint64 := int64(task.CPU * 1024) + resources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &taskCPUint64, + } + } else { + // cpu unit is cpushares at container level + containerCPUint64 := int64(0) + for _, container := range task.Containers { + containerCPUint64 += int64(container.CPU) + } + resources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &containerCPUint64, + } + } + + // Memory + if task.Memory > 0 { + // memory unit is MiB at task level + taskMEMint64 := task.Memory + resources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &taskMEMint64, + } + } else { + containerMEMint64 := int64(0) + // To parse memory reservation / soft limit + hostConfig := &dockercontainer.HostConfig{} + + for _, c := range task.Containers { + if c.DockerConfig.HostConfig != nil { + err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig) + if err != nil || hostConfig.MemoryReservation <= 0 { + // container memory unit is MiB, keeping as is + containerMEMint64 += int64(c.Memory) + } else { + // Soft limit is specified in MiB units but translated to bytes while being transferred to Agent + // Converting back to MiB + containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024) + } + } else { + // container memory unit is MiB, keeping as is + containerMEMint64 += int64(c.Memory) + } + } + resources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &containerMEMint64, + } + } + + // PORTS_TCP and PORTS_UDP + var tcpPortSet []uint16 + var udpPortSet []uint16 + + // AWSVPC tasks have 'host' ports mapped to task ENI, not to host + // So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned + if !task.IsNetworkModeAWSVPC() { + for _, c := range task.Containers { + for _, port := range c.Ports { + hostPort := port.HostPort + protocol := port.Protocol + if hostPort > 0 && protocol == container.TransportProtocolTCP { + tcpPortSet = append(tcpPortSet, hostPort) + } else if hostPort > 0 && protocol == container.TransportProtocolUDP { + udpPortSet = append(udpPortSet, hostPort) + } + } + } + } + resources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet), + } + resources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet), + } + + // GPU + var num_gpus int64 + num_gpus = 0 + for _, c := range task.Containers { + num_gpus += int64(len(c.GPUIDs)) + } + resources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &num_gpus, + } + logger.Debug("Task host resources to account for", logger.Fields{ + "taskArn": task.Arn, + "CPU": *resources["CPU"].IntegerValue, + "MEMORY": *resources["MEMORY"].IntegerValue, + "PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue), + "PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue), + "GPU": *resources["GPU"].IntegerValue, + }) + return resources +} + +func (task *Task) HasActiveContainers() bool { + for _, container := range task.Containers { + containerStatus := 
container.GetKnownStatus() + if containerStatus >= apicontainerstatus.ContainerPulled && containerStatus <= apicontainerstatus.ContainerResourcesProvisioned { + return true + } + } + return false +} diff --git a/agent/api/task/task_test.go b/agent/api/task/task_test.go index b3dbb57ca6f..02a0aa2173d 100644 --- a/agent/api/task/task_test.go +++ b/agent/api/task/task_test.go @@ -53,6 +53,7 @@ import ( apieni "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/aws-sdk-go/service/secretsmanager" "github.com/aws/amazon-ecs-agent/agent/taskresource/asmsecret" @@ -1860,10 +1861,9 @@ func TestTaskFromACS(t *testing.T) { Type: "elastic-inference", }, }, - StartSequenceNumber: 42, - CPU: 2.0, - Memory: 512, - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), + CPU: 2.0, + Memory: 512, + ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), } seqNum := int64(42) @@ -4854,3 +4854,196 @@ func TestInitializeAndGetCredentialSpecResource(t *testing.T) { _, ok := task.GetCredentialSpecResource() assert.True(t, ok) } + +func getTestTaskResourceMap(cpu int64, mem int64, ports []*string, portsUdp []*string, numGPUs int64) map[string]*ecs.Resource { + taskResources := make(map[string]*ecs.Resource) + taskResources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &cpu, + } + + taskResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &mem, + } + + taskResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports, + } + + taskResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsUdp, + } + + taskResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + + return taskResources +} + +func TestToHostResources(t *testing.T) { + //Prepare a simple hostConfig with memory reservation field for test cases + hostConfig := dockercontainer.HostConfig{ + // 400 MiB + Resources: dockercontainer.Resources{ + MemoryReservation: int64(419430400), + }, + } + + rawHostConfig, err := json.Marshal(&hostConfig) + if err != nil { + t.Fatal(err) + } + + // Prefer task level, and check gpu assignment + testTask1 := &Task{ + CPU: 1.0, + Memory: int64(512), + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{ + HostConfig: strptr(string(rawHostConfig)), + }, + GPUIDs: []string{"gpu1", "gpu2"}, + }, + }, + } + + // If task not set, use container level (MemoryReservation pref) + testTask2 := &Task{ + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{ + HostConfig: strptr(string(rawHostConfig)), + }, + }, + }, + } + + // If task not set, if MemoryReservation not set, use container level hard limit (c.Memory) + testTask3 := &Task{ + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{}, + }, + }, + } + + // Check ports + testTask4 := &Task{ + CPU: 1.0, + Memory: int64(512), + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: 
uint(1200),
+				DockerConfig: apicontainer.DockerConfig{
+					HostConfig: strptr(string(rawHostConfig)),
+				},
+				Ports: []apicontainer.PortBinding{
+					{
+						ContainerPort: 10,
+						HostPort:      10,
+						BindIP:        "",
+						Protocol:      apicontainer.TransportProtocolTCP,
+					},
+					{
+						ContainerPort: 20,
+						HostPort:      20,
+						BindIP:        "",
+						Protocol:      apicontainer.TransportProtocolUDP,
+					},
+					{
+						ContainerPortRange: "99-999",
+						BindIP:             "",
+						Protocol:           apicontainer.TransportProtocolTCP,
+					},
+					{
+						ContainerPortRange: "121-221",
+						BindIP:             "",
+						Protocol:           apicontainer.TransportProtocolUDP,
+					},
+				},
+			},
+		},
+	}
+
+	portsTCP := []uint16{10}
+	portsUDP := []uint16{20}
+
+	testCases := []struct {
+		task              *Task
+		expectedResources map[string]*ecs.Resource
+	}{
+		{
+			task:              testTask1,
+			expectedResources: getTestTaskResourceMap(int64(1024), int64(512), []*string{}, []*string{}, int64(2)),
+		},
+		{
+			task:              testTask2,
+			expectedResources: getTestTaskResourceMap(int64(1200), int64(400), []*string{}, []*string{}, int64(0)),
+		},
+		{
+			task:              testTask3,
+			expectedResources: getTestTaskResourceMap(int64(1200), int64(1200), []*string{}, []*string{}, int64(0)),
+		},
+		{
+			task:              testTask4,
+			expectedResources: getTestTaskResourceMap(int64(1024), int64(512), utils.Uint16SliceToStringSlice(portsTCP), utils.Uint16SliceToStringSlice(portsUDP), int64(0)),
+		},
+	}
+
+	for _, tc := range testCases {
+		calcResources := tc.task.ToHostResources()
+
+		//CPU
+		assert.Equal(t, tc.expectedResources["CPU"].IntegerValue, calcResources["CPU"].IntegerValue, "Error converting task CPU resources")
+
+		//MEMORY
+		assert.Equal(t, tc.expectedResources["MEMORY"].IntegerValue, calcResources["MEMORY"].IntegerValue, "Error converting task Memory resources")
+
+		//GPU
+		assert.Equal(t, tc.expectedResources["GPU"].IntegerValue, calcResources["GPU"].IntegerValue, "Error converting task GPU resources")
+
+		//PORTS
+		for _, expectedPort := range tc.expectedResources["PORTS_TCP"].StringSetValue {
+			found := false
+			for _, calcPort := range calcResources["PORTS_TCP"].StringSetValue {
+				if *expectedPort == *calcPort {
+					found = true
+					break
+				}
+			}
+			assert.True(t, found, "Could not convert TCP port resources")
+		}
+		assert.Equal(t, len(tc.expectedResources["PORTS_TCP"].StringSetValue), len(calcResources["PORTS_TCP"].StringSetValue), "Error converting task TCP port resources")
+
+		//PORTS_UDP
+		for _, expectedPort := range tc.expectedResources["PORTS_UDP"].StringSetValue {
+			found := false
+			for _, calcPort := range calcResources["PORTS_UDP"].StringSetValue {
+				if *expectedPort == *calcPort {
+					found = true
+					break
+				}
+			}
+			assert.True(t, found, "Could not convert UDP port resources")
+		}
+		assert.Equal(t, len(tc.expectedResources["PORTS_UDP"].StringSetValue), len(calcResources["PORTS_UDP"].StringSetValue), "Error converting task UDP port resources")
+	}
+}
diff --git a/agent/api/task/task_windows_test.go b/agent/api/task/task_windows_test.go
index ce55a5251ea..883d9ecd3f4 100644
--- a/agent/api/task/task_windows_test.go
+++ b/agent/api/task/task_windows_test.go
@@ -109,7 +109,6 @@ func TestPostUnmarshalWindowsCanonicalPaths(t *testing.T) {
 				},
 			},
 		},
-		StartSequenceNumber: 42,
 	}
 
 	seqNum := int64(42)
diff --git a/agent/api/task/taskvolume_test.go b/agent/api/task/taskvolume_test.go
index ed0f8ad9a91..c0602bf7705 100644
--- a/agent/api/task/taskvolume_test.go
+++ b/agent/api/task/taskvolume_test.go
@@ -119,8 +119,6 @@ func TestMarshalTaskVolumesEFS(t *testing.T) {
 		"PullStoppedAt": "0001-01-01T00:00:00Z",
 		"ExecutionStoppedAt": "0001-01-01T00:00:00Z",
 		"SentStatus": "NONE",
-
"StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, @@ -168,8 +166,6 @@ func TestUnmarshalTaskVolumesEFS(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, diff --git a/agent/api/task/taskvolume_windows_test.go b/agent/api/task/taskvolume_windows_test.go index 98070de3f3f..85e6d954bde 100644 --- a/agent/api/task/taskvolume_windows_test.go +++ b/agent/api/task/taskvolume_windows_test.go @@ -77,8 +77,6 @@ func TestMarshalTaskVolumeFSxWindowsFileServer(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, @@ -118,8 +116,6 @@ func TestUnmarshalTaskVolumeFSxWindowsFileServer(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, diff --git a/agent/app/agent.go b/agent/app/agent.go index 1b6d17bc0d1..6b7d4386129 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -306,17 +306,36 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre return exitcodes.ExitTerminal } } + hostResources, err := client.GetHostResources() + if err != nil { + seelog.Critical("Unable to fetch host resources") + return exitcodes.ExitError + } + numGPUs := int64(0) if agent.cfg.GPUSupportEnabled { err := agent.initializeGPUManager() if err != nil { seelog.Criticalf("Could not initialize Nvidia GPU Manager: %v", err) return exitcodes.ExitError } + // Find number of GPUs instance has + platformDevices := agent.getPlatformDevices() + for _, device := range platformDevices { + if *device.Type == ecs.PlatformDeviceTypeGpu { + numGPUs++ + } + } + } + + hostResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, } // Create the task engine taskEngine, currentEC2InstanceID, err := agent.newTaskEngine( - containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, agent.serviceconnectManager) + containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, agent.serviceconnectManager) if err != nil { seelog.Criticalf("Unable to initialize new task engine: %v", err) return exitcodes.ExitTerminal @@ -523,6 +542,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve credentialsManager credentials.Manager, state dockerstate.TaskEngineState, imageManager engine.ImageManager, + hostResources map[string]*ecs.Resource, execCmdMgr execcmd.Manager, serviceConnectManager engineserviceconnect.Manager) (engine.TaskEngine, string, error) { @@ -531,11 +551,11 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve if !agent.cfg.Checkpoint.Enabled() { seelog.Info("Checkpointing not enabled; a new container instance will be created each time the agent is run") return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager, - containerChangeEventStream, imageManager, state, + containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager, agent.resourceFields, execCmdMgr, 
serviceConnectManager), "", nil } - savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager) + savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager) if err != nil { seelog.Criticalf("Error loading previously saved state: %v", err) return nil, "", err @@ -560,7 +580,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve state.Reset() // Reset taskEngine; all the other values are still default return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager, - containerChangeEventStream, imageManager, state, agent.metadataManager, + containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager, agent.resourceFields, execCmdMgr, serviceConnectManager), currentEC2InstanceID, nil } diff --git a/agent/app/agent_compatibility_linux_test.go b/agent/app/agent_compatibility_linux_test.go index b7833d0a16c..6b45034214e 100644 --- a/agent/app/agent_compatibility_linux_test.go +++ b/agent/app/agent_compatibility_linux_test.go @@ -65,7 +65,8 @@ func TestCompatibilityEnabledSuccess(t *testing.T) { defer cancel() containerChangeEventStream := eventstream.NewEventStream("events", ctx) - _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager) + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) assert.True(t, cfg.TaskCPUMemLimit.Enabled()) @@ -106,7 +107,8 @@ func TestCompatibilityNotSetFail(t *testing.T) { defer cancel() containerChangeEventStream := eventstream.NewEventStream("events", ctx) - _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager) + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) assert.False(t, cfg.TaskCPUMemLimit.Enabled()) @@ -146,7 +148,8 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) { defer cancel() containerChangeEventStream := eventstream.NewEventStream("events", ctx) - _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager) + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager) assert.Error(t, err) } diff --git a/agent/app/agent_test.go b/agent/app/agent_test.go index 7f2ffd50f81..a7fec4d4a52 100644 --- a/agent/app/agent_test.go +++ b/agent/app/agent_test.go @@ -44,6 +44,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" "github.com/aws/amazon-ecs-agent/agent/statemanager" mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" + "github.com/aws/amazon-ecs-agent/agent/utils" mock_loader "github.com/aws/amazon-ecs-agent/agent/utils/loader/mocks" mock_mobypkgwrapper "github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper/mocks" "github.com/aws/amazon-ecs-agent/agent/version" @@ -78,6 +79,20 @@ var apiVersions = 
[]dockerclient.DockerVersion{ dockerclient.Version_1_22, dockerclient.Version_1_23} var capabilities []*ecs.Attribute +var testHostCPU = int64(1024) +var testHostMEMORY = int64(1024) +var testHostResource = map[string]*ecs.Resource{ + "CPU": &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &testHostCPU, + }, + "MEMORY": &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &testHostMEMORY, + }, +} func setup(t *testing.T) (*gomock.Controller, *mock_credentials.MockManager, @@ -169,6 +184,7 @@ func TestDoStartNewTaskEngineError(t *testing.T) { ec2MetadataClient := mock_ec2.NewMockEC2MetadataClient(ctrl) gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), saveableOptionFactory.EXPECT().AddSaveable("TaskEngine", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("ContainerInstanceArn", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), @@ -225,6 +241,7 @@ func TestDoStartRegisterContainerInstanceErrorTerminal(t *testing.T) { mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes() gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil), dockerClient.EXPECT().SupportedVersions().Return(nil), dockerClient.EXPECT().KnownVersions().Return(nil), @@ -281,6 +298,7 @@ func TestDoStartRegisterContainerInstanceErrorNonTerminal(t *testing.T) { mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes() gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil), dockerClient.EXPECT().SupportedVersions().Return(nil), dockerClient.EXPECT().KnownVersions().Return(nil), @@ -322,6 +340,7 @@ func TestDoStartWarmPoolsError(t *testing.T) { mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl) gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), ) cfg := getTestConfig() @@ -441,6 +460,7 @@ func testDoStartHappyPathWithConditions(t *testing.T, blackholed bool, warmPools gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil), dockerClient.EXPECT().SupportedVersions().Return(nil), dockerClient.EXPECT().KnownVersions().Return(nil), @@ -562,8 +582,10 @@ func TestNewTaskEngineRestoreFromCheckpointNoEC2InstanceIDToLoadHappyPath(t *tes saveableOptionFactory: saveableOptionFactory, } + hostResources := getTestHostResources() + _, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) assert.Equal(t, expectedInstanceID, instanceID) assert.Equal(t, "prev-container-inst", agent.containerInstanceARN) @@ -624,9 +646,10 @@ func 
TestNewTaskEngineRestoreFromCheckpointPreviousEC2InstanceIDLoadedHappyPath( ec2MetadataClient: ec2MetadataClient, saveableOptionFactory: saveableOptionFactory, } + hostResources := getTestHostResources() _, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) assert.Equal(t, expectedInstanceID, instanceID) assert.NotEqual(t, "prev-container-inst", agent.containerInstanceARN) @@ -686,8 +709,10 @@ func TestNewTaskEngineRestoreFromCheckpointClusterIDMismatch(t *testing.T) { saveableOptionFactory: saveableOptionFactory, } + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.Error(t, err) assert.IsType(t, clusterMismatchError{}, err) } @@ -731,8 +756,10 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) { saveableOptionFactory: saveableOptionFactory, } + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.Error(t, err) assert.False(t, isTransient(err)) } @@ -777,8 +804,10 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) { saveableOptionFactory: saveableOptionFactory, } + hostResources := getTestHostResources() + _, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.Error(t, err) assert.False(t, isTransient(err)) } @@ -816,8 +845,10 @@ func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) { } state := dockerstate.NewTaskEngineState() + hostResources := getTestHostResources() + _, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx), - credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) assert.Equal(t, testEC2InstanceID, instanceID) @@ -1346,6 +1377,7 @@ func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) { mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes() gomock.InOrder( dockerClient.EXPECT().SupportedVersions().Return(apiVersions), + client.EXPECT().GetHostResources().Return(testHostResource, nil), mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil), dockerClient.EXPECT().SupportedVersions().Return(nil), dockerClient.EXPECT().KnownVersions().Return(nil), @@ -1637,6 +1669,46 @@ func getTestConfig() config.Config { return cfg } +func getTestHostResources() map[string]*ecs.Resource { + hostResources := make(map[string]*ecs.Resource) + CPUs := int64(1024) + hostResources["CPU"] 
= &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &CPUs, + } + //MEMORY + memory := int64(1024) + hostResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &memory, + } + //PORTS + ports_tcp := []*string{} + hostResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports_tcp, + } + + //PORTS_UDP + ports_udp := []*string{} + hostResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports_udp, + } + //GPUs + numGPUs := int64(3) + hostResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + return hostResources +} + func newTestDataClient(t *testing.T) data.Client { testDir := t.TempDir() diff --git a/agent/app/agent_unix_test.go b/agent/app/agent_unix_test.go index 03567e11369..824934624c5 100644 --- a/agent/app/agent_unix_test.go +++ b/agent/app/agent_unix_test.go @@ -116,6 +116,7 @@ func TestDoStartTaskENIHappyPath(t *testing.T) { mockServiceConnectManager.EXPECT().GetLoadedImageName().Return("service_connect_agent:v1").AnyTimes() imageManager.EXPECT().AddImageToCleanUpExclusionList(gomock.Eq("service_connect_agent:v1")).Times(1) mockUdevMonitor.EXPECT().Monitor(gomock.Any()).Return(monitoShutdownEvents).AnyTimes() + client.EXPECT().GetHostResources().Return(testHostResource, nil).Times(1) gomock.InOrder( mockMetadata.EXPECT().PrimaryENIMAC().Return(mac, nil), @@ -460,6 +461,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) { mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes() mockServiceConnectManager.EXPECT().GetLoadedImageName().Return("service_connect_agent:v1").AnyTimes() imageManager.EXPECT().AddImageToCleanUpExclusionList(gomock.Eq("service_connect_agent:v1")).Times(1) + client.EXPECT().GetHostResources().Return(testHostResource, nil).Times(1) gomock.InOrder( mockControl.EXPECT().Init().Return(nil), @@ -476,6 +478,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) { state.EXPECT().AllImageStates().Return(nil), state.EXPECT().AllENIAttachments().Return(nil), state.EXPECT().AllTasks().Return(nil), + state.EXPECT().AllTasks().Return(nil), client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) { // Ensures that the test waits until acs session has bee started discoverEndpointsInvoked.Done() @@ -624,6 +627,8 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) { mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes() mockServiceConnectManager.EXPECT().GetLoadedImageName().Return("service_connect_agent:v1").AnyTimes() imageManager.EXPECT().AddImageToCleanUpExclusionList(gomock.Eq("service_connect_agent:v1")).Times(1) + client.EXPECT().GetHostResources().Return(testHostResource, nil).Times(1) + mockGPUManager.EXPECT().GetDevices().Return(devices).AnyTimes() gomock.InOrder( mockGPUManager.EXPECT().Initialize().Return(nil), @@ -634,7 +639,7 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) { dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{}, nil), mockGPUManager.EXPECT().GetDriverVersion().Return("396.44"), - mockGPUManager.EXPECT().GetDevices().Return(devices), + mockGPUManager.EXPECT().GetDevices().Return(devices).AnyTimes(), client.EXPECT().RegisterContainerInstance(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
devices, gomock.Any()).Return("arn", "", nil), imageManager.EXPECT().SetDataClient(gomock.Any()), @@ -642,6 +647,7 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) { state.EXPECT().AllImageStates().Return(nil), state.EXPECT().AllENIAttachments().Return(nil), state.EXPECT().AllTasks().Return(nil), + state.EXPECT().AllTasks().Return(nil), client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) { // Ensures that the test waits until acs session has been started discoverEndpointsInvoked.Done() @@ -717,6 +723,7 @@ func TestDoStartGPUManagerInitError(t *testing.T) { mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes() mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes() mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes() + client.EXPECT().GetHostResources().Return(testHostResource, nil).Times(1) cfg := getTestConfig() cfg.GPUSupportEnabled = true @@ -766,6 +773,7 @@ func TestDoStartTaskENIPauseError(t *testing.T) { dockerapi.ListContainersResponse{}).AnyTimes() imageManager.EXPECT().StartImageCleanupProcess(gomock.Any()).MaxTimes(1) mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("error")).AnyTimes() + client.EXPECT().GetHostResources().Return(testHostResource, nil).Times(1) cfg := getTestConfig() cfg.TaskENIEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled} diff --git a/agent/app/data.go b/agent/app/data.go index 77dcafe94ab..7b747c6734d 100644 --- a/agent/app/data.go +++ b/agent/app/data.go @@ -23,6 +23,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/pkg/errors" @@ -65,11 +66,13 @@ func (agent *ecsAgent) loadData(containerChangeEventStream *eventstream.EventStr credentialsManager credentials.Manager, state dockerstate.TaskEngineState, imageManager engine.ImageManager, + hostResources map[string]*ecs.Resource, execCmdMgr execcmd.Manager, serviceConnectManager serviceconnect.Manager) (*savedData, error) { + s := &savedData{ taskEngine: engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager, - containerChangeEventStream, imageManager, state, + containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager, agent.resourceFields, execCmdMgr, serviceConnectManager), } s.taskEngine.SetDataClient(agent.dataClient) diff --git a/agent/app/data_test.go b/agent/app/data_test.go index cad597134fa..5adcaff6901 100644 --- a/agent/app/data_test.go +++ b/agent/app/data_test.go @@ -112,9 +112,10 @@ func TestLoadDataNoPreviousState(t *testing.T) { stateManagerFactory: stateManagerFactory, saveableOptionFactory: factory.NewSaveableOption(), } + hostResources := getTestHostResources() _, err := agent.loadData(eventstream.NewEventStream("events", ctx), - credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) } @@ -143,8 +144,9 @@ func TestLoadDataLoadFromBoltDB(t *testing.T) { } state := dockerstate.NewTaskEngineState() + hostResources := getTestHostResources() s, err := agent.loadData(eventstream.NewEventStream("events", ctx), - 
credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) checkLoadedData(state, s, t) } @@ -181,8 +183,9 @@ func TestLoadDataLoadFromStateFile(t *testing.T) { } state := dockerstate.NewTaskEngineState() + hostResources := getTestHostResources() s, err := agent.loadData(eventstream.NewEventStream("events", ctx), - credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager) + credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager) assert.NoError(t, err) checkLoadedData(state, s, t) diff --git a/agent/engine/common_integ_test.go b/agent/engine/common_integ_test.go index eb15a1ab2f5..b4e2d0e5f77 100644 --- a/agent/engine/common_integ_test.go +++ b/agent/engine/common_integ_test.go @@ -111,9 +111,11 @@ func setupGMSALinux(cfg *config.Config, state dockerstate.TaskEngineState, t *te }, DockerClient: dockerClient, } + hostResources := getTestHostResources() + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager, - eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, state, metadataManager, + eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager, resourceFields, execcmd.NewManager(), engineserviceconnect.NewManager()) taskEngine.MustInit(context.TODO()) return taskEngine, func() { @@ -202,9 +204,11 @@ func setup(cfg *config.Config, state dockerstate.TaskEngineState, t *testing.T) imageManager := NewImageManager(cfg, dockerClient, state) imageManager.SetDataClient(data.NewNoopClient()) metadataManager := containermetadata.NewManager(dockerClient, cfg) + hostResources := getTestHostResources() + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager, - eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, state, metadataManager, + eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager, nil, execcmd.NewManager(), engineserviceconnect.NewManager()) taskEngine.MustInit(context.TODO()) return taskEngine, func() { @@ -221,6 +225,8 @@ func skipIntegTestIfApplicable(t *testing.T) { } } +// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned +// accordingly func createTestContainerWithImageAndName(image string, name string) *apicontainer.Container { return &apicontainer.Container{ Name: name, @@ -228,7 +234,7 @@ func createTestContainerWithImageAndName(image string, name string) *apicontaine Command: []string{}, Essential: true, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 1024, + CPU: 256, Memory: 128, } } diff --git a/agent/engine/common_test.go b/agent/engine/common_test.go index ca0c5a16d6b..f6bfa564e8a 100644 --- a/agent/engine/common_test.go +++ b/agent/engine/common_test.go @@ -35,7 +35,9 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" "github.com/aws/amazon-ecs-agent/agent/statechange" + "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" mock_ttime 
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks" "github.com/cihub/seelog" dockercontainer "github.com/docker/docker/api/types/container" @@ -365,3 +367,43 @@ func enableExecCommandAgentForContainer(container *apicontainer.Container, state }, } } + +func getTestHostResources() map[string]*ecs.Resource { + hostResources := make(map[string]*ecs.Resource) + CPUs := int64(1024) + hostResources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &CPUs, + } + //MEMORY + memory := int64(1024) + hostResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &memory, + } + //PORTS + ports_tcp := []*string{} + hostResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports_tcp, + } + + //PORTS_UDP + ports_udp := []*string{} + hostResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports_udp, + } + //GPUs + numGPUs := int64(3) + hostResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + return hostResources +} diff --git a/agent/engine/default.go b/agent/engine/default.go index d83c53a89cc..84bc08f850b 100644 --- a/agent/engine/default.go +++ b/agent/engine/default.go @@ -24,6 +24,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" "github.com/aws/amazon-ecs-agent/agent/taskresource" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" ) @@ -31,14 +32,15 @@ import ( func NewTaskEngine(cfg *config.Config, client dockerapi.DockerClient, credentialsManager credentials.Manager, containerChangeEventStream *eventstream.EventStream, - imageManager ImageManager, state dockerstate.TaskEngineState, + imageManager ImageManager, hostResources map[string]*ecs.Resource, state dockerstate.TaskEngineState, metadataManager containermetadata.Manager, resourceFields *taskresource.ResourceFields, execCmdMgr execcmd.Manager, serviceConnectManager serviceconnect.Manager) TaskEngine { + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, client, credentialsManager, - containerChangeEventStream, imageManager, + containerChangeEventStream, imageManager, &hostResourceManager, state, metadataManager, resourceFields, execCmdMgr, serviceConnectManager) return taskEngine diff --git a/agent/engine/docker_image_manager_integ_test.go b/agent/engine/docker_image_manager_integ_test.go index 031a7bd1415..938a77a3c53 100644 --- a/agent/engine/docker_image_manager_integ_test.go +++ b/agent/engine/docker_image_manager_integ_test.go @@ -568,7 +568,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image1Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -576,7 +576,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image2Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -584,7 +584,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image3Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 
256, }, }, @@ -603,7 +603,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image1Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -611,7 +611,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image2Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -619,7 +619,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image3Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, }, diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 0ed71d7d80c..51ac7074289 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -46,7 +46,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/taskresource/credentialspec" "github.com/aws/amazon-ecs-agent/agent/taskresource/firelens" "github.com/aws/amazon-ecs-agent/agent/utils" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/ecs-agent/api/appnet" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" @@ -135,10 +134,12 @@ type DockerTaskEngine struct { state dockerstate.TaskEngineState managedTasks map[string]*managedTask - taskStopGroup *utilsync.SequentialWaitGroup + // waitingTasksQueue is a FIFO queue of tasks waiting to acquire host resources + waitingTaskQueue []*managedTask - events <-chan dockerapi.DockerContainerChangeEvent - stateChangeEvents chan statechange.Event + events <-chan dockerapi.DockerContainerChangeEvent + monitorQueuedTaskEvent chan struct{} + stateChangeEvents chan statechange.Event client dockerapi.DockerClient dataClient data.Client @@ -154,6 +155,13 @@ type DockerTaskEngine struct { // all tasks, it must not acquire it for any significant duration // The write mutex should be taken when adding and removing tasks from managedTasks. tasksLock sync.RWMutex + // waitingTasksLock is a mutex for operations on waitingTasksQueue + waitingTasksLock sync.RWMutex + + // monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which + // allocate host resources and wakes up waiting host resources. 
This should be used + // for synchronizing task desired status updates and queue operations + monitorQueuedTasksLock sync.RWMutex credentialsManager credentials.Manager _time ttime.Time @@ -162,6 +170,7 @@ type DockerTaskEngine struct { containerStatusToTransitionFunction map[apicontainerstatus.ContainerStatus]transitionApplyFunc metadataManager containermetadata.Manager serviceconnectManager serviceconnect.Manager + hostResourceManager *HostResourceManager serviceconnectRelay *apitask.Task // taskSteadyStatePollInterval is the duration that a managed task waits @@ -195,6 +204,7 @@ func NewDockerTaskEngine(cfg *config.Config, credentialsManager credentials.Manager, containerChangeEventStream *eventstream.EventStream, imageManager ImageManager, + hostResourceManager *HostResourceManager, state dockerstate.TaskEngineState, metadataManager containermetadata.Manager, resourceFields *taskresource.ResourceFields, @@ -205,15 +215,16 @@ func NewDockerTaskEngine(cfg *config.Config, client: client, dataClient: data.NewNoopClient(), - state: state, - managedTasks: make(map[string]*managedTask), - taskStopGroup: utilsync.NewSequentialWaitGroup(), - stateChangeEvents: make(chan statechange.Event), + state: state, + managedTasks: make(map[string]*managedTask), + stateChangeEvents: make(chan statechange.Event), + monitorQueuedTaskEvent: make(chan struct{}, 1), credentialsManager: credentialsManager, containerChangeEventStream: containerChangeEventStream, imageManager: imageManager, + hostResourceManager: hostResourceManager, cniClient: ecscni.NewClient(cfg.CNIPluginsPath), appnetClient: appnet.Client(), @@ -235,6 +246,37 @@ func NewDockerTaskEngine(cfg *config.Config, return dockerTaskEngine } +// Reconcile state of host resource manager with task status in managedTasks Slice +// Done on agent restarts +func (engine *DockerTaskEngine) reconcileHostResources() { + logger.Info("Reconciling host resources") + for _, task := range engine.state.AllTasks() { + taskStatus := task.GetKnownStatus() + resources := task.ToHostResources() + + // Release stopped tasks host resources + // Call to release here for stopped tasks should always succeed + // Idempotent release call + if taskStatus.Terminal() { + err := engine.hostResourceManager.release(task.Arn, resources) + if err != nil { + logger.Critical("Failed to release resources during reconciliation", logger.Fields{field.TaskARN: task.Arn}) + } + continue + } + + // Consume host resources if task has progressed (check if any container has progressed) + // Call to consume here should always succeed + // Idempotent consume call + if !task.IsInternal && task.HasActiveContainers() { + consumed, err := engine.hostResourceManager.consume(task.Arn, resources) + if err != nil || !consumed { + logger.Critical("Failed to consume resources for created/running tasks during reconciliation", logger.Fields{field.TaskARN: task.Arn}) + } + } + } +} + func (engine *DockerTaskEngine) initializeContainerStatusToTransitionFunction() { containerStatusToTransitionFunction := map[apicontainerstatus.ContainerStatus]transitionApplyFunc{ apicontainerstatus.ContainerPulled: engine.pullContainer, @@ -277,6 +319,7 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error { return err } engine.synchronizeState() + go engine.monitorQueuedTasks(derivedCtx) // Now catch up and start processing new events per normal go engine.handleDockerEvents(derivedCtx) engine.initialized = true @@ -285,6 +328,131 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error { return nil } +// 
Method to wake up 'monitorQueuedTasks' goroutine, called when +// - a new task enqueues in waitingTaskQueue +// - a task stops (overseeTask) +// as these are the events when resources change/can change on the host +// Always wakes up when at least one event arrives on buffered channel (size 1) 'monitorQueuedTaskEvent' +// but does not block if monitorQueuedTasks is already processing queued tasks +// Buffered channel of size 1 is sufficient because we only want to go through the queue +// once at any point and schedule as many tasks as possible (as many resources are available) +// Calls on 'wakeUpTaskQueueMonitor' when 'monitorQueuedTasks' is doing work are redundant +// as new tasks are enqueued at the end and will be taken into account in the continued loop +// if permitted by design +func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() { + select { + case engine.monitorQueuedTaskEvent <- struct{}{}: + default: + // do nothing + } +} + +func (engine *DockerTaskEngine) topTask() (*managedTask, error) { + engine.waitingTasksLock.Lock() + defer engine.waitingTasksLock.Unlock() + if len(engine.waitingTaskQueue) > 0 { + return engine.waitingTaskQueue[0], nil + } + return nil, fmt.Errorf("no tasks in waiting queue") +} + +func (engine *DockerTaskEngine) enqueueTask(task *managedTask) { + engine.waitingTasksLock.Lock() + engine.waitingTaskQueue = append(engine.waitingTaskQueue, task) + engine.waitingTasksLock.Unlock() + logger.Debug("Enqueued task in Waiting Task Queue", logger.Fields{field.TaskARN: task.Arn}) + engine.wakeUpTaskQueueMonitor() +} + +func (engine *DockerTaskEngine) dequeueTask() (*managedTask, error) { + engine.waitingTasksLock.Lock() + defer engine.waitingTasksLock.Unlock() + if len(engine.waitingTaskQueue) > 0 { + task := engine.waitingTaskQueue[0] + engine.waitingTaskQueue = engine.waitingTaskQueue[1:] + logger.Debug("Dequeued task from Waiting Task Queue", logger.Fields{field.TaskARN: task.Arn}) + return task, nil + } + + return nil, fmt.Errorf("no tasks in waiting queue") +} + +// monitorQueuedTasks starts as many tasks as possible based on FIFO order of waitingTaskQueue +// and availability of host resources. When no more tasks can be started, it will wait on +// monitorQueuedTaskEvent channel. 
This channel receives (best effort) messages when +// - a task stops +// - a new task is queued up +// It does not need to receive all messages, as if the routine is going through the queue, it +// may schedule more than one task for a single 'event' received +func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) { + logger.Info("Monitoring Task Queue started") + for { + select { + case <-ctx.Done(): + return + case <-engine.monitorQueuedTaskEvent: + // Dequeue as many tasks as possible and start wake up their goroutines + for { + task, err := engine.topTask() + if err != nil { + break + } + dequeuedTask := engine.tryDequeueWaitingTasks(task) + if !dequeuedTask { + break + } + } + logger.Debug("No more tasks could be started at this moment, waiting") + } + } +} + +func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool { + // Isolate monitorQueuedTasks processing from changes of desired status updates to prevent + // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks + // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks + // could be processing + engine.monitorQueuedTasksLock.Lock() + defer engine.monitorQueuedTasksLock.Unlock() + taskDesiredStatus := task.GetDesiredStatus() + if taskDesiredStatus.Terminal() { + logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn}) + engine.returnWaitingTask() + return true + } + taskHostResources := task.ToHostResources() + consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources) + if err != nil { + engine.failWaitingTask(err) + return true + } + if consumed { + engine.startWaitingTask() + return true + } + return false + // not consumed, go to wait +} + +// To be called when resources are not to be consumed by host resource manager, just dequeues and returns +func (engine *DockerTaskEngine) returnWaitingTask() { + task, _ := engine.dequeueTask() + task.consumedHostResourceEvent <- struct{}{} +} + +func (engine *DockerTaskEngine) failWaitingTask(err error) { + task, _ := engine.dequeueTask() + logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn}) + task.SetDesiredStatus(apitaskstatus.TaskStopped) + task.consumedHostResourceEvent <- struct{}{} +} + +func (engine *DockerTaskEngine) startWaitingTask() { + task, _ := engine.dequeueTask() + logger.Info("Host resources consumed, progressing task", logger.Fields{field.TaskARN: task.Arn}) + task.consumedHostResourceEvent <- struct{}{} +} + func (engine *DockerTaskEngine) startPeriodicExecAgentsMonitoring(ctx context.Context) { engine.monitorExecAgentsTicker = time.NewTicker(engine.monitorExecAgentsInterval) for { @@ -460,6 +628,14 @@ func (engine *DockerTaskEngine) synchronizeState() { } tasks := engine.state.AllTasks() + // For normal task progress, overseeTask 'consume's resources through waitForHostResources in host_resource_manager before progressing + // For agent restarts (state restore), we pre-consume resources for tasks that had progressed beyond waitForHostResources stage - + // so these tasks do not wait during 'waitForHostResources' call again - do not go through queuing again + // + // Call reconcileHostResources before + // - filterTasksToStartUnsafe which will reconcile container statuses for the duration the agent was stopped + // - starting 
managedTask's overseeTask goroutines + engine.reconcileHostResources() tasksToStart := engine.filterTasksToStartUnsafe(tasks) for _, task := range tasks { task.InitializeResources(engine.resourceFields) @@ -490,11 +666,6 @@ func (engine *DockerTaskEngine) filterTasksToStartUnsafe(tasks []*apitask.Task) } tasksToStart = append(tasksToStart, task) - - // Put tasks that are stopped by acs but hasn't been stopped in wait group - if task.GetDesiredStatus().Terminal() && task.GetStopSequenceNumber() != 0 { - engine.taskStopGroup.Add(task.GetStopSequenceNumber(), 1) - } } return tasksToStart @@ -782,6 +953,15 @@ func (engine *DockerTaskEngine) deleteTask(task *apitask.Task) { } func (engine *DockerTaskEngine) emitTaskEvent(task *apitask.Task, reason string) { + if task.GetKnownStatus().Terminal() { + // Always do (idempotent) release host resources whenever state change with + // known status == STOPPED is done to ensure sync between tasks and host resource manager + resourcesToRelease := task.ToHostResources() + err := engine.hostResourceManager.release(task.Arn, resourcesToRelease) + if err != nil { + logger.Critical("Failed to release resources after test stopped", logger.Fields{field.TaskARN: task.Arn}) + } + } event, err := api.NewTaskStateChangeEvent(task, reason) if err != nil { if _, ok := err.(api.ErrShouldNotSendEvent); ok { @@ -2178,16 +2358,13 @@ func (engine *DockerTaskEngine) updateTaskUnsafe(task *apitask.Task, update *api logger.Debug("Putting update on the acs channel", logger.Fields{ field.TaskID: task.GetID(), field.DesiredStatus: updateDesiredStatus.String(), - field.Sequence: update.StopSequenceNumber, }) managedTask.emitACSTransition(acsTransition{ desiredStatus: updateDesiredStatus, - seqnum: update.StopSequenceNumber, }) logger.Debug("Update taken off the acs channel", logger.Fields{ field.TaskID: task.GetID(), field.DesiredStatus: updateDesiredStatus.String(), - field.Sequence: update.StopSequenceNumber, }) } diff --git a/agent/engine/docker_task_engine_test.go b/agent/engine/docker_task_engine_test.go index 49733eb745f..4c3c386ceed 100644 --- a/agent/engine/docker_task_engine_test.go +++ b/agent/engine/docker_task_engine_test.go @@ -179,9 +179,10 @@ func mocks(t *testing.T, ctx context.Context, cfg *config.Config) (*gomock.Contr imageManager := mock_engine.NewMockImageManager(ctrl) metadataManager := mock_containermetadata.NewMockManager(ctrl) execCmdMgr := mock_execcmdagent.NewMockManager(ctrl) + hostResources := getTestHostResources() taskEngine := NewTaskEngine(cfg, client, credentialsManager, containerChangeEventStream, - imageManager, dockerstate.NewTaskEngineState(), metadataManager, nil, execCmdMgr, nil) + imageManager, hostResources, dockerstate.NewTaskEngineState(), metadataManager, nil, execCmdMgr, nil) taskEngine.(*DockerTaskEngine)._time = mockTime taskEngine.(*DockerTaskEngine).ctx = ctx taskEngine.(*DockerTaskEngine).stopContainerBackoffMin = time.Millisecond @@ -568,7 +569,6 @@ func TestStopWithPendingStops(t *testing.T) { testTime.EXPECT().After(gomock.Any()).AnyTimes() sleepTask1 := testdata.LoadTask("sleep5") - sleepTask1.StartSequenceNumber = 5 sleepTask2 := testdata.LoadTask("sleep5") sleepTask2.Arn = "arn2" eventStream := make(chan dockerapi.DockerContainerChangeEvent) @@ -596,13 +596,11 @@ func TestStopWithPendingStops(t *testing.T) { stopSleep2 := testdata.LoadTask("sleep5") stopSleep2.Arn = "arn2" stopSleep2.SetDesiredStatus(apitaskstatus.TaskStopped) - stopSleep2.StopSequenceNumber = 4 taskEngine.AddTask(stopSleep2) taskEngine.AddTask(sleepTask1) 
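The enqueueTask/wakeUpTaskQueueMonitor/monitorQueuedTasks additions in docker_task_engine.go above hinge on a buffered channel of size 1: producers coalesce wake-ups with a non-blocking send, and the monitor re-scans the FIFO queue each time it wakes. The following is a standalone sketch of that pattern only; the waitQueue type, canAdmit callback, and names used here are illustrative and are not part of this patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitQueue is an illustrative stand-in for the waitingTaskQueue plus
// monitorQueuedTaskEvent pairing: a FIFO slice guarded by a mutex, and a
// size-1 notification channel on which pending wake-ups coalesce.
type waitQueue struct {
	mu     sync.Mutex
	items  []string
	notify chan struct{}
}

func newWaitQueue() *waitQueue {
	return &waitQueue{notify: make(chan struct{}, 1)}
}

// enqueue appends in FIFO order and wakes the monitor.
func (q *waitQueue) enqueue(arn string) {
	q.mu.Lock()
	q.items = append(q.items, arn)
	q.mu.Unlock()
	q.wake()
}

// wake is a non-blocking send: if a wake-up is already pending, dropping this
// one is safe because the monitor re-scans the whole queue when it runs.
func (q *waitQueue) wake() {
	select {
	case q.notify <- struct{}{}:
	default:
	}
}

// monitor admits queued items in FIFO order while canAdmit reports capacity,
// then blocks until the next wake-up.
func (q *waitQueue) monitor(canAdmit func(string) bool, done <-chan struct{}) {
	for {
		select {
		case <-done:
			return
		case <-q.notify:
			for {
				q.mu.Lock()
				if len(q.items) == 0 || !canAdmit(q.items[0]) {
					q.mu.Unlock()
					break
				}
				arn := q.items[0]
				q.items = q.items[1:]
				q.mu.Unlock()
				fmt.Println("admitted", arn)
			}
		}
	}
}

func main() {
	q := newWaitQueue()
	done := make(chan struct{})
	slots := 2 // pretend the host only fits two tasks
	go q.monitor(func(string) bool {
		if slots == 0 {
			return false
		}
		slots--
		return true
	}, done)
	q.enqueue("arn0")
	q.enqueue("arn1")
	q.enqueue("arn2") // stays queued until capacity is released
	time.Sleep(100 * time.Millisecond)
	close(done)
}
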
stopSleep1 := testdata.LoadTask("sleep5") stopSleep1.SetDesiredStatus(apitaskstatus.TaskStopped) - stopSleep1.StopSequenceNumber = 5 taskEngine.AddTask(stopSleep1) pullDone <- true // this means the PullImage is only called once due to the task is stopped before it @@ -1641,11 +1639,11 @@ func TestPullAndUpdateContainerReference(t *testing.T) { // agent starts, container created, metadata file created, agent restarted, container recovered // during task engine init, metadata file updated func TestMetadataFileUpdatedAgentRestart(t *testing.T) { - conf := &defaultConfig + conf := defaultConfig conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled} ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, conf) + ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, &conf) defer ctrl.Finish() var metadataUpdateWG sync.WaitGroup @@ -1871,81 +1869,6 @@ func TestNewTaskTransitionOnRestart(t *testing.T) { assert.True(t, ok, "task wasnot started") } -// TestTaskWaitForHostResourceOnRestart tests task stopped by acs but hasn't -// reached stopped should block the later task to start -func TestTaskWaitForHostResourceOnRestart(t *testing.T) { - // Task 1 stopped by backend - taskStoppedByACS := testdata.LoadTask("sleep5") - taskStoppedByACS.SetDesiredStatus(apitaskstatus.TaskStopped) - taskStoppedByACS.SetStopSequenceNumber(1) - taskStoppedByACS.SetKnownStatus(apitaskstatus.TaskRunning) - // Task 2 has essential container stopped - taskEssentialContainerStopped := testdata.LoadTask("sleep5") - taskEssentialContainerStopped.Arn = "task_Essential_Container_Stopped" - taskEssentialContainerStopped.SetDesiredStatus(apitaskstatus.TaskStopped) - taskEssentialContainerStopped.SetKnownStatus(apitaskstatus.TaskRunning) - // Normal task 3 needs to be started - taskNotStarted := testdata.LoadTask("sleep5") - taskNotStarted.Arn = "task_Not_started" - - conf := &defaultConfig - conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled} - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - ctrl, client, _, privateTaskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, conf) - defer ctrl.Finish() - - client.EXPECT().Version(gomock.Any(), gomock.Any()).MaxTimes(1) - client.EXPECT().ContainerEvents(gomock.Any()).MaxTimes(1) - serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes() - - err := privateTaskEngine.Init(ctx) - assert.NoError(t, err) - - taskEngine := privateTaskEngine.(*DockerTaskEngine) - taskEngine.State().AddTask(taskStoppedByACS) - taskEngine.State().AddTask(taskNotStarted) - taskEngine.State().AddTask(taskEssentialContainerStopped) - - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskStoppedByACS.Containers[0], - DockerID: containerID + "1", - DockerName: dockerContainerName + "1", - }, taskStoppedByACS) - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskNotStarted.Containers[0], - DockerID: containerID + "2", - DockerName: dockerContainerName + "2", - }, taskNotStarted) - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskEssentialContainerStopped.Containers[0], - DockerID: containerID + "3", - DockerName: dockerContainerName + "3", - }, taskEssentialContainerStopped) - - // these are performed in synchronizeState on 
restart - client.EXPECT().DescribeContainer(gomock.Any(), gomock.Any()).Return(apicontainerstatus.ContainerRunning, dockerapi.DockerContainerMetadata{ - DockerID: containerID, - }).Times(3) - imageManager.EXPECT().RecordContainerReference(gomock.Any()).Times(3) - // start the two tasks - taskEngine.synchronizeState() - - var waitStopWG sync.WaitGroup - waitStopWG.Add(1) - go func() { - // This is to confirm the other task is waiting - time.Sleep(1 * time.Second) - // Remove the task sequence number 1 from waitgroup - taskEngine.taskStopGroup.Done(1) - waitStopWG.Done() - }() - - // task with sequence number 2 should wait until 1 is removed from the waitgroup - taskEngine.taskStopGroup.Wait(2) - waitStopWG.Wait() -} - // TestPullStartedStoppedAtWasSetCorrectly tests the PullStartedAt and PullStoppedAt // was set correctly func TestPullStartedStoppedAtWasSetCorrectly(t *testing.T) { diff --git a/agent/engine/dockerstate/json_test.go b/agent/engine/dockerstate/json_test.go index c584c35787a..dd8f8222642 100644 --- a/agent/engine/dockerstate/json_test.go +++ b/agent/engine/dockerstate/json_test.go @@ -124,8 +124,6 @@ const ( "KnownStatus": "RUNNING", "KnownTime": "2017-11-01T20:24:21.449897483Z", "SentStatus": "RUNNING", - "StartSequenceNumber": 9, - "StopSequenceNumber": 0, "ENI": { "ec2Id": "eni-abcd", "IPV4Addresses": [ diff --git a/agent/engine/engine_sudo_linux_integ_test.go b/agent/engine/engine_sudo_linux_integ_test.go index 2ddd9a47b07..9160cec66fd 100644 --- a/agent/engine/engine_sudo_linux_integ_test.go +++ b/agent/engine/engine_sudo_linux_integ_test.go @@ -584,9 +584,11 @@ func setupEngineForExecCommandAgent(t *testing.T, hostBinDir string) (TaskEngine imageManager.SetDataClient(data.NewNoopClient()) metadataManager := containermetadata.NewManager(dockerClient, cfg) execCmdMgr := execcmd.NewManagerWithBinDir(hostBinDir) + hostResources := getTestHostResources() + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager, - eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, state, metadataManager, + eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager, nil, execCmdMgr, engineserviceconnect.NewManager()) taskEngine.monitorExecAgentsInterval = time.Second taskEngine.MustInit(context.TODO()) diff --git a/agent/engine/engine_unix_integ_test.go b/agent/engine/engine_unix_integ_test.go index b68e510b82c..a6bbcc5a553 100644 --- a/agent/engine/engine_unix_integ_test.go +++ b/agent/engine/engine_unix_integ_test.go @@ -1131,3 +1131,253 @@ func TestDockerExecAPI(t *testing.T) { waitFinished(t, finished, testTimeout) } + +// This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager. +// First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait +// until resources gets freed up (i.e. any running tasks stops and frees enough resources) before it can start progressing. 
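The scenario described in the comment above reduces to simple ledger arithmetic: the test instance exposes 1024 total memory and each task reserves 512, so the first two consume calls fit, the third waits, and a release makes room for it. A short sketch of that bookkeeping follows; the memoryLedger type is illustrative and is not the HostResourceManager added by this patch.

package main

import "fmt"

// memoryLedger is an illustrative single-resource ledger: consume succeeds
// only if the reservation still fits under the limit.
type memoryLedger struct {
	limit    int64
	consumed int64
}

func (l *memoryLedger) consume(mem int64) bool {
	if l.consumed+mem > l.limit {
		return false
	}
	l.consumed += mem
	return true
}

func (l *memoryLedger) release(mem int64) { l.consumed -= mem }

func main() {
	l := &memoryLedger{limit: 1024} // total memory available on the test instance
	fmt.Println(l.consume(512))     // true  - first task starts
	fmt.Println(l.consume(512))     // true  - second task starts
	fmt.Println(l.consume(512))     // false - third task waits in the queue
	l.release(512)                  // first task stops
	fmt.Println(l.consume(512))     // true  - third task can now start
}
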
+func TestHostResourceManagerTrickleQueue(t *testing.T) { + testTimeout := 1 * time.Minute + taskEngine, done, _ := setupWithDefaultConfig(t) + defer done() + + stateChangeEvents := taskEngine.StateChangeEvents() + + tasks := []*apitask.Task{} + for i := 0; i < 3; i++ { + taskArn := fmt.Sprintf("taskArn-%d", i) + testTask := createTestTask(taskArn) + + // create container + A := createTestContainerWithImageAndName(baseImageForOS, "A") + A.EntryPoint = &entryPointForOS + A.Command = []string{"sleep 10"} + A.Essential = true + testTask.Containers = []*apicontainer.Container{ + A, + } + + // task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources() + testTask.Memory = int64(512) + + tasks = append(tasks, testTask) + } + + // goroutine to trickle tasks to enforce queueing order + go func() { + taskEngine.AddTask(tasks[0]) + time.Sleep(2 * time.Second) + taskEngine.AddTask(tasks[1]) + time.Sleep(2 * time.Second) + taskEngine.AddTask(tasks[2]) + }() + + finished := make(chan interface{}) + + // goroutine to verify task running order + go func() { + // Tasks go RUNNING in order + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[0]) + + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[1]) + + // First task should stop before 3rd task goes RUNNING + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[0]) + + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[2]) + + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[1]) + + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[2]) + close(finished) + }() + + // goroutine to verify task accounting + // After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. 
until 1st task stops and gets dequeued + go func() { + time.Sleep(6 * time.Second) + task, err := taskEngine.(*DockerTaskEngine).topTask() + assert.NoError(t, err, "one task should be queued up after 6s") + assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue") + + time.Sleep(6 * time.Second) + _, err = taskEngine.(*DockerTaskEngine).topTask() + assert.Error(t, err, "no task should be queued up after 12s") + }() + waitFinished(t, finished, testTimeout) +} + +// This test verifies if a task which is STOPPING does not block other new tasks +// from starting if resources for them are available +func TestHostResourceManagerResourceUtilization(t *testing.T) { + testTimeout := 1 * time.Minute + taskEngine, done, _ := setupWithDefaultConfig(t) + defer done() + + stateChangeEvents := taskEngine.StateChangeEvents() + + tasks := []*apitask.Task{} + for i := 0; i < 2; i++ { + taskArn := fmt.Sprintf("IntegTaskArn-%d", i) + testTask := createTestTask(taskArn) + + // create container + A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i)) + A.EntryPoint = &entryPointForOS + A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"} + A.Essential = true + A.StopTimeout = uint(6) + testTask.Containers = []*apicontainer.Container{ + A, + } + + tasks = append(tasks, testTask) + } + + // Stop task payload from ACS for 1st task + stopTask := createTestTask("IntegTaskArn-0") + stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped + stopTask.Containers = []*apicontainer.Container{} + + go func() { + taskEngine.AddTask(tasks[0]) + time.Sleep(2 * time.Second) + + // single managedTask which should have started + assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running") + + // stopTask + taskEngine.AddTask(stopTask) + time.Sleep(2 * time.Second) + + taskEngine.AddTask(tasks[1]) + }() + + finished := make(chan interface{}) + + // goroutine to verify task running order + go func() { + // Tasks go RUNNING in order, 2nd task doesn't wait for 1st task + // to transition to STOPPED as resources are available + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[0]) + + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[1]) + + // At this time, task[0] stopTask is received, and SIGTERM sent to task + // but the task[0] is still RUNNING due to trap handler + assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING") + assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED") + + // task[0] stops after SIGTERM trap handler finishes + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[0]) + + // task[1] stops after normal execution + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[1]) + + close(finished) + }() + + waitFinished(t, finished, testTimeout) +} + +// This task verifies resources are properly released for all tasks for the case where +// stopTask is received from ACS for a task which is queued up in waitingTasksQueue +func TestHostResourceManagerStopTaskNotBlockWaitingTasks(t *testing.T) { + testTimeout := 1 * time.Minute + taskEngine, done, _ := setupWithDefaultConfig(t) + defer done() + + stateChangeEvents := taskEngine.StateChangeEvents() + + tasks := []*apitask.Task{} + stopTasks := []*apitask.Task{} + for 
i := 0; i < 2; i++ { + taskArn := fmt.Sprintf("IntegTaskArn-%d", i) + testTask := createTestTask(taskArn) + testTask.Memory = int64(768) + + // create container + A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i)) + A.EntryPoint = &entryPointForOS + A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"} + A.Essential = true + A.StopTimeout = uint(6) + testTask.Containers = []*apicontainer.Container{ + A, + } + + tasks = append(tasks, testTask) + + // Stop task payloads from ACS for the tasks + stopTask := createTestTask(fmt.Sprintf("IntegTaskArn-%d", i)) + stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped + stopTask.Containers = []*apicontainer.Container{} + stopTasks = append(stopTasks, stopTask) + } + + // goroutine to schedule tasks + go func() { + taskEngine.AddTask(tasks[0]) + time.Sleep(2 * time.Second) + + // single managedTask which should have started + assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running") + + // stopTask[0] - stop running task[0], this task will go to STOPPING due to trap handler defined and STOPPED after 6s + taskEngine.AddTask(stopTasks[0]) + + time.Sleep(2 * time.Second) + + // this task (task[1]) goes in waitingTasksQueue because not enough memory available + taskEngine.AddTask(tasks[1]) + + time.Sleep(2 * time.Second) + + // stopTask[1] - stop waiting task - task[1] + taskEngine.AddTask(stopTasks[1]) + }() + + finished := make(chan interface{}) + + // goroutine to verify task running order and verify assertions + go func() { + // 1st task goes to RUNNING + verifyContainerRunningStateChange(t, taskEngine) + verifyTaskIsRunning(stateChangeEvents, tasks[0]) + + time.Sleep(2500 * time.Millisecond) + + // At this time, task[0] stopTask is received, and SIGTERM sent to task + // but the task[0] is still RUNNING due to trap handler + assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING") + assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED") + + time.Sleep(2 * time.Second) + + // task[1] stops while in waitingTasksQueue while task[0] is in progress + // This is because it is still waiting to progress, has no containers created + // and does not need to wait for stopTimeout, can immediately STSC out + verifyTaskIsStopped(stateChangeEvents, tasks[1]) + + // task[0] stops + verifyContainerStoppedStateChange(t, taskEngine) + verifyTaskIsStopped(stateChangeEvents, tasks[0]) + + // Verify resources are properly released in host resource manager + assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[0].Arn), "task 0 resources not released") + assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[1].Arn), "task 1 resources not released") + + close(finished) + }() + + waitFinished(t, finished, testTimeout) +} diff --git a/agent/engine/engine_windows_integ_test.go b/agent/engine/engine_windows_integ_test.go index b244ade6fc7..e7444ee13cf 100644 --- a/agent/engine/engine_windows_integ_test.go +++ b/agent/engine/engine_windows_integ_test.go @@ -75,13 +75,15 @@ var endpoint = utils.DefaultIfBlank(os.Getenv(DockerEndpointEnvVariable), docker // TODO implement this func isDockerRunning() bool { return true } +// Values in host resources from getTestHoustResources() should be looked at and CPU/Memory assigned +// accordingly func createTestContainer() 
*apicontainer.Container { return &apicontainer.Container{ Name: "windows", Image: testBaseImage, Essential: true, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, } } @@ -552,9 +554,11 @@ func setupGMSA(cfg *config.Config, state dockerstate.TaskEngineState, t *testing }, DockerClient: dockerClient, } + hostResources := getTestHostResources() + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager, - eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, state, metadataManager, + eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager, resourceFields, execcmd.NewManager(), engineserviceconnect.NewManager()) taskEngine.MustInit(context.TODO()) return taskEngine, func() { @@ -794,9 +798,11 @@ func setupEngineForExecCommandAgent(t *testing.T, hostBinDir string) (TaskEngine imageManager.SetDataClient(data.NewNoopClient()) metadataManager := containermetadata.NewManager(dockerClient, cfg) execCmdMgr := execcmd.NewManagerWithBinDir(hostBinDir) + hostResources := getTestHostResources() + hostResourceManager := NewHostResourceManager(hostResources) taskEngine := NewDockerTaskEngine(cfg, dockerClient, credentialsManager, - eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, state, metadataManager, + eventstream.NewEventStream("ENGINEINTEGTEST", context.Background()), imageManager, &hostResourceManager, state, metadataManager, nil, execCmdMgr, engineserviceconnect.NewManager()) taskEngine.monitorExecAgentsInterval = time.Second taskEngine.MustInit(context.TODO()) diff --git a/agent/engine/host_resource_manager.go b/agent/engine/host_resource_manager.go new file mode 100644 index 00000000000..9293af933e9 --- /dev/null +++ b/agent/engine/host_resource_manager.go @@ -0,0 +1,380 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
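The new host_resource_manager.go file that follows accounts for host resources as a map keyed by resource name, where CPU, MEMORY, and GPU use Type "INTEGER" and the port lists use Type "STRINGSET". The sketch below only shows the shape of such a map using the ecs model types already imported by this patch; the buildHostResources helper and the sample values are illustrative, not part of the change.

package main

import (
	"fmt"

	"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
	"github.com/aws/aws-sdk-go/aws"
)

// buildHostResources shows the shape of the resource map the manager works
// with: integer-valued entries for CPU/MEMORY/GPU, string-set entries for
// reserved TCP and UDP ports.
func buildHostResources(cpu, mem, gpus int64, tcpPorts, udpPorts []string) map[string]*ecs.Resource {
	return map[string]*ecs.Resource{
		"CPU":       {Name: aws.String("CPU"), Type: aws.String("INTEGER"), IntegerValue: aws.Int64(cpu)},
		"MEMORY":    {Name: aws.String("MEMORY"), Type: aws.String("INTEGER"), IntegerValue: aws.Int64(mem)},
		"GPU":       {Name: aws.String("GPU"), Type: aws.String("INTEGER"), IntegerValue: aws.Int64(gpus)},
		"PORTS_TCP": {Name: aws.String("PORTS_TCP"), Type: aws.String("STRINGSET"), StringSetValue: aws.StringSlice(tcpPorts)},
		"PORTS_UDP": {Name: aws.String("PORTS_UDP"), Type: aws.String("STRINGSET"), StringSetValue: aws.StringSlice(udpPorts)},
	}
}

func main() {
	// Example values only; the real map comes from the instance's registered resources.
	hostResources := buildHostResources(2048, 1024, 0, []string{"22"}, []string{"1000"})
	fmt.Println(aws.Int64Value(hostResources["MEMORY"].IntegerValue))
}
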
+ +// Package engine contains the core logic for managing tasks + +package engine + +import ( + "fmt" + "sync" + + "github.com/aws/amazon-ecs-agent/agent/utils" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" + "github.com/aws/aws-sdk-go/aws" +) + +const ( + CPU = "CPU" + GPU = "GPU" + MEMORY = "MEMORY" + PORTSTCP = "PORTS_TCP" + PORTSUDP = "PORTS_UDP" +) + +// HostResourceManager keeps account of host resources allocated for tasks set to be created/running tasks +type HostResourceManager struct { + initialHostResource map[string]*ecs.Resource + consumedResource map[string]*ecs.Resource + hostResourceManagerRWLock sync.Mutex + + //task.arn to boolean whether host resources consumed or not + taskConsumed map[string]bool +} + +type ResourceNotFoundForTask struct { + resource string +} + +func (e *ResourceNotFoundForTask) Error() string { + return fmt.Sprintf("no %s in task resources", e.resource) +} + +type InvalidHostResource struct { + resource string +} + +func (e *InvalidHostResource) Error() string { + return fmt.Sprintf("no %s resource found in host resources", e.resource) +} + +type ResourceIsNilForTask struct { + resource string +} + +func (e *ResourceIsNilForTask) Error() string { + return fmt.Sprintf("resource %s is nil in task resources", e.resource) +} + +func (h *HostResourceManager) logResources(msg string, taskArn string) { + logger.Debug(msg, logger.Fields{ + "taskArn": taskArn, + "CPU": *h.consumedResource[CPU].IntegerValue, + "MEMORY": *h.consumedResource[MEMORY].IntegerValue, + "PORTS_TCP": aws.StringValueSlice(h.consumedResource[PORTSTCP].StringSetValue), + "PORTS_UDP": aws.StringValueSlice(h.consumedResource[PORTSUDP].StringSetValue), + "GPU": *h.consumedResource[GPU].IntegerValue, + }) +} + +func (h *HostResourceManager) consumeIntType(resourceType string, resources map[string]*ecs.Resource) { + *h.consumedResource[resourceType].IntegerValue += *resources[resourceType].IntegerValue +} + +func (h *HostResourceManager) consumeStringSetType(resourceType string, resources map[string]*ecs.Resource) { + resource, ok := resources[resourceType] + if ok { + h.consumedResource[resourceType].StringSetValue = append(h.consumedResource[resourceType].StringSetValue, resource.StringSetValue...) 
+ } +} + +func (h *HostResourceManager) checkTaskConsumed(taskArn string) bool { + h.hostResourceManagerRWLock.Lock() + defer h.hostResourceManagerRWLock.Unlock() + _, ok := h.taskConsumed[taskArn] + return ok +} + +// Returns if resources consumed or not and error status +// false, nil -> did not consume, task should stay pending +// false, err -> resources map has errors, task should fail as cannot schedule with 'wrong' resource map (this basically never happens) +// true, nil -> successfully consumed, task should progress with task creation +func (h *HostResourceManager) consume(taskArn string, resources map[string]*ecs.Resource) (bool, error) { + h.hostResourceManagerRWLock.Lock() + defer h.hostResourceManagerRWLock.Unlock() + defer h.logResources("Consumed resources after task consume call", taskArn) + + // Check if already consumed + _, ok := h.taskConsumed[taskArn] + if ok { + // Nothing to do, already consumed, return + logger.Info("Resources pre-consumed, continue to task creation", logger.Fields{"taskArn": taskArn}) + return true, nil + } + + ok, err := h.consumable(resources) + if err != nil { + logger.Error("Resources failing to consume, error in task resources", logger.Fields{ + "taskArn": taskArn, + field.Error: err, + }) + return false, err + } + if ok { + for resourceKey := range resources { + if *resources[resourceKey].Type == "INTEGER" { + // CPU, MEMORY, GPU + h.consumeIntType(resourceKey, resources) + } else if *resources[resourceKey].Type == "STRINGSET" { + // PORTS_TCP, PORTS_UDP + h.consumeStringSetType(resourceKey, resources) + } + } + + // Set consumed status + h.taskConsumed[taskArn] = true + logger.Info("Resources successfully consumed, continue to task creation", logger.Fields{"taskArn": taskArn}) + return true, nil + } + logger.Info("Resources not consumed, enough resources not available", logger.Fields{"taskArn": taskArn}) + return false, nil +} + +// Functions checkConsumableIntType and checkConsumableStringSetType to be called +// only after checking for resource map health +func (h *HostResourceManager) checkConsumableIntType(resourceName string, resources map[string]*ecs.Resource) bool { + resourceConsumableStatus := *(h.initialHostResource[resourceName].IntegerValue) >= *(h.consumedResource[resourceName].IntegerValue)+*(resources[resourceName].IntegerValue) + return resourceConsumableStatus +} + +func (h *HostResourceManager) checkConsumableStringSetType(resourceName string, resources map[string]*ecs.Resource) bool { + resourceSlice := resources[resourceName].StringSetValue + + // (optimizization) Get a resource specific map to ease look up + resourceMap := make(map[string]struct{}, len(resourceSlice)) + for _, v := range resourceSlice { + resourceMap[*v] = struct{}{} + } + + // Check intersection of resource StringSetValue is empty with consumedResource + for _, obj1 := range h.consumedResource[resourceName].StringSetValue { + _, ok := resourceMap[*obj1] + if ok { + // If resource is already reserved by some other task, this 'resources' object can not be consumed + return false + } + } + return true +} + +func checkResourceExistsInt(resourceName string, resources map[string]*ecs.Resource) error { + _, ok := resources[resourceName] + if ok { + if resources[resourceName].IntegerValue == nil { + return &ResourceIsNilForTask{resourceName} + } + } else { + return &ResourceNotFoundForTask{resourceName} + } + return nil +} + +func checkResourceExistsStringSet(resourceName string, resources map[string]*ecs.Resource) error { + _, ok := resources[resourceName] + 
if ok { + for _, obj := range resources[resourceName].StringSetValue { + if obj == nil { + return &ResourceIsNilForTask{resourceName} + } + } + } else { + return &ResourceNotFoundForTask{resourceName} + } + return nil +} + +// Checks all resources exists and their values are not nil +func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Resource) error { + for resourceKey, resourceVal := range resources { + _, ok := h.initialHostResource[resourceKey] + if !ok { + logger.Error(fmt.Sprintf("resource %s not found in host resources", resourceKey)) + return &InvalidHostResource{resourceKey} + } + + // CPU, MEMORY, GPU are INTEGER; + // PORTS_TCP, PORTS_UDP are STRINGSET + // Check if either of these data types exist + if resourceVal.Type == nil || !(*resourceVal.Type == "INTEGER" || *resourceVal.Type == "STRINGSET") { + logger.Error(fmt.Sprintf("type not assigned for resource %s", resourceKey)) + return fmt.Errorf("invalid resource type for %s", resourceKey) + } + + // CPU, MEMORY, GPU + if *resourceVal.Type == "INTEGER" { + err := checkResourceExistsInt(resourceKey, resources) + return err + } + + // PORTS_TCP, PORTS_UDP + if *resourceVal.Type == "STRINGSET" { + err := checkResourceExistsStringSet(resourceKey, resources) + return err + } + } + return nil +} + +// Helper function for consume to check if resources are consumable with the current account +// we have for the host resources. Should not call host resource manager lock in this func +// return values +func (h *HostResourceManager) consumable(resources map[string]*ecs.Resource) (bool, error) { + err := h.checkResourcesHealth(resources) + if err != nil { + return false, err + } + + for resourceKey := range resources { + if *resources[resourceKey].Type == "INTEGER" { + consumable := h.checkConsumableIntType(resourceKey, resources) + if !consumable { + return false, nil + } + } + + if *resources[resourceKey].Type == "STRINGSET" { + consumable := h.checkConsumableStringSetType(resourceKey, resources) + if !consumable { + return false, nil + } + } + } + + return true, nil +} + +// Utility function to manage release of ports +// s2 is contiguous sub slice of s1, each is unique (ports) +// returns a slice after removing s2 from s1, if found +func removeSubSlice(s1 []*string, s2 []*string) []*string { + begin := 0 + end := len(s1) - 1 + if len(s2) == 0 { + return s1 + } + for ; begin < len(s1); begin++ { + if *s1[begin] == *s2[0] { + break + } + } + // no intersection found + if begin == len(s1) { + return s1 + } + + end = begin + len(s2) + newSlice := append(s1[:begin], s1[end:]...) 
+ return newSlice +} + +func (h *HostResourceManager) releaseIntType(resourceType string, resources map[string]*ecs.Resource) { + *h.consumedResource[resourceType].IntegerValue -= *resources[resourceType].IntegerValue +} + +func (h *HostResourceManager) releaseStringSetType(resourceType string, resources map[string]*ecs.Resource) { + newSlice := removeSubSlice(h.consumedResource[resourceType].StringSetValue, resources[resourceType].StringSetValue) + h.consumedResource[resourceType].StringSetValue = newSlice +} + +// Returns error if task resource map has error, else releases resources +// Task resource map should never have errors as it is made by task ToHostResources method +// In cases releases fails due to errors, those resources will be failed to be released +// by HostResourceManager +func (h *HostResourceManager) release(taskArn string, resources map[string]*ecs.Resource) error { + h.hostResourceManagerRWLock.Lock() + defer h.hostResourceManagerRWLock.Unlock() + defer h.logResources("Consumed resources after task release call", taskArn) + + if h.taskConsumed[taskArn] { + err := h.checkResourcesHealth(resources) + if err != nil { + return err + } + + for resourceKey := range resources { + if *resources[resourceKey].Type == "INTEGER" { + h.releaseIntType(resourceKey, resources) + } + if *resources[resourceKey].Type == "STRINGSET" { + h.releaseStringSetType(resourceKey, resources) + } + } + + // Set consumed status + delete(h.taskConsumed, taskArn) + } + return nil +} + +// NewHostResourceManager initialize host resource manager with available host resource values +func NewHostResourceManager(resourceMap map[string]*ecs.Resource) HostResourceManager { + // for resources in resourceMap, some are "available resources" like CPU, mem, while + // some others are "reserved/consumed resources" like ports + consumedResourceMap := make(map[string]*ecs.Resource) + taskConsumed := make(map[string]bool) + // assigns CPU, MEMORY, PORTS_TCP, PORTS_UDP from host + //CPU + CPUs := int64(0) + consumedResourceMap[CPU] = &ecs.Resource{ + Name: utils.Strptr(CPU), + Type: utils.Strptr("INTEGER"), + IntegerValue: &CPUs, + } + //MEMORY + memory := int64(0) + consumedResourceMap[MEMORY] = &ecs.Resource{ + Name: utils.Strptr(MEMORY), + Type: utils.Strptr("INTEGER"), + IntegerValue: &memory, + } + //PORTS_TCP + //Copying ports from host resources as consumed ports for initializing + portsTcp := []*string{} + if resourceMap != nil && resourceMap[PORTSTCP] != nil { + portsTcp = resourceMap[PORTSTCP].StringSetValue + } + consumedResourceMap[PORTSTCP] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsTcp, + } + + //PORTS_UDP + portsUdp := []*string{} + if resourceMap != nil && resourceMap[PORTSUDP] != nil { + portsUdp = resourceMap[PORTSUDP].StringSetValue + } + consumedResourceMap[PORTSUDP] = &ecs.Resource{ + Name: utils.Strptr(PORTSUDP), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsUdp, + } + + //GPUs + numGPUs := int64(0) + consumedResourceMap[GPU] = &ecs.Resource{ + Name: utils.Strptr(GPU), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + + logger.Info("Initializing host resource manager, initialHostResource", logger.Fields{"initialHostResource": resourceMap}) + logger.Info("Initializing host resource manager, consumed resource", logger.Fields{"consumedResource": consumedResourceMap}) + return HostResourceManager{ + initialHostResource: resourceMap, + consumedResource: consumedResourceMap, + taskConsumed: taskConsumed, + } 
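Callers of consume distinguish the three outcomes documented above: (false, nil) means wait for resources, (false, err) means the task's resource map is invalid and the task should be stopped, and (true, nil) means resources are reserved and task creation can proceed. The sketch below illustrates that caller-side contract only; the consumer interface, scheduleTask helper, and fakeConsumer are illustrative stand-ins, not part of this patch.

package main

import (
	"errors"
	"fmt"
)

// consumer is an illustrative stand-in for the consume behavior described
// above: it reports whether resources were reserved and whether the task's
// resource map was valid at all.
type consumer interface {
	consume(taskArn string) (bool, error)
}

// scheduleTask shows the three-way handling a caller performs.
func scheduleTask(c consumer, taskArn string) string {
	consumed, err := c.consume(taskArn)
	switch {
	case err != nil:
		return "stop task, invalid resource map: " + err.Error()
	case consumed:
		return "progress task, resources reserved"
	default:
		return "keep task queued, resources unavailable"
	}
}

// fakeConsumer exercises each outcome for the demonstration below.
type fakeConsumer struct{}

func (fakeConsumer) consume(arn string) (bool, error) {
	switch arn {
	case "arn-fits":
		return true, nil
	case "arn-too-big":
		return false, nil
	default:
		return false, errors.New("no MEMORY in task resources")
	}
}

func main() {
	c := fakeConsumer{}
	fmt.Println(scheduleTask(c, "arn-fits"))
	fmt.Println(scheduleTask(c, "arn-too-big"))
	fmt.Println(scheduleTask(c, "arn-bad-map"))
}
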
+} diff --git a/agent/engine/host_resource_manager_test.go b/agent/engine/host_resource_manager_test.go new file mode 100644 index 00000000000..70659bb6f1f --- /dev/null +++ b/agent/engine/host_resource_manager_test.go @@ -0,0 +1,220 @@ +package engine + +import ( + "testing" + + "github.com/aws/amazon-ecs-agent/agent/utils" + "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" + "github.com/stretchr/testify/assert" +) + +func getTestHostResourceManager(cpu int64, mem int64, ports []*string, portsUdp []*string, numGPUs int64) *HostResourceManager { + hostResources := make(map[string]*ecs.Resource) + hostResources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &cpu, + } + + hostResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &mem, + } + + hostResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports, + } + + hostResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsUdp, + } + + hostResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + IntegerValue: &numGPUs, + } + + hostResourceManager := NewHostResourceManager(hostResources) + + return &hostResourceManager +} + +func getTestTaskResourceMap(cpu int64, mem int64, ports []*string, portsUdp []*string, numGPUs int64) map[string]*ecs.Resource { + taskResources := make(map[string]*ecs.Resource) + taskResources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &cpu, + } + + taskResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &mem, + } + + taskResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports, + } + + taskResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsUdp, + } + + taskResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + + return taskResources +} +func TestHostResourceConsumeSuccess(t *testing.T) { + hostResourcePort1 := "22" + hostResourcePort2 := "1000" + numGPUs := 4 + h := getTestHostResourceManager(int64(2048), int64(2048), []*string{&hostResourcePort1}, []*string{&hostResourcePort2}, int64(numGPUs)) + + testTaskArn := "arn:aws:ecs:us-east-1::task/cluster-name/11111" + taskPort1 := "23" + taskPort2 := "1001" + taskResources := getTestTaskResourceMap(int64(512), int64(768), []*string{&taskPort1}, []*string{&taskPort2}, 1) + + consumed, _ := h.consume(testTaskArn, taskResources) + assert.Equal(t, consumed, true, "Incorrect consumed status") + assert.Equal(t, *h.consumedResource["CPU"].IntegerValue, int64(512), "Incorrect cpu resource accounting during consume") + assert.Equal(t, *h.consumedResource["MEMORY"].IntegerValue, int64(768), "Incorrect memory resource accounting during consume") + assert.Equal(t, *h.consumedResource["PORTS_TCP"].StringSetValue[0], "22", "Incorrect port resource accounting during consume") + assert.Equal(t, *h.consumedResource["PORTS_TCP"].StringSetValue[1], "23", "Incorrect port resource accounting during consume") + assert.Equal(t, len(h.consumedResource["PORTS_TCP"].StringSetValue), 2, "Incorrect port resource accounting 
during consume") + assert.Equal(t, *h.consumedResource["PORTS_UDP"].StringSetValue[0], "1000", "Incorrect udp port resource accounting during consume") + assert.Equal(t, *h.consumedResource["PORTS_UDP"].StringSetValue[1], "1001", "Incorrect udp port resource accounting during consume") + assert.Equal(t, len(h.consumedResource["PORTS_UDP"].StringSetValue), 2, "Incorrect port resource accounting during consume") + assert.Equal(t, *h.consumedResource["GPU"].IntegerValue, int64(1), "Incorrect gpu resource accounting during consume") +} +func TestHostResourceConsumeFail(t *testing.T) { + hostResourcePort1 := "22" + hostResourcePort2 := "1000" + numGPUs := 4 + h := getTestHostResourceManager(int64(2048), int64(2048), []*string{&hostResourcePort1}, []*string{&hostResourcePort2}, int64(numGPUs)) + + testTaskArn := "arn:aws:ecs:us-east-1::task/cluster-name/11111" + taskPort1 := "22" + taskPort2 := "1001" + taskResources := getTestTaskResourceMap(int64(512), int64(768), []*string{&taskPort1}, []*string{&taskPort2}, 1) + + consumed, _ := h.consume(testTaskArn, taskResources) + assert.Equal(t, consumed, false, "Incorrect consumed status") + assert.Equal(t, *h.consumedResource["CPU"].IntegerValue, int64(0), "Incorrect cpu resource accounting during consume") + assert.Equal(t, *h.consumedResource["MEMORY"].IntegerValue, int64(0), "Incorrect memory resource accounting during consume") + assert.Equal(t, *h.consumedResource["PORTS_TCP"].StringSetValue[0], "22", "Incorrect port resource accounting during consume") + assert.Equal(t, len(h.consumedResource["PORTS_TCP"].StringSetValue), 1, "Incorrect port resource accounting during consume") + assert.Equal(t, *h.consumedResource["PORTS_UDP"].StringSetValue[0], "1000", "Incorrect udp port resource accounting during consume") + assert.Equal(t, len(h.consumedResource["PORTS_UDP"].StringSetValue), 1, "Incorrect port resource accounting during consume") + assert.Equal(t, *h.consumedResource["GPU"].IntegerValue, int64(0), "Incorrect gpu resource accounting during consume") +} + +func TestHostResourceRelease(t *testing.T) { + hostResourcePort1 := "22" + hostResourcePort2 := "1000" + numGPUs := 4 + h := getTestHostResourceManager(int64(2048), int64(2048), []*string{&hostResourcePort1}, []*string{&hostResourcePort2}, int64(numGPUs)) + + testTaskArn := "arn:aws:ecs:us-east-1::task/cluster-name/11111" + taskPort1 := "23" + taskPort2 := "1001" + taskResources := getTestTaskResourceMap(int64(512), int64(768), []*string{&taskPort1}, []*string{&taskPort2}, 1) + + h.consume(testTaskArn, taskResources) + h.release(testTaskArn, taskResources) + + assert.Equal(t, *h.consumedResource["CPU"].IntegerValue, int64(0), "Incorrect cpu resource accounting during release") + assert.Equal(t, *h.consumedResource["MEMORY"].IntegerValue, int64(0), "Incorrect memory resource accounting during release") + assert.Equal(t, *h.consumedResource["PORTS_TCP"].StringSetValue[0], "22", "Incorrect port resource accounting during release") + assert.Equal(t, len(h.consumedResource["PORTS_TCP"].StringSetValue), 1, "Incorrect port resource accounting during release") + assert.Equal(t, *h.consumedResource["PORTS_UDP"].StringSetValue[0], "1000", "Incorrect udp port resource accounting during release") + assert.Equal(t, len(h.consumedResource["PORTS_UDP"].StringSetValue), 1, "Incorrect udp port resource accounting during release") + assert.Equal(t, *h.consumedResource["GPU"].IntegerValue, int64(0), "Incorrect gpu resource accounting during release") +} + +func TestConsumable(t *testing.T) { + 
hostResourcePort1 := "22" + hostResourcePort2 := "1000" + numGPUs := 4 + h := getTestHostResourceManager(int64(2048), int64(2048), []*string{&hostResourcePort1}, []*string{&hostResourcePort2}, int64(numGPUs)) + + testCases := []struct { + cpu int64 + mem int64 + ports []uint16 + portsUdp []uint16 + gpus int64 + canBeConsumed bool + }{ + { + cpu: int64(1024), + mem: int64(1024), + ports: []uint16{25}, + portsUdp: []uint16{1003}, + gpus: int64(2), + canBeConsumed: true, + }, + { + cpu: int64(2500), + mem: int64(1024), + ports: []uint16{}, + portsUdp: []uint16{}, + gpus: int64(0), + canBeConsumed: false, + }, + { + cpu: int64(1024), + mem: int64(2500), + ports: []uint16{}, + portsUdp: []uint16{}, + gpus: int64(0), + canBeConsumed: false, + }, + { + cpu: int64(1024), + mem: int64(1024), + ports: []uint16{22}, + portsUdp: []uint16{}, + gpus: int64(0), + canBeConsumed: false, + }, + { + cpu: int64(1024), + mem: int64(1024), + ports: []uint16{}, + portsUdp: []uint16{1000}, + gpus: int64(0), + canBeConsumed: false, + }, + { + cpu: int64(1024), + mem: int64(1024), + ports: []uint16{}, + portsUdp: []uint16{1000}, + gpus: int64(5), + canBeConsumed: false, + }, + } + + for _, tc := range testCases { + resources := getTestTaskResourceMap(tc.cpu, tc.mem, utils.Uint16SliceToStringSlice(tc.ports), utils.Uint16SliceToStringSlice(tc.portsUdp), tc.gpus) + canBeConsumed, err := h.consumable(resources) + assert.Equal(t, canBeConsumed, tc.canBeConsumed, "Error in checking if resources can be successfully consumed") + assert.Equal(t, err, nil, "Error in checking if resources can be successfully consumed, error returned from consumable") + } +} diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index 61654592a70..c710330a36e 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -36,7 +36,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" @@ -135,12 +134,12 @@ type managedTask struct { credentialsManager credentials.Manager cniClient ecscni.CNIClient dockerClient dockerapi.DockerClient - taskStopWG *utilsync.SequentialWaitGroup acsMessages chan acsTransition dockerMessages chan dockerContainerChange resourceStateChangeEvent chan resourceStateChange stateChangeEvents chan statechange.Event + consumedHostResourceEvent chan struct{} containerChangeEventStream *eventstream.EventStream // unexpectedStart is a once that controls stopping a container that @@ -177,6 +176,7 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask acsMessages: make(chan acsTransition), dockerMessages: make(chan dockerContainerChange), resourceStateChangeEvent: make(chan resourceStateChange), + consumedHostResourceEvent: make(chan struct{}, 1), engine: engine, cfg: engine.cfg, stateChangeEvents: engine.stateChangeEvents, @@ -184,7 +184,6 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask credentialsManager: engine.credentialsManager, cniClient: engine.cniClient, dockerClient: engine.client, - taskStopWG: engine.taskStopGroup, steadyStatePollInterval: engine.taskSteadyStatePollInterval, steadyStatePollIntervalJitter: engine.taskSteadyStatePollIntervalJitter, } @@ 
-200,12 +199,18 @@ func (mtask *managedTask) overseeTask() { // `desiredstatus`es which are a construct of the engine used only here, // not present on the backend mtask.UpdateStatus() - // If this was a 'state restore', send all unsent statuses - mtask.emitCurrentStatus() - // Wait for host resources required by this task to become available + // Wait here until enough resources are available on host for the task to progress + // - Waits until host resource manager succesfully 'consume's task resources and returns + // - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately + // - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately + // - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources + // (resources are later 'release'd on Stopped task emitTaskEvent call) mtask.waitForHostResources() + // If this was a 'state restore', send all unsent statuses + mtask.emitCurrentStatus() + // Main infinite loop. This is where we receive messages and dispatch work. for { if mtask.shouldExit() { @@ -243,13 +248,8 @@ func (mtask *managedTask) overseeTask() { mtask.engine.checkTearDownPauseContainer(mtask.Task) // TODO [SC]: We need to also tear down pause containets in bridge mode for SC-enabled tasks mtask.cleanupCredentials() - if mtask.StopSequenceNumber != 0 { - logger.Debug("Marking done for this sequence", logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: mtask.StopSequenceNumber, - }) - mtask.taskStopWG.Done(mtask.StopSequenceNumber) - } + // Send event to monitor queue task routine to check for any pending tasks to progress + mtask.engine.wakeUpTaskQueueMonitor() // TODO: make this idempotent on agent restart go mtask.releaseIPInIPAM() mtask.cleanupTask(retry.AddJitter(mtask.cfg.TaskCleanupWaitDuration, mtask.cfg.TaskCleanupWaitDurationJitter)) @@ -275,43 +275,28 @@ func (mtask *managedTask) emitCurrentStatus() { } // waitForHostResources waits for host resources to become available to start -// the task. This involves waiting for previous stops to complete so the -// resources become free. +// the task. It will wait for event on this task's consumedHostResourceEvent +// channel from monitorQueuedTasks routine to wake up func (mtask *managedTask) waitForHostResources() { - if mtask.StartSequenceNumber == 0 { - // This is the first transition on this host. No need to wait + if mtask.GetKnownStatus().Terminal() { + // Task's known status is STOPPED. No need to wait in this case and proceed to cleanup + // This is relevant when agent restarts and a task has stopped - do not attempt + // to consume resources in host resource manager return } - if mtask.GetDesiredStatus().Terminal() { - // Task's desired status is STOPPED. 
No need to wait in this case either - return - } - - logger.Info("Waiting for any previous stops to complete", logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: mtask.StartSequenceNumber, - }) - - othersStoppedCtx, cancel := context.WithCancel(mtask.ctx) - defer cancel() - go func() { - mtask.taskStopWG.Wait(mtask.StartSequenceNumber) - cancel() - }() - - for !mtask.waitEvent(othersStoppedCtx.Done()) { - if mtask.GetDesiredStatus().Terminal() { - // If we end up here, that means we received a start then stop for this - // task before a task that was expected to stop before it could - // actually stop - break + if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) { + // Internal tasks are started right away as their resources are not accounted for + mtask.engine.enqueueTask(mtask) + for !mtask.waitEvent(mtask.consumedHostResourceEvent) { + if mtask.GetDesiredStatus().Terminal() { + // If we end up here, that means we received a start then stop for this + // task before a task that was expected to stop before it could + // actually stop + break + } } } - logger.Info("Wait over; ready to move towards desired status", logger.Fields{ - field.TaskID: mtask.GetID(), - field.DesiredStatus: mtask.GetDesiredStatus().String(), - }) } // waitSteady waits for a task to leave steady-state by waiting for a new @@ -402,30 +387,27 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool { func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) { // Handle acs message changes this task's desired status to whatever // acs says it should be if it is compatible + + // Isolate change of desired status updates from monitorQueuedTasks processing to prevent + // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks + // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks + // could be processing + mtask.engine.monitorQueuedTasksLock.Lock() + defer mtask.engine.monitorQueuedTasksLock.Unlock() + logger.Info("New acs transition", logger.Fields{ field.TaskID: mtask.GetID(), field.DesiredStatus: desiredStatus.String(), field.Sequence: seqnum, - "StopNumber": mtask.StopSequenceNumber, }) if desiredStatus <= mtask.GetDesiredStatus() { logger.Debug("Redundant task transition; ignoring", logger.Fields{ field.TaskID: mtask.GetID(), field.DesiredStatus: desiredStatus.String(), field.Sequence: seqnum, - "StopNumber": mtask.StopSequenceNumber, }) return } - if desiredStatus == apitaskstatus.TaskStopped && seqnum != 0 && mtask.GetStopSequenceNumber() == 0 { - logger.Info("Managed task moving to stopped, adding to stopgroup with sequence number", - logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: seqnum, - }) - mtask.SetStopSequenceNumber(seqnum) - mtask.taskStopWG.Add(seqnum, 1) - } mtask.SetDesiredStatus(desiredStatus) mtask.UpdateDesiredStatus() mtask.engine.saveTaskData(mtask.Task) @@ -606,6 +588,15 @@ func getContainerEventLogFields(c api.ContainerStateChange) logger.Fields { func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { taskKnownStatus := task.GetKnownStatus() + // Always do (idempotent) release host resources whenever state change with + // known status == STOPPED is done to ensure sync between tasks and host resource manager + if taskKnownStatus.Terminal() { + resourcesToRelease := mtask.ToHostResources() + err := mtask.engine.hostResourceManager.release(mtask.Arn, resourcesToRelease) 
+ if err != nil { + logger.Critical("Failed to release resources after tast stopped", logger.Fields{field.TaskARN: mtask.Arn}) + } + } if !taskKnownStatus.BackendRecognized() { logger.Debug("Skipping event emission for task", logger.Fields{ field.TaskID: mtask.GetID(), diff --git a/agent/engine/task_manager_data_test.go b/agent/engine/task_manager_data_test.go index ada649d37b4..5ac73a4c3be 100644 --- a/agent/engine/task_manager_data_test.go +++ b/agent/engine/task_manager_data_test.go @@ -30,7 +30,6 @@ import ( resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types" "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/pkg/errors" @@ -84,7 +83,6 @@ func TestHandleDesiredStatusChangeSaveData(t *testing.T) { engine: &DockerTaskEngine{ dataClient: dataClient, }, - taskStopWG: utilsync.NewSequentialWaitGroup(), } mtask.handleDesiredStatusChange(tc.targetDesiredStatus, int64(1)) tasks, err := dataClient.GetTasks() diff --git a/agent/engine/task_manager_test.go b/agent/engine/task_manager_test.go index 247bdba0419..d6c87b43374 100644 --- a/agent/engine/task_manager_test.go +++ b/agent/engine/task_manager_test.go @@ -30,7 +30,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/taskresource" mock_taskresource "github.com/aws/amazon-ecs-agent/agent/taskresource/mocks" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/agent/api" apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" @@ -831,6 +830,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) { containerChangeEventStream := eventstream.NewEventStream(eventStreamName, context.Background()) containerChangeEventStream.StartListening() + hostResourceManager := NewHostResourceManager(getTestHostResources()) stateChangeEvents := make(chan statechange.Event) task := &managedTask{ @@ -844,6 +844,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) { containerChangeEventStream: containerChangeEventStream, stateChangeEvents: stateChangeEvents, dataClient: data.NewNoopClient(), + hostResourceManager: &hostResourceManager, }, stateChangeEvents: stateChangeEvents, containerChangeEventStream: containerChangeEventStream, @@ -964,13 +965,15 @@ func TestWaitForContainerTransitionsForTerminalTask(t *testing.T) { func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T) { stateChangeEvents := make(chan statechange.Event) + hostResourceManager := NewHostResourceManager(getTestHostResources()) task := &managedTask{ Task: &apitask.Task{ Containers: []*apicontainer.Container{}, DesiredStatusUnsafe: apitaskstatus.TaskStopped, }, engine: &DockerTaskEngine{ - stateChangeEvents: stateChangeEvents, + stateChangeEvents: stateChangeEvents, + hostResourceManager: &hostResourceManager, }, stateChangeEvents: stateChangeEvents, ctx: context.TODO(), @@ -1417,7 +1420,8 @@ func TestTaskWaitForExecutionCredentials(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() task := &managedTask{ - ctx: ctx, + ctx: ctx, + engine: &DockerTaskEngine{}, Task: &apitask.Task{ KnownStatusUnsafe: apitaskstatus.TaskRunning, DesiredStatusUnsafe: apitaskstatus.TaskRunning, @@ -1780,31 +1784,6 @@ func 
TestHandleContainerChangeUpdateMetadataRedundant(t *testing.T) { assert.Equal(t, timeNow, containerCreateTime) } -func TestWaitForHostResources(t *testing.T) { - taskStopWG := utilsync.NewSequentialWaitGroup() - taskStopWG.Add(1, 1) - ctx, cancel := context.WithCancel(context.Background()) - - mtask := &managedTask{ - ctx: ctx, - cancel: cancel, - taskStopWG: taskStopWG, - Task: &apitask.Task{ - StartSequenceNumber: 1, - }, - } - - var waitForHostResourcesWG sync.WaitGroup - waitForHostResourcesWG.Add(1) - go func() { - mtask.waitForHostResources() - waitForHostResourcesWG.Done() - }() - - taskStopWG.Done(1) - waitForHostResourcesWG.Wait() -} - func TestWaitForResourceTransition(t *testing.T) { task := &managedTask{ Task: &apitask.Task{ @@ -2195,3 +2174,56 @@ func TestContainerNextStateDependsStoppedContainer(t *testing.T) { }) } } + +// TestTaskWaitForHostResources tests task queuing behavior based on available host resources +func TestTaskWaitForHostResources(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // 1 vCPU available on host + hostResourceManager := NewHostResourceManager(getTestHostResources()) + taskEngine := &DockerTaskEngine{ + managedTasks: make(map[string]*managedTask), + monitorQueuedTaskEvent: make(chan struct{}, 1), + hostResourceManager: &hostResourceManager, + } + go taskEngine.monitorQueuedTasks(ctx) + // 3 tasks requesting 0.5 vCPUs each + tasks := []*apitask.Task{} + for i := 0; i < 3; i++ { + task := testdata.LoadTask("sleep5") + task.Arn = fmt.Sprintf("arn%d", i) + task.CPU = float64(0.5) + mtask := &managedTask{ + Task: task, + engine: taskEngine, + consumedHostResourceEvent: make(chan struct{}, 1), + } + tasks = append(tasks, task) + taskEngine.managedTasks[task.Arn] = mtask + } + + // acquire for host resources order arn0, arn1, arn2 + go func() { + taskEngine.managedTasks["arn0"].waitForHostResources() + taskEngine.managedTasks["arn1"].waitForHostResources() + taskEngine.managedTasks["arn2"].waitForHostResources() + }() + time.Sleep(500 * time.Millisecond) + + // Verify waiting queue is waiting at arn2 + topTask, err := taskEngine.topTask() + assert.NoError(t, err) + assert.Equal(t, topTask.Arn, "arn2") + + // Remove 1 task + taskResources := taskEngine.managedTasks["arn0"].ToHostResources() + taskEngine.hostResourceManager.release("arn0", taskResources) + taskEngine.wakeUpTaskQueueMonitor() + + time.Sleep(500 * time.Millisecond) + + // Verify arn2 got dequeued + topTask, err = taskEngine.topTask() + assert.Error(t, err) +} diff --git a/agent/sighandlers/termination_handler_test.go b/agent/sighandlers/termination_handler_test.go index d0e21b5e941..a282ee6aaf5 100644 --- a/agent/sighandlers/termination_handler_test.go +++ b/agent/sighandlers/termination_handler_test.go @@ -44,7 +44,7 @@ func TestFinalSave(t *testing.T) { state := dockerstate.NewTaskEngineState() taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, - nil, nil, state, nil, nil, nil, nil) + nil, nil, nil, state, nil, nil, nil, nil) task := &apitask.Task{ Arn: taskARN, diff --git a/agent/statemanager/state_manager_test.go b/agent/statemanager/state_manager_test.go index 41815dcf55d..069ee8b142e 100644 --- a/agent/statemanager/state_manager_test.go +++ b/agent/statemanager/state_manager_test.go @@ -44,7 +44,7 @@ func TestLoadsV1DataCorrectly(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v1", "1")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, 
dockerstate.NewTaskEngineState(), + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 @@ -90,7 +90,7 @@ func TestLoadsV13DataCorrectly(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v13", "1")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 @@ -138,7 +138,7 @@ func TestLoadsDataForContainerHealthCheckTask(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v10", "container-health-check")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 @@ -180,7 +180,7 @@ func TestLoadsDataForPrivateRegistryTask(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v14", "private-registry")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 @@ -226,7 +226,7 @@ func TestLoadsDataForSecretsTask(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v17", "secrets")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -264,7 +264,7 @@ func TestLoadsDataForAddingAvailabilityZoneInTask(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v18", "availabilityZone")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID, availabilityZone string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -295,7 +295,7 @@ func TestLoadsDataForASMSecretsTask(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v18", "secrets")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, 
nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -334,7 +334,7 @@ func TestLoadsDataForContainerOrdering(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v20", "containerOrdering")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -369,7 +369,7 @@ func TestLoadsDataForPerContainerTimeouts(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v20", "perContainerTimeouts")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -404,7 +404,7 @@ func TestLoadsDataForContainerRuntimeID(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v23", "perContainerRuntimeID")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -435,7 +435,7 @@ func TestLoadsDataForContainerImageDigest(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v24", "perContainerImageDigest")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -466,7 +466,7 @@ func TestLoadsDataSeqTaskManifest(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v25", "seqNumTaskManifest")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber, seqNumTaskManifest int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -493,7 +493,7 @@ func TestLoadsDataForEnvFiles(t *testing.T) { require.Nil(t, err, "Failed to set up test") defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v28", "environmentFiles")} - 
taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, diff --git a/agent/statemanager/state_manager_unix_test.go b/agent/statemanager/state_manager_unix_test.go index 8b735eea20d..e97e4891b77 100644 --- a/agent/statemanager/state_manager_unix_test.go +++ b/agent/statemanager/state_manager_unix_test.go @@ -46,7 +46,7 @@ func TestStateManager(t *testing.T) { // Now let's make some state to save containerInstanceArn := "" - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) manager, err = statemanager.NewStateManager(cfg, statemanager.AddSaveable("TaskEngine", taskEngine), @@ -64,7 +64,7 @@ func TestStateManager(t *testing.T) { assertFileMode(t, filepath.Join(tmpDir, "ecs_agent_data.json")) // Now make sure we can load that state sanely - loadedTaskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), + loadedTaskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var loadedContainerInstanceArn string @@ -110,7 +110,7 @@ func TestLoadsDataForAWSVPCTask(t *testing.T) { t.Run(tc.name, func(t *testing.T) { cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v11", tc.dir)} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string stateManager, err := statemanager.NewStateManager(cfg, @@ -149,7 +149,7 @@ func TestLoadsDataForAWSVPCTask(t *testing.T) { // verify that the state manager correctly loads gpu related fields in state file func TestLoadsDataForGPU(t *testing.T) { cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v18", "gpu")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -194,7 +194,7 @@ func TestLoadsDataForGPU(t *testing.T) { func TestLoadsDataForFirelensTask(t *testing.T) { cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v23", "firelens")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -242,7 +242,7 @@ func TestLoadsDataForFirelensTask(t *testing.T) { func TestLoadsDataForFirelensTaskWithExternalConfig(t *testing.T) { cfg := 
&config.Config{DataDir: filepath.Join(".", "testdata", "v24", "firelens")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -296,7 +296,7 @@ func TestLoadsDataForFirelensTaskWithExternalConfig(t *testing.T) { func TestLoadsDataForEFSGATask(t *testing.T) { cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v27", "efs")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 stateManager, err := statemanager.NewStateManager(cfg, @@ -333,7 +333,7 @@ func TestLoadsDataForEFSGATask(t *testing.T) { func TestLoadsDataForGMSATask(t *testing.T) { cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v31", "gmsalinux")} taskEngineState := dockerstate.NewTaskEngineState() - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, taskEngineState, nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, taskEngineState, nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 diff --git a/agent/statemanager/state_manager_win_test.go b/agent/statemanager/state_manager_win_test.go index 125e6a55ac8..999c6267e6c 100644 --- a/agent/statemanager/state_manager_win_test.go +++ b/agent/statemanager/state_manager_win_test.go @@ -36,7 +36,7 @@ func TestLoadsDataForGMSATask(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v26", "gmsa")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64 diff --git a/agent/stats/engine_integ_test.go b/agent/stats/engine_integ_test.go index 65878661b19..ab951ce6ea9 100644 --- a/agent/stats/engine_integ_test.go +++ b/agent/stats/engine_integ_test.go @@ -83,7 +83,7 @@ func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithExistingContainersWithoutHealth") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("default") // Populate Tasks and Container map in the engine. 
dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) @@ -142,7 +142,7 @@ func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithNewContainers") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("default") // Populate Tasks and Container map in the engine. dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) @@ -220,7 +220,7 @@ func TestStatsEngineWithExistingContainers(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithExistingContainers") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("bridge") // enable container health check for this container testTask.Containers[0].HealthCheckType = "docker" @@ -287,7 +287,7 @@ func TestStatsEngineWithNewContainers(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithNewContainers") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("bridge") // enable health check of the container @@ -369,7 +369,7 @@ func TestStatsEngineWithNewContainersWithPolling(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithNewContainers") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("bridge") // enable health check of the container @@ -434,7 +434,7 @@ func TestStatsEngineWithNewContainersWithPolling(t *testing.T) { func TestStatsEngineWithDockerTaskEngine(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithDockerTaskEngine") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) container, err := createHealthContainer(client) require.NoError(t, err, "creating container failed") ctx, cancel := context.WithCancel(context.TODO()) @@ -518,7 +518,7 @@ func TestStatsEngineWithDockerTaskEngine(t *testing.T) { func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithDockerTaskEngineMissingRemoveEvent") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -606,7 +606,7 @@ func testNetworkModeStatsInteg(t *testing.T, networkMode string, statsEmpty bool containerChangeEventStream := eventStream("TestStatsEngineWithNetworkStats") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), 
nil, nil, nil, nil) testTask := createRunningTask(networkMode) // Populate Tasks and Container map in the engine. @@ -687,7 +687,7 @@ func TestStorageStats(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithStorageStats") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("bridge") // Populate Tasks and Container map in the engine. diff --git a/agent/stats/engine_unix_integ_test.go b/agent/stats/engine_unix_integ_test.go index ed3af6a7a7a..a5762b51086 100644 --- a/agent/stats/engine_unix_integ_test.go +++ b/agent/stats/engine_unix_integ_test.go @@ -110,7 +110,7 @@ func TestStatsEngineWithServiceConnectMetrics(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithServiceConnectMetrics") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) testTask := createRunningTask("bridge") testTask.ServiceConnectConfig = &serviceconnect.Config{ ContainerName: serviceConnectContainerName, diff --git a/agent/utils/sync/sequential_waitgroup.go b/agent/utils/sync/sequential_waitgroup.go deleted file mode 100644 index 78b0e266196..00000000000 --- a/agent/utils/sync/sequential_waitgroup.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package sync is an analogue to the stdlib sync package. -// It contains lowlevel synchonization primitives, but not quite as low level as 'sync' does -package sync - -import stdsync "sync" - -// A SequentialWaitGroup waits for a collection of goroutines to finish. Each -// goroutine may add itself to the waitgroup with 'Add', providing a sequence -// number. Each goroutine should then call 'Done' with its sequence number when finished. -// Elsewhere, 'Wait' can be used to wait for all groups at or below the -// provided sequence number to complete. 
-type SequentialWaitGroup struct { - mutex stdsync.Mutex - // Implement our own semaphore over using sync.WaitGroup so that we can safely GC our map - semaphores map[int64]int - change *stdsync.Cond -} - -func NewSequentialWaitGroup() *SequentialWaitGroup { - return &SequentialWaitGroup{ - semaphores: make(map[int64]int), - change: stdsync.NewCond(&stdsync.Mutex{}), - } -} - -// Add adds the given delta to the waitgroup at the given sequence -func (s *SequentialWaitGroup) Add(sequence int64, delta int) { - s.mutex.Lock() - defer s.mutex.Unlock() - - _, ok := s.semaphores[sequence] - if ok { - s.semaphores[sequence] += delta - } else { - s.semaphores[sequence] = delta - } - if s.semaphores[sequence] <= 0 { - delete(s.semaphores, sequence) - s.change.Broadcast() - } -} - -// Done decrements the waitgroup at the given sequence by one -func (s *SequentialWaitGroup) Done(sequence int64) { - s.mutex.Lock() - defer s.mutex.Unlock() - _, ok := s.semaphores[sequence] - if ok { - s.semaphores[sequence]-- - if s.semaphores[sequence] == 0 { - delete(s.semaphores, sequence) - s.change.Broadcast() - } - } -} - -// Wait waits for all waitgroups at or below the given sequence to complete. -// Please note that this is *INCLUSIVE* of the sequence -func (s *SequentialWaitGroup) Wait(sequence int64) { - waitOver := func() bool { - s.mutex.Lock() - defer s.mutex.Unlock() - for storedSequence := range s.semaphores { - if storedSequence <= sequence { - // At least one non-empty seqnum greater than ours; wait more - return false - } - } - return true - } - - s.change.L.Lock() - defer s.change.L.Unlock() - // Wake up to check if our wait is over after each element being deleted from the map - for !waitOver() { - s.change.Wait() - } -} diff --git a/agent/utils/sync/sequential_waitgroup_test.go b/agent/utils/sync/sequential_waitgroup_test.go deleted file mode 100644 index ff289f364c6..00000000000 --- a/agent/utils/sync/sequential_waitgroup_test.go +++ /dev/null @@ -1,70 +0,0 @@ -//go:build unit -// +build unit - -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sync - -import "testing" - -func TestSequentialWaitgroup(t *testing.T) { - wg := NewSequentialWaitGroup() - wg.Add(1, 1) - wg.Add(2, 1) - wg.Add(1, 1) - - // Wait for '0' should not fail, nothing for sequence numbers below it - wg.Wait(0) - - done := make(chan bool) - go func() { - wg.Done(1) - wg.Done(1) - wg.Wait(1) - wg.Done(2) - wg.Wait(2) - done <- true - }() - <-done -} - -func TestManyDones(t *testing.T) { - wg := NewSequentialWaitGroup() - - waitGroupCount := 10 - for i := 1; i < waitGroupCount; i++ { - wg.Add(int64(i), i) - } - - for i := 1; i < waitGroupCount; i++ { - wg.Wait(int64(i - 1)) - - isAwake := make(chan bool) - go func(i int64) { - wg.Wait(i) - isAwake <- true - }(int64(i)) - - for j := 0; j < i; j++ { - if j < i-1 { - select { - case <-isAwake: - t.Fatal("Should not be awake before all dones called") - default: - } - } - wg.Done(int64(i)) - } - } -}
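
The SequentialWaitGroup deleted above gated a task's start on earlier stop sequence numbers completing; the reinstated code instead queues the task and lets it start only once the host resource manager confirms capacity, releasing the task's resources again when it reaches a terminal status. As a reading aid, the following is a minimal Go sketch of that first-in-first-out gating pattern under simplified assumptions (a single CPU resource, no goroutine wake-ups). Every name in it (demoResourceManager, demoTask, admitQueued and so on) is hypothetical; it is not the agent's hostResourceManager API, which accounts for several resource types per task ARN and is driven by waitForHostResources, monitorQueuedTasks and emitTaskEvent in the hunks above.

package main

import (
	"fmt"
	"sync"
)

// demoTask stands in for a managed task waiting to be admitted.
type demoTask struct {
	arn      string
	cpu      int64         // CPU units the task asks for
	admitted chan struct{} // closed once host resources are consumed for the task
}

// demoResourceManager is a toy, FIFO-only host resource gate.
type demoResourceManager struct {
	mu           sync.Mutex
	availableCPU int64
	consumed     map[string]int64 // task ARN -> CPU units currently held
	queue        []*demoTask      // tasks waiting for capacity, in arrival order
}

func newDemoResourceManager(cpu int64) *demoResourceManager {
	return &demoResourceManager{availableCPU: cpu, consumed: map[string]int64{}}
}

// enqueue registers a task as waiting for host resources.
func (m *demoResourceManager) enqueue(t *demoTask) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.queue = append(m.queue, t)
}

// admitQueued admits tasks strictly from the head of the queue while the host
// has capacity; it stops at the first task that does not fit so later tasks
// cannot jump ahead of it.
func (m *demoResourceManager) admitQueued() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for len(m.queue) > 0 {
		head := m.queue[0]
		if head.cpu > m.availableCPU {
			return // head does not fit yet; everything behind it keeps waiting
		}
		m.availableCPU -= head.cpu
		m.consumed[head.arn] = head.cpu
		close(head.admitted) // the task's waitEvent-style loop can proceed
		m.queue = m.queue[1:]
	}
}

// release returns a stopped task's resources; it is idempotent so it can be
// called on every terminal state change without double counting.
func (m *demoResourceManager) release(arn string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if cpu, ok := m.consumed[arn]; ok {
		m.availableCPU += cpu
		delete(m.consumed, arn)
	}
}

func main() {
	mgr := newDemoResourceManager(1024) // pretend the host offers 1024 CPU units

	a := &demoTask{arn: "taskA", cpu: 768, admitted: make(chan struct{})}
	b := &demoTask{arn: "taskB", cpu: 512, admitted: make(chan struct{})}
	mgr.enqueue(a)
	mgr.enqueue(b)

	mgr.admitQueued()
	fmt.Println(mgr.consumed) // map[taskA:768] - taskB stays queued, 256 units free

	mgr.release("taskA") // taskA stopped; its resources are returned
	mgr.admitQueued()    // the wake-up step: re-check the head of the queue
	fmt.Println(mgr.consumed) // map[taskB:512]
}

The sketch mirrors the shape of the TestTaskWaitForHostResources test above: tasks are admitted in arrival order, a task that does not fit blocks the queue behind it, and a release followed by a wake-up lets the next queued task through.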