Skip to content

Commit 40590fe

Browse files
Add host resource manager methods (aws#3700)
1 parent 67dcd21 commit 40590fe

File tree

2 files changed

+516
-16
lines changed

2 files changed

+516
-16
lines changed

agent/engine/host_resource_manager.go

+296-16
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,300 @@
1616
package engine
1717

1818
import (
19+
"fmt"
20+
"sync"
21+
1922
"github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs"
2023
"github.com/aws/amazon-ecs-agent/agent/utils"
2124
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
25+
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
2226
)
2327

24-
// initialHostResource keeps account of each task in
28+
// TODO remove this once resource, consume are used
29+
//lint:file-ignore U1000 Ignore all unused code
30+
31+
const (
32+
CPU = "CPU"
33+
GPU = "GPU"
34+
MEMORY = "MEMORY"
35+
PORTSTCP = "PORTS_TCP"
36+
PORTSUDP = "PORTS_UDP"
37+
)
38+
39+
// HostResourceManager keeps account of host resources allocated for tasks set to be created/running tasks
2540
type HostResourceManager struct {
26-
initialHostResource map[string]*ecs.Resource
27-
consumedResource map[string]*ecs.Resource
41+
initialHostResource map[string]*ecs.Resource
42+
consumedResource map[string]*ecs.Resource
43+
hostResourceManagerRWLock sync.Mutex
2844

2945
//task.arn to boolean whether host resources consumed or not
3046
taskConsumed map[string]bool
3147
}
3248

49+
type ResourceNotFoundForTask struct {
50+
resource string
51+
}
52+
53+
func (e *ResourceNotFoundForTask) Error() string {
54+
return fmt.Sprintf("no %s in task resources", e.resource)
55+
}
56+
57+
type InvalidHostResource struct {
58+
resource string
59+
}
60+
61+
func (e *InvalidHostResource) Error() string {
62+
return fmt.Sprintf("no %s resource found in host resources", e.resource)
63+
}
64+
65+
type ResourceIsNilForTask struct {
66+
resource string
67+
}
68+
69+
func (e *ResourceIsNilForTask) Error() string {
70+
return fmt.Sprintf("resource %s is nil in task resources", e.resource)
71+
}
72+
73+
func (h *HostResourceManager) logResources(msg string, taskArn string) {
74+
logger.Debug(msg, logger.Fields{
75+
"taskArn": taskArn,
76+
"CPU": *h.consumedResource[CPU].IntegerValue,
77+
"MEMORY": *h.consumedResource[MEMORY].IntegerValue,
78+
"PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue,
79+
"PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue,
80+
"GPU": *h.consumedResource[GPU].IntegerValue,
81+
})
82+
}
83+
84+
func (h *HostResourceManager) consumeIntType(resourceType string, resources map[string]*ecs.Resource) {
85+
*h.consumedResource[resourceType].IntegerValue += *resources[resourceType].IntegerValue
86+
}
87+
88+
func (h *HostResourceManager) consumeStringSetType(resourceType string, resources map[string]*ecs.Resource) {
89+
resource, ok := resources[resourceType]
90+
if ok {
91+
h.consumedResource[resourceType].StringSetValue = append(h.consumedResource[resourceType].StringSetValue, resource.StringSetValue...)
92+
}
93+
}
94+
95+
// Returns if resources consumed or not and error status
96+
// false, nil -> did not consume, task should stay pending
97+
// false, err -> resources map has errors, task should fail as cannot schedule with 'wrong' resource map (this basically never happens)
98+
// true, nil -> successfully consumed, task should progress with task creation
99+
func (h *HostResourceManager) consume(taskArn string, resources map[string]*ecs.Resource) (bool, error) {
100+
h.hostResourceManagerRWLock.Lock()
101+
defer h.hostResourceManagerRWLock.Unlock()
102+
defer h.logResources("Consumed resources after task consume call", taskArn)
103+
104+
// Check if already consumed
105+
_, ok := h.taskConsumed[taskArn]
106+
if ok {
107+
// Nothing to do, already consumed, return
108+
logger.Info("Resources pre-consumed, continue to task creation", logger.Fields{"taskArn": taskArn})
109+
return true, nil
110+
}
111+
112+
ok, err := h.consumable(resources)
113+
if err != nil {
114+
logger.Error("Resources failing to consume, error in task resources", logger.Fields{
115+
"taskArn": taskArn,
116+
field.Error: err,
117+
})
118+
return false, err
119+
}
120+
if ok {
121+
for resourceKey := range resources {
122+
if *resources[resourceKey].Type == "INTEGER" {
123+
// CPU, MEMORY, GPU
124+
h.consumeIntType(resourceKey, resources)
125+
} else if *resources[resourceKey].Type == "STRINGSET" {
126+
// PORTS_TCP, PORTS_UDP
127+
h.consumeStringSetType(resourceKey, resources)
128+
}
129+
}
130+
131+
// Set consumed status
132+
h.taskConsumed[taskArn] = true
133+
logger.Info("Resources successfully consumed, continue to task creation", logger.Fields{"taskArn": taskArn})
134+
return true, nil
135+
}
136+
logger.Info("Resources not consumed, enough resources not available", logger.Fields{"taskArn": taskArn})
137+
return false, nil
138+
}
139+
140+
// Functions checkConsumableIntType and checkConsumableStringSetType to be called
141+
// only after checking for resource map health
142+
func (h *HostResourceManager) checkConsumableIntType(resourceName string, resources map[string]*ecs.Resource) bool {
143+
resourceConsumableStatus := *(h.initialHostResource[resourceName].IntegerValue) >= *(h.consumedResource[resourceName].IntegerValue)+*(resources[resourceName].IntegerValue)
144+
return resourceConsumableStatus
145+
}
146+
147+
func (h *HostResourceManager) checkConsumableStringSetType(resourceName string, resources map[string]*ecs.Resource) bool {
148+
resourceSlice := resources[resourceName].StringSetValue
149+
150+
// (optimizization) Get a resource specific map to ease look up
151+
resourceMap := make(map[string]struct{}, len(resourceSlice))
152+
for _, v := range resourceSlice {
153+
resourceMap[*v] = struct{}{}
154+
}
155+
156+
// Check intersection of resource StringSetValue is empty with consumedResource
157+
for _, obj1 := range h.consumedResource[resourceName].StringSetValue {
158+
_, ok := resourceMap[*obj1]
159+
if ok {
160+
// If resource is already reserved by some other task, this 'resources' object can not be consumed
161+
return false
162+
}
163+
}
164+
return true
165+
}
166+
167+
func checkResourceExistsInt(resourceName string, resources map[string]*ecs.Resource) error {
168+
_, ok := resources[resourceName]
169+
if ok {
170+
if resources[resourceName].IntegerValue == nil {
171+
return &ResourceIsNilForTask{resourceName}
172+
}
173+
} else {
174+
return &ResourceNotFoundForTask{resourceName}
175+
}
176+
return nil
177+
}
178+
179+
func checkResourceExistsStringSet(resourceName string, resources map[string]*ecs.Resource) error {
180+
_, ok := resources[resourceName]
181+
if ok {
182+
for _, obj := range resources[resourceName].StringSetValue {
183+
if obj == nil {
184+
return &ResourceIsNilForTask{resourceName}
185+
}
186+
}
187+
} else {
188+
return &ResourceNotFoundForTask{resourceName}
189+
}
190+
return nil
191+
}
192+
193+
// Checks all resources exists and their values are not nil
194+
func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Resource) error {
195+
for resourceKey, resourceVal := range resources {
196+
_, ok := h.initialHostResource[resourceKey]
197+
if !ok {
198+
logger.Error(fmt.Sprintf("resource %s not found in ", resourceKey))
199+
return &InvalidHostResource{resourceKey}
200+
}
201+
202+
// CPU, MEMORY, GPU are INTEGER;
203+
// PORTS_TCP, PORTS_UDP are STRINGSET
204+
// Check if either of these data types exist
205+
if resourceVal.Type == nil || !(*resourceVal.Type == "INTEGER" || *resourceVal.Type == "STRINGSET") {
206+
logger.Error(fmt.Sprintf("type not assigned for resource %s", resourceKey))
207+
return fmt.Errorf("invalid resource type for %s", resourceKey)
208+
}
209+
210+
// CPU, MEMORY, GPU
211+
if *resourceVal.Type == "INTEGER" {
212+
err := checkResourceExistsInt(resourceKey, resources)
213+
return err
214+
}
215+
216+
// PORTS_TCP, PORTS_UDP
217+
if *resourceVal.Type == "STRINGSET" {
218+
err := checkResourceExistsStringSet(resourceKey, resources)
219+
return err
220+
}
221+
}
222+
return nil
223+
}
224+
225+
// Helper function for consume to check if resources are consumable with the current account
226+
// we have for the host resources. Should not call host resource manager lock in this func
227+
// return values
228+
func (h *HostResourceManager) consumable(resources map[string]*ecs.Resource) (bool, error) {
229+
err := h.checkResourcesHealth(resources)
230+
if err != nil {
231+
return false, err
232+
}
233+
234+
for resourceKey := range resources {
235+
if *resources[resourceKey].Type == "INTEGER" {
236+
consumable := h.checkConsumableIntType(resourceKey, resources)
237+
if !consumable {
238+
return false, nil
239+
}
240+
}
241+
242+
if *resources[resourceKey].Type == "STRINGSET" {
243+
consumable := h.checkConsumableStringSetType(resourceKey, resources)
244+
if !consumable {
245+
return false, nil
246+
}
247+
}
248+
}
249+
250+
return true, nil
251+
}
252+
253+
// Utility function to manage release of ports
254+
// s2 is contiguous sub slice of s1, each is unique (ports)
255+
// returns a slice after removing s2 from s1, if found
256+
func removeSubSlice(s1 []*string, s2 []*string) []*string {
257+
begin := 0
258+
end := len(s1) - 1
259+
for ; begin < len(s1); begin++ {
260+
if *s1[begin] == *s2[0] {
261+
break
262+
}
263+
}
264+
// no intersection found
265+
if begin == len(s1) {
266+
return s1
267+
}
268+
269+
end = begin + len(s2)
270+
newSlice := append(s1[:begin], s1[end:]...)
271+
return newSlice
272+
}
273+
274+
func (h *HostResourceManager) releaseIntType(resourceType string, resources map[string]*ecs.Resource) {
275+
*h.consumedResource[resourceType].IntegerValue -= *resources[resourceType].IntegerValue
276+
}
277+
278+
func (h *HostResourceManager) releaseStringSetType(resourceType string, resources map[string]*ecs.Resource) {
279+
newSlice := removeSubSlice(h.consumedResource[resourceType].StringSetValue, resources[resourceType].StringSetValue)
280+
h.consumedResource[resourceType].StringSetValue = newSlice
281+
}
282+
283+
// Returns error if task resource map has error, else releases resources
284+
// Task resource map should never have errors as it is made by task ToHostResources method
285+
// In cases releases fails due to errors, those resources will be failed to be released
286+
// by HostResourceManager
287+
func (h *HostResourceManager) release(taskArn string, resources map[string]*ecs.Resource) error {
288+
h.hostResourceManagerRWLock.Lock()
289+
defer h.hostResourceManagerRWLock.Unlock()
290+
defer h.logResources("Consumed resources after task release call", taskArn)
291+
292+
if h.taskConsumed[taskArn] {
293+
err := h.checkResourcesHealth(resources)
294+
if err != nil {
295+
return err
296+
}
297+
298+
for resourceKey := range resources {
299+
if *resources[resourceKey].Type == "INTEGER" {
300+
h.releaseIntType(resourceKey, resources)
301+
}
302+
if *resources[resourceKey].Type == "STRINGSET" {
303+
h.releaseStringSetType(resourceKey, resources)
304+
}
305+
}
306+
307+
// Set consumed status
308+
delete(h.taskConsumed, taskArn)
309+
}
310+
return nil
311+
}
312+
33313
// NewHostResourceManager initialize host resource manager with available host resource values
34314
func NewHostResourceManager(resourceMap map[string]*ecs.Resource) HostResourceManager {
35315
// for resources in resourceMap, some are "available resources" like CPU, mem, while
@@ -39,45 +319,45 @@ func NewHostResourceManager(resourceMap map[string]*ecs.Resource) HostResourceMa
39319
// assigns CPU, MEMORY, PORTS_TCP, PORTS_UDP from host
40320
//CPU
41321
CPUs := int64(0)
42-
consumedResourceMap["CPU"] = &ecs.Resource{
43-
Name: utils.Strptr("CPU"),
322+
consumedResourceMap[CPU] = &ecs.Resource{
323+
Name: utils.Strptr(CPU),
44324
Type: utils.Strptr("INTEGER"),
45325
IntegerValue: &CPUs,
46326
}
47327
//MEMORY
48328
memory := int64(0)
49-
consumedResourceMap["MEMORY"] = &ecs.Resource{
50-
Name: utils.Strptr("MEMORY"),
329+
consumedResourceMap[MEMORY] = &ecs.Resource{
330+
Name: utils.Strptr(MEMORY),
51331
Type: utils.Strptr("INTEGER"),
52332
IntegerValue: &memory,
53333
}
54334
//PORTS_TCP
55335
//Copying ports from host resources as consumed ports for initializing
56336
portsTcp := []*string{}
57-
if resourceMap != nil && resourceMap["PORTS_TCP"] != nil {
58-
portsTcp = resourceMap["PORTS_TCP"].StringSetValue
337+
if resourceMap != nil && resourceMap[PORTSTCP] != nil {
338+
portsTcp = resourceMap[PORTSTCP].StringSetValue
59339
}
60-
consumedResourceMap["PORTS_TCP"] = &ecs.Resource{
340+
consumedResourceMap[PORTSTCP] = &ecs.Resource{
61341
Name: utils.Strptr("PORTS_TCP"),
62342
Type: utils.Strptr("STRINGSET"),
63343
StringSetValue: portsTcp,
64344
}
65345

66346
//PORTS_UDP
67347
portsUdp := []*string{}
68-
if resourceMap != nil && resourceMap["PORTS_UDP"] != nil {
69-
portsUdp = resourceMap["PORTS_UDP"].StringSetValue
348+
if resourceMap != nil && resourceMap[PORTSUDP] != nil {
349+
portsUdp = resourceMap[PORTSUDP].StringSetValue
70350
}
71-
consumedResourceMap["PORTS_UDP"] = &ecs.Resource{
72-
Name: utils.Strptr("PORTS_UDP"),
351+
consumedResourceMap[PORTSUDP] = &ecs.Resource{
352+
Name: utils.Strptr(PORTSUDP),
73353
Type: utils.Strptr("STRINGSET"),
74354
StringSetValue: portsUdp,
75355
}
76356

77357
//GPUs
78358
numGPUs := int64(0)
79-
consumedResourceMap["GPU"] = &ecs.Resource{
80-
Name: utils.Strptr("GPU"),
359+
consumedResourceMap[GPU] = &ecs.Resource{
360+
Name: utils.Strptr(GPU),
81361
Type: utils.Strptr("INTEGER"),
82362
IntegerValue: &numGPUs,
83363
}

0 commit comments

Comments
 (0)