Skip to content

Commit

Permalink
Revert reverted changes for task resource accounting (#3796)
Browse files Browse the repository at this point in the history
* Revert "Revert "host resource manager initialization""

This reverts commit dafb967.

* Revert "Revert "Add method to get host resources reserved for a task (#3706)""

This reverts commit 8d824db.

* Revert "Revert "Add host resource manager methods (#3700)""

This reverts commit bec1303.

* Revert "Revert "Remove task serialization and use host resource manager for task resources (#3723)""

This reverts commit cb54139.

* Revert "Revert "add integ tests for task accounting (#3741)""

This reverts commit 61ad010.

* Revert "Revert "Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order (#3747)""

This reverts commit 60a3f42.

* Revert "Revert "Dont consume host resources for tasks getting STOPPED while waiting in waitingTasksQueue (#3750)""

This reverts commit 8943792.
  • Loading branch information
prateekchaudhry authored Jul 12, 2023
1 parent 1e72259 commit 96a64ef
Show file tree
Hide file tree
Showing 39 changed files with 1,767 additions and 455 deletions.
1 change: 0 additions & 1 deletion agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,6 @@ func validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error
Family: addedTask.Family,
Version: addedTask.Version,
DesiredStatusUnsafe: addedTask.GetDesiredStatus(),
StartSequenceNumber: addedTask.StartSequenceNumber,
}

if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) {
Expand Down
12 changes: 6 additions & 6 deletions agent/acs/update_handler/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
Reason: ptr("Updates are disabled").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
MessageId: ptr("mid").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {

require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down
18 changes: 18 additions & 0 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,24 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
}

// GetHostResources calling getHostResources to get a list of CPU, MEMORY, PORTS and PORTS_UPD resources
// and return a resourceMap that map the resource name to each resource
func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
resources, err := client.getResources()
if err != nil {
return nil, err
}
resourceMap := make(map[string]*ecs.Resource)
for _, resource := range resources {
if *resource.Name == "PORTS" {
// Except for RCI, TCP Ports are named as PORTS_TCP in agent for Host Resources purpose
resource.Name = utils.Strptr("PORTS_TCP")
}
resourceMap[*resource.Name] = resource
}
return resourceMap, nil
}

func getCpuAndMemory() (int64, int64) {
memInfo, err := system.ReadMemInfo()
mem := int64(0)
Expand Down
2 changes: 2 additions & 0 deletions agent/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type ECSClient interface {
// UpdateContainerInstancesState updates the given container Instance ID with
// the given status. Only valid statuses are ACTIVE and DRAINING.
UpdateContainerInstancesState(instanceARN, status string) error
// GetHostResources retrieves a map that map the resource name to the corresponding resource
GetHostResources() (map[string]*ecs.Resource, error)
}

// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS
Expand Down
15 changes: 15 additions & 0 deletions agent/api/mocks/api_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

167 changes: 143 additions & 24 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh"
"github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
Expand All @@ -47,6 +48,7 @@ import (
apieni "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/arn"
Expand Down Expand Up @@ -233,9 +235,6 @@ type Task struct {
// is handled properly so that the state storage continues to work.
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`

StartSequenceNumber int64
StopSequenceNumber int64

// ExecutionCredentialsID is the ID of credentials that are used by agent to
// perform some action at the task level, such as pulling image from ECR
ExecutionCredentialsID string `json:"executionCredentialsID"`
Expand Down Expand Up @@ -311,11 +310,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
if err := json.Unmarshal(data, task); err != nil {
return nil, err
}
if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil {
task.StartSequenceNumber = *envelope.SeqNum
} else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil {
task.StopSequenceNumber = *envelope.SeqNum
}

// Overrides the container command if it's set
for _, container := range task.Containers {
Expand Down Expand Up @@ -2824,22 +2818,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh {
return task.AppMesh
}

// GetStopSequenceNumber returns the stop sequence number of a task
func (task *Task) GetStopSequenceNumber() int64 {
task.lock.RLock()
defer task.lock.RUnlock()

return task.StopSequenceNumber
}

// SetStopSequenceNumber sets the stop seqence number of a task
func (task *Task) SetStopSequenceNumber(seqnum int64) {
task.lock.Lock()
defer task.lock.Unlock()

task.StopSequenceNumber = seqnum
}

// SetPullStartedAt sets the task pullstartedat timestamp and returns whether
// this field was updated or not
func (task *Task) SetPullStartedAt(timestamp time.Time) bool {
Expand Down Expand Up @@ -3525,3 +3503,144 @@ func (task *Task) IsServiceConnectConnectionDraining() bool {
defer task.lock.RUnlock()
return task.ServiceConnectConnectionDrainingUnsafe
}

// ToHostResources will convert a task to a map of resources which ECS takes into account when scheduling tasks on instances
// * CPU
// - If task level CPU is set, use that
// - Else add up container CPUs
//
// * Memory
// - If task level memory is set, use that
// - Else add up container level
// - If memoryReservation field is set, use that
// - Else use memory field
//
// * Ports (TCP/UDP)
// - Only account for hostPort
// - Don't need to account for awsvpc mode, each task gets its own namespace
//
// * GPU
// - Return num of gpus requested (len of GPUIDs field)
func (task *Task) ToHostResources() map[string]*ecs.Resource {
resources := make(map[string]*ecs.Resource)
// CPU
if task.CPU > 0 {
// cpu unit is vcpu at task level
// convert to cpushares
taskCPUint64 := int64(task.CPU * 1024)
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskCPUint64,
}
} else {
// cpu unit is cpushares at container level
containerCPUint64 := int64(0)
for _, container := range task.Containers {
containerCPUint64 += int64(container.CPU)
}
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerCPUint64,
}
}

// Memory
if task.Memory > 0 {
// memory unit is MiB at task level
taskMEMint64 := task.Memory
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskMEMint64,
}
} else {
containerMEMint64 := int64(0)
// To parse memory reservation / soft limit
hostConfig := &dockercontainer.HostConfig{}

for _, c := range task.Containers {
if c.DockerConfig.HostConfig != nil {
err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig)
if err != nil || hostConfig.MemoryReservation <= 0 {
// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
} else {
// Soft limit is specified in MiB units but translated to bytes while being transferred to Agent
// Converting back to MiB
containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024)
}
} else {
// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
}
}
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerMEMint64,
}
}

// PORTS_TCP and PORTS_UDP
var tcpPortSet []uint16
var udpPortSet []uint16

// AWSVPC tasks have 'host' ports mapped to task ENI, not to host
// So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned
if !task.IsNetworkModeAWSVPC() {
for _, c := range task.Containers {
for _, port := range c.Ports {
hostPort := port.HostPort
protocol := port.Protocol
if hostPort > 0 && protocol == container.TransportProtocolTCP {
tcpPortSet = append(tcpPortSet, hostPort)
} else if hostPort > 0 && protocol == container.TransportProtocolUDP {
udpPortSet = append(udpPortSet, hostPort)
}
}
}
}
resources["PORTS_TCP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_TCP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet),
}
resources["PORTS_UDP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_UDP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet),
}

// GPU
var num_gpus int64
num_gpus = 0
for _, c := range task.Containers {
num_gpus += int64(len(c.GPUIDs))
}
resources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &num_gpus,
}
logger.Debug("Task host resources to account for", logger.Fields{
"taskArn": task.Arn,
"CPU": *resources["CPU"].IntegerValue,
"MEMORY": *resources["MEMORY"].IntegerValue,
"PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue),
"PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue),
"GPU": *resources["GPU"].IntegerValue,
})
return resources
}

func (task *Task) HasActiveContainers() bool {
for _, container := range task.Containers {
containerStatus := container.GetKnownStatus()
if containerStatus >= apicontainerstatus.ContainerPulled && containerStatus <= apicontainerstatus.ContainerResourcesProvisioned {
return true
}
}
return false
}
Loading

0 comments on commit 96a64ef

Please sign in to comment.