Skip to content

Commit

Permalink
Support for setting PIDs limit for ECS tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Sep 25, 2023
1 parent 78703a3 commit 44fbb6c
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ additional details on each available environment variable.
| `CREDENTIALS_FETCHER_HOST` | `unix:///var/credentials-fetcher/socket/credentials_fetcher.sock` | Used to create a connection to the [credentials-fetcher daemon](https://github.com/aws/credentials-fetcher); to support gMSA on Linux. The default is fine for most users, only needs to be modified if user is configuring a custom credentials-fetcher socket path, ie, [CF_UNIX_DOMAIN_SOCKET_DIR](https://github.com/aws/credentials-fetcher#default-environment-variables). | `unix:///var/credentials-fetcher/socket/credentials_fetcher.sock` | Not Applicable |
| `CREDENTIALS_FETCHER_SECRET_NAME_FOR_DOMAINLESS_GMSA` | `secretmanager-secretname` | Used to support scaling option for gMSA on Linux [credentials-fetcher daemon](https://github.com/aws/credentials-fetcher). If user is configuring gMSA on a non-domain joined instance, they need to create an Active Directory user with access to retrieve principals for the gMSA account and store it in secrets manager | `secretmanager-secretname` | Not Applicable |
| `ECS_DYNAMIC_HOST_PORT_RANGE` | `100-200` | This specifies the dynamic host port range that the agent uses to assign host ports from, for container ports mapping. If there are no available ports in the range for containers, including customer containers and Service Connect Agent containers (if Service Connect is enabled), service deployments would fail. | Defined by `/proc/sys/net/ipv4/ip_local_port_range` | `49152-65535` |
| `ECS_TASK_PIDS_LIMIT` | `100` | Specifies the per-task pids limit cgroup setting for each task launched on the container instance. This setting maps to the pids.max cgroup setting at the ECS task level. See https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#pid. If unset, pids will be unlimited. Min value is 1 and max value is 4194304 (4*1024*1024) | `unset` | Not Supported on Windows |

Additionally, the following environment variable(s) can be used to configure the behavior of the ecs-init service. When using ECS-Init, all env variables, including the ECS Agent variables above, are read from path `/etc/ecs/ecs.config`:
| Environment Variable Name | Example Value(s) | Description | Default value |
Expand Down
7 changes: 4 additions & 3 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config,

task.adjustForPlatform(cfg)

// TODO, add rudimentary plugin support and call any plugins that want to
// hook into this
if err := task.initializeCgroupResourceSpec(cfg.CgroupPath, cfg.CgroupCPUPeriod, resourceFields); err != nil {
// Initialize cgroup resource spec definition for later cgroup resource creation.
// This sets up the cgroup spec for cpu, memory, and pids limits for the task.
// Actual cgroup creation happens later.
if err := task.initializeCgroupResourceSpec(cfg.CgroupPath, cfg.CgroupCPUPeriod, cfg.TaskPidsLimit, resourceFields); err != nil {
logger.Error("Could not initialize resource", logger.Fields{
field.TaskID: task.GetID(),
field.Error: err,
Expand Down
14 changes: 11 additions & 3 deletions agent/api/task/task_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (task *Task) adjustForPlatform(cfg *config.Config) {
task.MemoryCPULimitsEnabled = cfg.TaskCPUMemLimit.Enabled()
}

func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, resourceFields *taskresource.ResourceFields) error {
func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, taskPidsLimit int, resourceFields *taskresource.ResourceFields) error {
if !task.MemoryCPULimitsEnabled {
if task.CPU > 0 || task.Memory > 0 {
// Client-side validation/warning if a task with task-level CPU/memory limits specified somehow lands on an instance
Expand All @@ -74,7 +74,7 @@ func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPerio
if err != nil {
return errors.Wrapf(err, "cgroup resource: unable to determine cgroup root for task")
}
resSpec, err := task.BuildLinuxResourceSpec(cGroupCPUPeriod)
resSpec, err := task.BuildLinuxResourceSpec(cGroupCPUPeriod, taskPidsLimit)
if err != nil {
return errors.Wrapf(err, "cgroup resource: unable to build resource spec for task")
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func buildCgroupV2Root(taskID string) string {
}

// BuildLinuxResourceSpec returns a linuxResources object for the task cgroup
func (task *Task) BuildLinuxResourceSpec(cGroupCPUPeriod time.Duration) (specs.LinuxResources, error) {
func (task *Task) BuildLinuxResourceSpec(cGroupCPUPeriod time.Duration, taskPidsLimit int) (specs.LinuxResources, error) {
linuxResourceSpec := specs.LinuxResources{}

// If task level CPU limits are requested, set CPU quota + CPU period
Expand All @@ -148,6 +148,14 @@ func (task *Task) BuildLinuxResourceSpec(cGroupCPUPeriod time.Duration) (specs.L
linuxResourceSpec.Memory = &linuxMemorySpec
}

// Set task pids limit if set via ECS_TASK_PIDS_LIMIT env var
if taskPidsLimit > 0 {
pidsLimit := &specs.LinuxPids{
Limit: int64(taskPidsLimit),
}
linuxResourceSpec.Pids = pidsLimit
}

return linuxResourceSpec, nil
}

Expand Down
108 changes: 98 additions & 10 deletions agent/api/task/task_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,68 @@ func TestBuildLinuxResourceSpecCPUMem(t *testing.T) {
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
}

// BuildLinuxResourceSpec tested with pid limits passed in.
func TestBuildLinuxResourceSpecCPUMem_WithPidLimits(t *testing.T) {
taskMemoryLimit := int64(taskMemoryLimit)

task := &Task{
Arn: validTaskArn,
CPU: float64(taskVCPULimit),
Memory: taskMemoryLimit,
}

expectedTaskCPUPeriod := uint64(defaultCPUPeriod / time.Microsecond)
expectedTaskCPUQuota := int64(taskVCPULimit * float64(expectedTaskCPUPeriod))
expectedTaskMemory := taskMemoryLimit * bytesPerMegabyte
expectedLinuxResourceSpec := specs.LinuxResources{
CPU: &specs.LinuxCPU{
Quota: &expectedTaskCPUQuota,
Period: &expectedTaskCPUPeriod,
},
Memory: &specs.LinuxMemory{
Limit: &expectedTaskMemory,
},
Pids: &specs.LinuxPids{
Limit: int64(100),
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 100)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
}

// no pid limits expected when BuildLinuxResourceSpec receives an invalid value.
func TestBuildLinuxResourceSpecCPUMem_NegativeInvalidPidLimits(t *testing.T) {
taskMemoryLimit := int64(taskMemoryLimit)

task := &Task{
Arn: validTaskArn,
CPU: float64(taskVCPULimit),
Memory: taskMemoryLimit,
}

expectedTaskCPUPeriod := uint64(defaultCPUPeriod / time.Microsecond)
expectedTaskCPUQuota := int64(taskVCPULimit * float64(expectedTaskCPUPeriod))
expectedTaskMemory := taskMemoryLimit * bytesPerMegabyte
expectedLinuxResourceSpec := specs.LinuxResources{
CPU: &specs.LinuxCPU{
Quota: &expectedTaskCPUQuota,
Period: &expectedTaskCPUPeriod,
},
Memory: &specs.LinuxMemory{
Limit: &expectedTaskMemory,
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, -1)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand All @@ -325,7 +386,7 @@ func TestBuildLinuxResourceSpecCPU(t *testing.T) {
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand All @@ -340,7 +401,7 @@ func TestBuildLinuxResourceSpecIncreasedTaskCPULimit(t *testing.T) {
CPU: increasedTaskVCPULimit,
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

expectedTaskCPUPeriod := uint64(defaultCPUPeriod / time.Microsecond)
expectedTaskCPUQuota := int64(increasedTaskVCPULimit * float64(expectedTaskCPUPeriod))
Expand Down Expand Up @@ -371,7 +432,34 @@ func TestBuildLinuxResourceSpecWithoutTaskCPULimits(t *testing.T) {
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
}

// TestBuildLinuxResourceSpecWithoutTaskCPULimits validates behavior of CPU Shares
// validate that pid limits are also inserted correctly
func TestBuildLinuxResourceSpecWithoutTaskCPULimits_WithPidLimits(t *testing.T) {
task := &Task{
Arn: validTaskArn,
Containers: []*apicontainer.Container{
{
Name: "C1",
},
},
}
expectedCPUShares := uint64(minimumCPUShare)
expectedLinuxResourceSpec := specs.LinuxResources{
CPU: &specs.LinuxCPU{
Shares: &expectedCPUShares,
},
Pids: &specs.LinuxPids{
Limit: int64(100),
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 100)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand All @@ -395,7 +483,7 @@ func TestBuildLinuxResourceSpecWithoutTaskCPUWithContainerCPULimits(t *testing.T
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand All @@ -420,7 +508,7 @@ func TestBuildLinuxResourceSpecWithoutTaskCPUWithLessThanMinimumContainerCPULimi
},
}

linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.NoError(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand All @@ -443,7 +531,7 @@ func TestBuildLinuxResourceSpecInvalidMem(t *testing.T) {
}

expectedLinuxResourceSpec := specs.LinuxResources{}
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod)
linuxResourceSpec, err := task.BuildLinuxResourceSpec(defaultCPUPeriod, 0)

assert.Error(t, err)
assert.EqualValues(t, expectedLinuxResourceSpec, linuxResourceSpec)
Expand Down Expand Up @@ -593,7 +681,7 @@ func TestInitCgroupResourceSpecHappyPath(t *testing.T) {
defer ctrl.Finish()
mockControl := mock_control.NewMockControl(ctrl)
mockIO := mock_ioutilwrapper.NewMockIOUtil(ctrl)
assert.NoError(t, task.initializeCgroupResourceSpec("cgroupPath", defaultCPUPeriod, &taskresource.ResourceFields{
assert.NoError(t, task.initializeCgroupResourceSpec("cgroupPath", defaultCPUPeriod, 0, &taskresource.ResourceFields{
Control: mockControl,
ResourceFieldsCommon: &taskresource.ResourceFieldsCommon{
IOUtil: mockIO,
Expand All @@ -617,7 +705,7 @@ func TestInitCgroupResourceSpecInvalidARN(t *testing.T) {
MemoryCPULimitsEnabled: true,
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
}
assert.Error(t, task.initializeCgroupResourceSpec("", time.Millisecond, nil))
assert.Error(t, task.initializeCgroupResourceSpec("", time.Millisecond, 0, nil))
assert.Equal(t, 0, len(task.GetResources()))
assert.Equal(t, 0, len(task.Containers[0].TransitionDependenciesMap))
}
Expand All @@ -638,7 +726,7 @@ func TestInitCgroupResourceSpecInvalidMem(t *testing.T) {
MemoryCPULimitsEnabled: true,
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
}
assert.Error(t, task.initializeCgroupResourceSpec("", time.Millisecond, nil))
assert.Error(t, task.initializeCgroupResourceSpec("", time.Millisecond, 0, nil))
assert.Equal(t, 0, len(task.GetResources()))
assert.Equal(t, 0, len(task.Containers[0].TransitionDependenciesMap))
}
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (task *Task) adjustForPlatform(cfg *config.Config) {
task.MemoryCPULimitsEnabled = cfg.TaskCPUMemLimit.Enabled()
}

func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, resourceFields *taskresource.ResourceFields) error {
func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, taskPidsLimit int, resourceFields *taskresource.ResourceFields) error {
if !task.MemoryCPULimitsEnabled {
if task.CPU > 0 || task.Memory > 0 {
// Client-side validation/warning if a task with task-level CPU/memory limits specified somehow lands on an instance
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (task *Task) dockerCPUShares(containerCPU uint) int64 {
return int64(containerCPU)
}

func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, resourceFields *taskresource.ResourceFields) error {
func (task *Task) initializeCgroupResourceSpec(cgroupPath string, cGroupCPUPeriod time.Duration, taskPidsLimit int, resourceFields *taskresource.ResourceFields) error {
if !task.MemoryCPULimitsEnabled {
if task.CPU > 0 || task.Memory > 0 {
// Client-side validation/warning if a task with task-level CPU/memory limits specified somehow lands on an instance
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ func environmentConfig() (Config, error) {
ShouldExcludeIPv6PortBinding: parseBooleanDefaultTrueConfig("ECS_EXCLUDE_IPV6_PORTBINDING"),
WarmPoolsSupport: parseBooleanDefaultFalseConfig("ECS_WARM_POOLS_CHECK"),
DynamicHostPortRange: parseDynamicHostPortRange("ECS_DYNAMIC_HOST_PORT_RANGE"),
TaskPidsLimit: parseTaskPidsLimit(),
}, err
}

Expand Down
25 changes: 25 additions & 0 deletions agent/config/parse_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"io/fs"
"os"
"strconv"
"strings"

"github.com/aws/amazon-ecs-agent/agent/utils"
Expand Down Expand Up @@ -110,3 +111,27 @@ var IsWindows2016 = func() (bool, error) {
func GetOSFamily() string {
return strings.ToUpper(OSType)
}

func parseTaskPidsLimit() int {
var taskPidsLimit int
pidsLimitEnvVal := os.Getenv("ECS_TASK_PIDS_LIMIT")
if pidsLimitEnvVal == "" {
seelog.Debug("Environment variable empty: ECS_TASK_PIDS_LIMIT")
return 0
}

taskPidsLimit, err := strconv.Atoi(strings.TrimSpace(pidsLimitEnvVal))
if err != nil {
seelog.Warnf(`Invalid format for "ECS_TASK_PIDS_LIMIT", expected an integer but got [%v]: %v`, pidsLimitEnvVal, err)
return 0
}

// 4194304 is a defacto limit set by runc on Amazon Linux (4*1024*1024), so
// we should use the same to avoid runtime container failures.
if taskPidsLimit <= 0 || taskPidsLimit > 4194304 {
seelog.Warnf(`Invalid value for "ECS_TASK_PIDS_LIMIT", expected integer greater than 0 and less than 4194305, but got [%v]`, taskPidsLimit)
return 0
}

return taskPidsLimit
}
29 changes: 29 additions & 0 deletions agent/config/parse_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func TestSkipDomainLessCheckParseGMSACapability(t *testing.T) {

assert.True(t, parseGMSADomainlessCapability().Enabled())
}

func TestParseTaskPidsLimit(t *testing.T) {
t.Setenv("ECS_TASK_PIDS_LIMIT", "1")
assert.Equal(t, 1, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "10")
assert.Equal(t, 10, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "100")
assert.Equal(t, 100, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "10000")
assert.Equal(t, 10000, parseTaskPidsLimit())
// test the upper limit minus 1
t.Setenv("ECS_TASK_PIDS_LIMIT", "4194304")
assert.Equal(t, 4194304, parseTaskPidsLimit())
// test the upper limit
t.Setenv("ECS_TASK_PIDS_LIMIT", "4194305")
assert.Equal(t, 0, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "0")
assert.Equal(t, 0, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "-1")
assert.Equal(t, 0, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "foobar")
assert.Equal(t, 0, parseTaskPidsLimit())
t.Setenv("ECS_TASK_PIDS_LIMIT", "")
assert.Equal(t, 0, parseTaskPidsLimit())
}

func TestParseTaskPidsLimit_Unset(t *testing.T) {
assert.Equal(t, 0, parseTaskPidsLimit())
}
29 changes: 26 additions & 3 deletions agent/config/parse_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//go:build unit
// +build unit

package config

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -114,3 +112,28 @@ func TestParseBooleanDefaultTrueConfig(t *testing.T) {
v = parseBooleanDefaultTrueConfig("ECS_PARSE_BOOLEAN_DEFAULT_TRUE")
assert.True(t, v.Enabled())
}

func TestParseContainerInstanceTags(t *testing.T) {
// empty
t.Setenv("ECS_CONTAINER_INSTANCE_TAGS", "")
var expected, actual map[string]string
expectedErrs := []error{}
actualErrs := []error{}
actual, actualErrs = parseContainerInstanceTags(actualErrs)
assert.Equal(t, expected, actual)
assert.Equal(t, expectedErrs, actualErrs)
// with valid values
t.Setenv("ECS_CONTAINER_INSTANCE_TAGS", `{"foo":"bar","baz":"bin","num":"7"}`)
expected = map[string]string{"baz": "bin", "foo": "bar", "num": "7"}
expectedErrs = []error{}
actual, actualErrs = parseContainerInstanceTags(actualErrs)
assert.Equal(t, expected, actual)
assert.Equal(t, expectedErrs, actualErrs)
// with invalid values
t.Setenv("ECS_CONTAINER_INSTANCE_TAGS", `{"foo":"bar","baz":"bin,"num":"7"}`) // missing "
var expectedInvalid map[string]string
expectedErrs = []error{fmt.Errorf("Invalid format for ECS_CONTAINER_INSTANCE_TAGS. Expected a json hash: invalid character 'n' after object key:value pair")}
actual, actualErrs = parseContainerInstanceTags(actualErrs)
assert.Equal(t, expectedInvalid, actual)
assert.Equal(t, expectedErrs, actualErrs)
}
4 changes: 4 additions & 0 deletions agent/config/parse_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ var IsWindows2016 = func() (bool, error) {
func GetOSFamily() string {
return strings.ToUpper(OSType)
}

func parseTaskPidsLimit() int {
return 0
}
4 changes: 4 additions & 0 deletions agent/config/parse_unsupported_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ func TestParseGMSACapabilitySupported(t *testing.T) {
func TestParseFSxWindowsFileServerCapability(t *testing.T) {
assert.False(t, parseGMSACapability().Enabled())
}

func TestParseTaskPidsLimit(t *testing.T) {
assert.Zero(t, parseTaskPidsLimit())
}
Loading

0 comments on commit 44fbb6c

Please sign in to comment.