Skip to content

Commit

Permalink
Centralize DockerID getting, make it more resilient
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Sep 3, 2020
1 parent 6b32199 commit 5d562ce
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 59 deletions.
2 changes: 1 addition & 1 deletion agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (dg *dockerGoClient) stopContainer(ctx context.Context, dockerID string, ti
err = client.ContainerStop(ctx, dockerID, &timeout)
metadata := dg.containerMetadata(ctx, dockerID)
if err != nil {
seelog.Errorf("DockerGoClient: error stopping container %s: %v", dockerID, err)
seelog.Errorf("DockerGoClient: error stopping container ID=%s: %v", dockerID, err)
if metadata.Error == nil {
if strings.Contains(err.Error(), "No such container") {
err = NoSuchContainerError{dockerID}
Expand Down
103 changes: 45 additions & 58 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,17 +487,12 @@ func (engine *DockerTaskEngine) synchronizeContainerStatus(container *apicontain
// their state to the managed task's container channel.
func (engine *DockerTaskEngine) checkTaskState(task *apitask.Task) {
defer metrics.MetricsEngineGlobal.RecordTaskEngineMetric("CHECK_TASK_STATE")()
taskContainers, ok := engine.state.ContainerMapByArn(task.Arn)
if !ok {
seelog.Warnf("Task engine [%s]: could not check task state; no task in state", task.Arn)
return
}
for _, container := range task.Containers {
dockerContainer, ok := taskContainers[container.Name]
if !ok {
dockerID, err := engine.getDockerID(task, container)
if err != nil {
continue
}
status, metadata := engine.client.DescribeContainer(engine.ctx, dockerContainer.DockerID)
status, metadata := engine.client.DescribeContainer(engine.ctx, dockerID)
engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
engine.tasksLock.RUnlock()
Expand Down Expand Up @@ -1136,25 +1131,17 @@ func (engine *DockerTaskEngine) startContainer(task *apitask.Task, container *ap
client = client.WithVersion(dockerclient.DockerVersion(*container.DockerConfig.Version))
}

containerMap, ok := engine.state.ContainerMapByArn(task.Arn)
if !ok {
dockerID, err := engine.getDockerID(task, container)
if err != nil {
return dockerapi.DockerContainerMetadata{
Error: dockerapi.CannotStartContainerError{
FromError: errors.Errorf("Container belongs to unrecognized task %s", task.Arn),
FromError: err,
},
}
}

dockerContainer, ok := containerMap[container.Name]
if !ok {
return dockerapi.DockerContainerMetadata{
Error: dockerapi.CannotStartContainerError{
FromError: errors.Errorf("Container not recorded as created"),
},
}
}
startContainerBegin := time.Now()
dockerContainerMD := client.StartContainer(engine.ctx, dockerContainer.DockerID, engine.cfg.ContainerStartTimeout)
dockerContainerMD := client.StartContainer(engine.ctx, dockerID, engine.cfg.ContainerStartTimeout)

// Get metadata through container inspection and available task information then write this to the metadata file
// Performs this in the background to avoid delaying container start
Expand All @@ -1164,7 +1151,7 @@ func (engine *DockerTaskEngine) startContainer(task *apitask.Task, container *ap
engine.cfg.ContainerMetadataEnabled.Enabled() &&
!container.IsInternal() {
go func() {
err := engine.metadataManager.Update(engine.ctx, dockerContainer.DockerID, task, container.Name)
err := engine.metadataManager.Update(engine.ctx, dockerID, task, container.Name)
if err != nil {
seelog.Warnf("Task engine [%s]: failed to update metadata file for container %s: %v",
task.Arn, container.Name, err)
Expand Down Expand Up @@ -1217,7 +1204,7 @@ func (engine *DockerTaskEngine) startContainer(task *apitask.Task, container *ap
func (engine *DockerTaskEngine) provisionContainerResources(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
seelog.Infof("Task engine [%s]: setting up container resources for container [%s]",
task.Arn, container.Name)
containerInspectOutput, err := engine.inspectContainerByName(task.Arn, container.Name)
containerInspectOutput, err := engine.inspectContainer(task.Arn, container)
if err != nil {
return dockerapi.DockerContainerMetadata{
Error: ContainerNetworkingError{
Expand Down Expand Up @@ -1268,7 +1255,7 @@ func (engine *DockerTaskEngine) cleanupPauseContainerNetwork(task *apitask.Task,
seelog.Infof("Task engine [%s]: waiting %s before cleaning up pause container.", task.Arn, delay)
engine.handleDelay(delay)
}
containerInspectOutput, err := engine.inspectContainerByName(task.Arn, container.Name)
containerInspectOutput, err := engine.inspectContainer(task.Arn, container)
if err != nil {
return errors.Wrap(err, "engine: cannot cleanup task network namespace due to error inspecting pause container")
}
Expand Down Expand Up @@ -1312,43 +1299,27 @@ func (engine *DockerTaskEngine) buildCNIConfigFromTaskContainer(
return cniConfig, nil
}

func (engine *DockerTaskEngine) inspectContainerByName(taskArn, containerName string) (*types.ContainerJSON, error) {
containers, ok := engine.state.ContainerMapByArn(taskArn)
if !ok {
return nil, errors.New("engine: failed to find the pause container, no containers in the task")
}

pauseContainer, ok := containers[containerName]
if !ok {
return nil, errors.New("engine: failed to find the pause container")
func (engine *DockerTaskEngine) inspectContainer(taskArn, container *apicontainer.Container) (*types.ContainerJSON, error) {
dockerID, err := engine.getDockerID(task, container)
if err != nil {
return nil, err
}
containerInspectOutput, err := engine.client.InspectContainer(
engine.ctx,
pauseContainer.DockerName,
dockerclient.InspectContainerTimeout,
)

return containerInspectOutput, err
return engine.client.InspectContainer(engine.ctx, dockerID, dockerclient.InspectContainerTimeout)
}

func (engine *DockerTaskEngine) stopContainer(task *apitask.Task, container *apicontainer.Container) dockerapi.DockerContainerMetadata {
seelog.Infof("Task engine [%s]: stopping container [%s]", task.Arn, container.Name)
containerMap, ok := engine.state.ContainerMapByArn(task.Arn)
if !ok {

dockerID, err := engine.getDockerID(task, container)
if err != nil {
return dockerapi.DockerContainerMetadata{
Error: dockerapi.CannotStopContainerError{
FromError: errors.Errorf("Container belongs to unrecognized task %s", task.Arn),
FromError: err,
},
}
}

dockerContainer, ok := containerMap[container.Name]
if !ok {
return dockerapi.DockerContainerMetadata{
Error: dockerapi.CannotStopContainerError{FromError: errors.Errorf("Container not recorded as created")},
}
}

// Cleanup the pause container network namespace before stop the container
if container.Type == apicontainer.ContainerCNIPause {
err := engine.cleanupPauseContainerNetwork(task, container)
Expand All @@ -1364,23 +1335,18 @@ func (engine *DockerTaskEngine) stopContainer(task *apitask.Task, container *api
apiTimeoutStopContainer = engine.cfg.DockerStopTimeout
}

return engine.client.StopContainer(engine.ctx, dockerContainer.DockerID, apiTimeoutStopContainer)
return engine.client.StopContainer(engine.ctx, dockerID, apiTimeoutStopContainer)
}

func (engine *DockerTaskEngine) removeContainer(task *apitask.Task, container *apicontainer.Container) error {
seelog.Infof("Task engine [%s]: removing container: %s", task.Arn, container.Name)
containerMap, ok := engine.state.ContainerMapByArn(task.Arn)

if !ok {
return errors.New("No such task: " + task.Arn)
}

dockerContainer, ok := containerMap[container.Name]
if !ok {
return errors.New("No container named '" + container.Name + "' created in " + task.Arn)
dockerID, err := engine.getDockerID(task, container)
if err != nil {
return err
}

return engine.client.RemoveContainer(engine.ctx, dockerContainer.DockerName, dockerclient.RemoveContainerTimeout)
return engine.client.RemoveContainer(engine.ctx, dockerID, dockerclient.RemoveContainerTimeout)
}

// updateTaskUnsafe determines if a new transition needs to be applied to the
Expand Down Expand Up @@ -1497,3 +1463,24 @@ func getContainerHostIP(networkSettings *types.NetworkSettings) (string, bool) {
}
return "", false
}

func (engine *DockerTaskEngine) getDockerID(task *apitask.Task, container *apicontainer.Container) (string, error) {
runtimeID := container.GetRuntimeID()
if runtimeID != "" {
return runtimeID, nil
}
containerMap, ok := engine.state.ContainerMapByArn(task.Arn)
if !ok {
return "", errors.Errorf("Container name=%s belongs to unrecognized task taskArn=%s", container.Name, task.Arn)
}

dockerContainer, ok := containerMap[container.Name]
if !ok {
return "", errors.Errorf("Container name=%s not recognized by agent", container.Name)
}

if dockerContainer.DockerID == "" {
return dockerContainer.DockerName, nil
}
return dockerContainer.DockerID, nil
}

0 comments on commit 5d562ce

Please sign in to comment.