Skip to content

Commit 45d7b5a

Browse files
Yiyuanzzzprateekchaudhry
authored andcommitted
host resource manager initialization
1 parent 9d35c53 commit 45d7b5a

23 files changed

+352
-64
lines changed

agent/acs/update_handler/updater_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
128128
Reason: ptr("Updates are disabled").(*string),
129129
}})
130130

131-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
131+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
132132
msg := &ecsacs.PerformUpdateMessage{
133133
ClusterArn: ptr("cluster").(*string),
134134
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {
182182

183183
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
184184

185-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
185+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
186186
msg := &ecsacs.PerformUpdateMessage{
187187
ClusterArn: ptr("cluster").(*string),
188188
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
250250
MessageId: ptr("mid").(*string),
251251
}})
252252

253-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
253+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
254254
msg := &ecsacs.PerformUpdateMessage{
255255
ClusterArn: ptr("cluster").(*string),
256256
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {
308308

309309
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
310310

311-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
311+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
312312
msg := &ecsacs.PerformUpdateMessage{
313313
ClusterArn: ptr("cluster").(*string),
314314
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {
377377

378378
require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")
379379

380-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
380+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
381381
msg := &ecsacs.PerformUpdateMessage{
382382
ClusterArn: ptr("cluster").(*string),
383383
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {
448448

449449
require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")
450450

451-
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
451+
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
452452
msg := &ecsacs.PerformUpdateMessage{
453453
ClusterArn: ptr("cluster").(*string),
454454
ContainerInstanceArn: ptr("containerInstance").(*string),

agent/api/ecsclient/client.go

+14
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,20 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
306306
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
307307
}
308308

309+
// GetHostResources calling getHostResources to get a list of CPU, MEMORY, PORTS and PORTS_UPD resources
310+
// and return a resourceMap that map the resource name to each resource
311+
func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
312+
resources, err := client.getResources()
313+
if err != nil {
314+
return nil, err
315+
}
316+
resourceMap := make(map[string]*ecs.Resource)
317+
for _, resource := range resources {
318+
resourceMap[*resource.Name] = resource
319+
}
320+
return resourceMap, nil
321+
}
322+
309323
func getCpuAndMemory() (int64, int64) {
310324
memInfo, err := system.ReadMemInfo()
311325
mem := int64(0)

agent/api/interface.go

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type ECSClient interface {
5757
// UpdateContainerInstancesState updates the given container Instance ID with
5858
// the given status. Only valid statuses are ACTIVE and DRAINING.
5959
UpdateContainerInstancesState(instanceARN, status string) error
60+
// GetHostResources retrieves a map that map the resource name to the corresponding resource
61+
GetHostResources() (map[string]*ecs.Resource, error)
6062
}
6163

6264
// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS

agent/api/mocks/api_mocks.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/app/agent.go

+24-4
Original file line numberDiff line numberDiff line change
@@ -306,17 +306,36 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
306306
return exitcodes.ExitTerminal
307307
}
308308
}
309+
hostResources, err := client.GetHostResources()
310+
if err != nil {
311+
seelog.Critical("Unable to fetch host resources")
312+
return exitcodes.ExitError
313+
}
314+
numGPUs := int64(0)
309315
if agent.cfg.GPUSupportEnabled {
310316
err := agent.initializeGPUManager()
311317
if err != nil {
312318
seelog.Criticalf("Could not initialize Nvidia GPU Manager: %v", err)
313319
return exitcodes.ExitError
314320
}
321+
// Find number of GPUs instance has
322+
platformDevices := agent.getPlatformDevices()
323+
for _, device := range platformDevices {
324+
if *device.Type == ecs.PlatformDeviceTypeGpu {
325+
numGPUs++
326+
}
327+
}
328+
}
329+
330+
hostResources["GPU"] = &ecs.Resource{
331+
Name: utils.Strptr("GPU"),
332+
Type: utils.Strptr("INTEGER"),
333+
IntegerValue: &numGPUs,
315334
}
316335

317336
// Create the task engine
318337
taskEngine, currentEC2InstanceID, err := agent.newTaskEngine(
319-
containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, agent.serviceconnectManager)
338+
containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, agent.serviceconnectManager)
320339
if err != nil {
321340
seelog.Criticalf("Unable to initialize new task engine: %v", err)
322341
return exitcodes.ExitTerminal
@@ -523,6 +542,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
523542
credentialsManager credentials.Manager,
524543
state dockerstate.TaskEngineState,
525544
imageManager engine.ImageManager,
545+
hostResources map[string]*ecs.Resource,
526546
execCmdMgr execcmd.Manager,
527547
serviceConnectManager engineserviceconnect.Manager) (engine.TaskEngine, string, error) {
528548

@@ -531,11 +551,11 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
531551
if !agent.cfg.Checkpoint.Enabled() {
532552
seelog.Info("Checkpointing not enabled; a new container instance will be created each time the agent is run")
533553
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
534-
containerChangeEventStream, imageManager, state,
554+
containerChangeEventStream, imageManager, hostResources, state,
535555
agent.metadataManager, agent.resourceFields, execCmdMgr, serviceConnectManager), "", nil
536556
}
537557

538-
savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager)
558+
savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager)
539559
if err != nil {
540560
seelog.Criticalf("Error loading previously saved state: %v", err)
541561
return nil, "", err
@@ -560,7 +580,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
560580
state.Reset()
561581
// Reset taskEngine; all the other values are still default
562582
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
563-
containerChangeEventStream, imageManager, state, agent.metadataManager,
583+
containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager,
564584
agent.resourceFields, execCmdMgr, serviceConnectManager), currentEC2InstanceID, nil
565585
}
566586

agent/app/agent_compatibility_linux_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ func TestCompatibilityEnabledSuccess(t *testing.T) {
6565
defer cancel()
6666

6767
containerChangeEventStream := eventstream.NewEventStream("events", ctx)
68-
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
68+
hostResources := getTestHostResources()
69+
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)
6970

7071
assert.NoError(t, err)
7172
assert.True(t, cfg.TaskCPUMemLimit.Enabled())
@@ -106,7 +107,8 @@ func TestCompatibilityNotSetFail(t *testing.T) {
106107
defer cancel()
107108

108109
containerChangeEventStream := eventstream.NewEventStream("events", ctx)
109-
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
110+
hostResources := getTestHostResources()
111+
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)
110112

111113
assert.NoError(t, err)
112114
assert.False(t, cfg.TaskCPUMemLimit.Enabled())
@@ -146,7 +148,8 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) {
146148
defer cancel()
147149

148150
containerChangeEventStream := eventstream.NewEventStream("events", ctx)
149-
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
151+
hostResources := getTestHostResources()
152+
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)
150153

151154
assert.Error(t, err)
152155
}

0 commit comments

Comments
 (0)