Skip to content

Commit fa4da21

Browse files
Merge Feature/task-resource-accounting to dev (#3819)
* Revert reverted changes for task resource accounting (#3796) * Revert "Revert "host resource manager initialization"" This reverts commit dafb967. * Revert "Revert "Add method to get host resources reserved for a task (#3706)"" This reverts commit 8d824db. * Revert "Revert "Add host resource manager methods (#3700)"" This reverts commit bec1303. * Revert "Revert "Remove task serialization and use host resource manager for task resources (#3723)"" This reverts commit cb54139. * Revert "Revert "add integ tests for task accounting (#3741)"" This reverts commit 61ad010. * Revert "Revert "Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order (#3747)"" This reverts commit 60a3f42. * Revert "Revert "Dont consume host resources for tasks getting STOPPED while waiting in waitingTasksQueue (#3750)"" This reverts commit 8943792. * fix memory resource accounting for multiple containers in single task (#3782) * fix memory resource accounting for multiple containers * change unit tests for multiple containers, add unit test for awsvpc
1 parent e919279 commit fa4da21

39 files changed

+1871
-455
lines changed

agent/acs/handler/acs_handler_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -1418,7 +1418,6 @@ func validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error
14181418
Family: addedTask.Family,
14191419
Version: addedTask.Version,
14201420
DesiredStatusUnsafe: addedTask.GetDesiredStatus(),
1421-
StartSequenceNumber: addedTask.StartSequenceNumber,
14221421
}
14231422

14241423
if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) {

agent/acs/update_handler/updater_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
128128
Reason: ptr("Updates are disabled").(*string),
129129
}})
130130

131-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
131+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
132132
msg := &ecsacs.PerformUpdateMessage{
133133
ClusterArn: ptr("cluster").(*string),
134134
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {
182182

183183
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
184184

185-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
185+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
186186
msg := &ecsacs.PerformUpdateMessage{
187187
ClusterArn: ptr("cluster").(*string),
188188
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
250250
MessageId: ptr("mid").(*string),
251251
}})
252252

253-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
253+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
254254
msg := &ecsacs.PerformUpdateMessage{
255255
ClusterArn: ptr("cluster").(*string),
256256
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {
308308

309309
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
310310

311-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
311+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
312312
msg := &ecsacs.PerformUpdateMessage{
313313
ClusterArn: ptr("cluster").(*string),
314314
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {
377377

378378
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
379379

380-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
380+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
381381
msg := &ecsacs.PerformUpdateMessage{
382382
ClusterArn: ptr("cluster").(*string),
383383
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {
448448

449449
require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")
450450

451-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
451+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
452452
msg := &ecsacs.PerformUpdateMessage{
453453
ClusterArn: ptr("cluster").(*string),
454454
ContainerInstanceArn: ptr("containerInstance").(*string),

agent/api/ecsclient/client.go

+18
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,24 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
332332
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
333333
}
334334

335+
// GetHostResources calls getResources to get the list of CPU, MEMORY, PORTS and PORTS_UDP resources
336+
// and returns a map from each resource's name to that resource; the "PORTS" resource is renamed to "PORTS_TCP" first.
337+
func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
338+
resources, err := client.getResources()
339+
if err != nil {
340+
return nil, err
341+
}
342+
resourceMap := make(map[string]*ecs.Resource)
343+
for _, resource := range resources {
344+
if *resource.Name == "PORTS" {
345+
// Except for RCI, TCP Ports are named as PORTS_TCP in agent for Host Resources purpose
346+
resource.Name = utils.Strptr("PORTS_TCP")
347+
}
348+
resourceMap[*resource.Name] = resource
349+
}
350+
return resourceMap, nil
351+
}
352+
335353
func getCpuAndMemory() (int64, int64) {
336354
memInfo, err := system.ReadMemInfo()
337355
mem := int64(0)

agent/api/interface.go

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type ECSClient interface {
5555
// UpdateContainerInstancesState updates the given container Instance ID with
5656
// the given status. Only valid statuses are ACTIVE and DRAINING.
5757
UpdateContainerInstancesState(instanceARN, status string) error
58+
// GetHostResources retrieves a map from each resource name to the corresponding resource
59+
GetHostResources() (map[string]*ecs.Resource, error)
5860
}
5961

6062
// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS

agent/api/mocks/api_mocks.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/api/task/task.go

+144-24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh"
28+
"github.com/aws/amazon-ecs-agent/agent/api/container"
2829
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
2930
apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status"
3031
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
@@ -47,6 +48,7 @@ import (
4748
apieni "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni"
4849
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
4950
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
51+
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
5052
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
5153
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
5254
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/arn"
@@ -233,9 +235,6 @@ type Task struct {
233235
// is handled properly so that the state storage continues to work.
234236
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`
235237

236-
StartSequenceNumber int64
237-
StopSequenceNumber int64
238-
239238
// ExecutionCredentialsID is the ID of credentials that are used by agent to
240239
// perform some action at the task level, such as pulling image from ECR
241240
ExecutionCredentialsID string `json:"executionCredentialsID"`
@@ -311,11 +310,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
311310
if err := json.Unmarshal(data, task); err != nil {
312311
return nil, err
313312
}
314-
if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil {
315-
task.StartSequenceNumber = *envelope.SeqNum
316-
} else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil {
317-
task.StopSequenceNumber = *envelope.SeqNum
318-
}
319313

320314
// Overrides the container command if it's set
321315
for _, container := range task.Containers {
@@ -2824,22 +2818,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh {
28242818
return task.AppMesh
28252819
}
28262820

2827-
// GetStopSequenceNumber returns the stop sequence number of a task
2828-
func (task *Task) GetStopSequenceNumber() int64 {
2829-
task.lock.RLock()
2830-
defer task.lock.RUnlock()
2831-
2832-
return task.StopSequenceNumber
2833-
}
2834-
2835-
// SetStopSequenceNumber sets the stop seqence number of a task
2836-
func (task *Task) SetStopSequenceNumber(seqnum int64) {
2837-
task.lock.Lock()
2838-
defer task.lock.Unlock()
2839-
2840-
task.StopSequenceNumber = seqnum
2841-
}
2842-
28432821
// SetPullStartedAt sets the task pullstartedat timestamp and returns whether
28442822
// this field was updated or not
28452823
func (task *Task) SetPullStartedAt(timestamp time.Time) bool {
@@ -3525,3 +3503,145 @@ func (task *Task) IsServiceConnectConnectionDraining() bool {
35253503
defer task.lock.RUnlock()
35263504
return task.ServiceConnectConnectionDrainingUnsafe
35273505
}
3506+
3507+
// ToHostResources converts a task into a map of the host resources which ECS takes into account when scheduling tasks on instances
3508+
// * CPU
3509+
// - If task level CPU is set, use that (converted from vCPUs to CPU shares)
3510+
// - Else add up container CPUs
3511+
//
3512+
// * Memory
3513+
// - If task level memory is set, use that
3514+
// - Else add up container level
3515+
// - If memoryReservation field is set, use that
3516+
// - Else use memory field
3517+
//
3518+
// * Ports (TCP/UDP)
3519+
// - Only account for hostPort
3520+
// - Don't need to account for awsvpc mode, each task gets its own namespace
3521+
//
3522+
// * GPU
3523+
// - Return num of gpus requested (len of GPUIDs field)
//
// The returned map always contains the keys "CPU", "MEMORY", "PORTS_TCP", "PORTS_UDP" and "GPU".
3524+
func (task *Task) ToHostResources() map[string]*ecs.Resource {
3525+
resources := make(map[string]*ecs.Resource)
3526+
// CPU
3527+
if task.CPU > 0 {
3528+
// cpu unit is vcpu at task level
3529+
// convert to cpushares (the code below multiplies by 1024)
3530+
taskCPUint64 := int64(task.CPU * 1024)
3531+
resources["CPU"] = &ecs.Resource{
3532+
Name: utils.Strptr("CPU"),
3533+
Type: utils.Strptr("INTEGER"),
3534+
IntegerValue: &taskCPUint64,
3535+
}
3536+
} else {
3537+
// cpu unit is cpushares at container level
3538+
containerCPUint64 := int64(0)
3539+
for _, container := range task.Containers {
3540+
containerCPUint64 += int64(container.CPU)
3541+
}
3542+
resources["CPU"] = &ecs.Resource{
3543+
Name: utils.Strptr("CPU"),
3544+
Type: utils.Strptr("INTEGER"),
3545+
IntegerValue: &containerCPUint64,
3546+
}
3547+
}
3548+
3549+
// Memory
3550+
if task.Memory > 0 {
3551+
// memory unit is MiB at task level
3552+
taskMEMint64 := task.Memory
3553+
resources["MEMORY"] = &ecs.Resource{
3554+
Name: utils.Strptr("MEMORY"),
3555+
Type: utils.Strptr("INTEGER"),
3556+
IntegerValue: &taskMEMint64,
3557+
}
3558+
} else {
3559+
containerMEMint64 := int64(0)
3560+
3561+
for _, c := range task.Containers {
3562+
// Scratch struct used to parse the memory reservation / soft limit out of the container's raw HostConfig JSON
3563+
hostConfig := &dockercontainer.HostConfig{}
3564+
3565+
if c.DockerConfig.HostConfig != nil {
3566+
err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig)
3567+
if err != nil || hostConfig.MemoryReservation <= 0 {
3568+
// HostConfig could not be parsed or carries no positive soft limit: fall back to the
// container memory field (unit is MiB, keeping as is). The unmarshal error itself is not reported.
3569+
containerMEMint64 += int64(c.Memory)
3570+
} else {
3571+
// Soft limit is specified in MiB units but translated to bytes while being transferred to Agent
3572+
// Converting back to MiB
3573+
containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024)
3574+
}
3575+
} else {
3576+
// No HostConfig was provided: container memory unit is MiB, keeping as is
3577+
containerMEMint64 += int64(c.Memory)
3578+
}
3579+
}
3580+
resources["MEMORY"] = &ecs.Resource{
3581+
Name: utils.Strptr("MEMORY"),
3582+
Type: utils.Strptr("INTEGER"),
3583+
IntegerValue: &containerMEMint64,
3584+
}
3585+
}
3586+
3587+
// PORTS_TCP and PORTS_UDP
3588+
var tcpPortSet []uint16
3589+
var udpPortSet []uint16
3590+
3591+
// AWSVPC tasks have 'host' ports mapped to task ENI, not to host
3592+
// So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned
3593+
if !task.IsNetworkModeAWSVPC() {
3594+
for _, c := range task.Containers {
3595+
for _, port := range c.Ports {
3596+
hostPort := port.HostPort
3597+
protocol := port.Protocol
3598+
// Only ports with a host port greater than zero and a TCP or UDP protocol are recorded; anything else is skipped
if hostPort > 0 && protocol == container.TransportProtocolTCP {
3599+
tcpPortSet = append(tcpPortSet, hostPort)
3600+
} else if hostPort > 0 && protocol == container.TransportProtocolUDP {
3601+
udpPortSet = append(udpPortSet, hostPort)
3602+
}
3603+
}
3604+
}
3605+
}
3606+
resources["PORTS_TCP"] = &ecs.Resource{
3607+
Name: utils.Strptr("PORTS_TCP"),
3608+
Type: utils.Strptr("STRINGSET"),
3609+
StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet),
3610+
}
3611+
resources["PORTS_UDP"] = &ecs.Resource{
3612+
Name: utils.Strptr("PORTS_UDP"),
3613+
Type: utils.Strptr("STRINGSET"),
3614+
StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet),
3615+
}
3616+
3617+
// GPU: total number of GPU IDs assigned across all containers of the task
3618+
var num_gpus int64
3619+
num_gpus = 0
3620+
for _, c := range task.Containers {
3621+
num_gpus += int64(len(c.GPUIDs))
3622+
}
3623+
resources["GPU"] = &ecs.Resource{
3624+
Name: utils.Strptr("GPU"),
3625+
Type: utils.Strptr("INTEGER"),
3626+
IntegerValue: &num_gpus,
3627+
}
3628+
logger.Debug("Task host resources to account for", logger.Fields{
3629+
"taskArn": task.Arn,
3630+
"CPU": *resources["CPU"].IntegerValue,
3631+
"MEMORY": *resources["MEMORY"].IntegerValue,
3632+
"PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue),
3633+
"PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue),
3634+
"GPU": *resources["GPU"].IntegerValue,
3635+
})
3636+
return resources
3637+
}
3638+
3639+
// HasActiveContainers returns true if at least one container of the task has a known status
// between ContainerPulled and ContainerResourcesProvisioned (both inclusive), and false otherwise.
func (task *Task) HasActiveContainers() bool {
3640+
for _, container := range task.Containers {
3641+
containerStatus := container.GetKnownStatus()
3642+
if containerStatus >= apicontainerstatus.ContainerPulled && containerStatus <= apicontainerstatus.ContainerResourcesProvisioned {
3643+
return true
3644+
}
3645+
}
3646+
return false
3647+
}

0 commit comments

Comments
 (0)