Make the progresscontainer independent of each other #1306

Merged: 3 commits, Mar 27, 2018
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## 1.17.3-dev
* Bug - Fixed a bug where a stale websocket connection could linger [#1310](https://github.com/aws/amazon-ecs-agent/pull/1310)
* Enhancement - Parallelize the container transitions within the same task [#1305](https://github.com/aws/amazon-ecs-agent/pull/1306)

## 1.17.2
* Enhancement - Update the `amazon-ecs-cni-plugins` to `2018.02.0` [#1272](https://github.com/aws/amazon-ecs-agent/pull/1272)
43 changes: 41 additions & 2 deletions agent/api/container.go
@@ -153,7 +153,9 @@ type Container struct {

// AppliedStatus is the status that has been "applied" (e.g., we've called Pull,
// Create, Start, or Stop) but we don't yet know that the application was successful.
AppliedStatus ContainerStatus
// It does not need to be saved in the state file: the agent synchronizes the container
// status on restart, and some operations (e.g. pull) have to be called again anyway.
AppliedStatus ContainerStatus `json:"-"`
// ApplyingError is an error that occurred trying to transition the container
// to its desired state. It is propagated to the backend in the form
// 'Name: ErrorString' as the 'reason' field.
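Aside (not part of the diff): the json:"-" tag above is what keeps AppliedStatus out of the saved state file, since encoding/json skips fields tagged with "-" when marshalling. A minimal, self-contained sketch using a simplified stand-in struct, not the agent's real Container type:

package main

import (
	"encoding/json"
	"fmt"
)

// container is a simplified stand-in for api.Container, used only to illustrate the tag.
type container struct {
	Name          string `json:"Name"`
	KnownStatus   int    `json:"KnownStatus"`
	AppliedStatus int    `json:"-"` // skipped by encoding/json, so never written to the state file
}

func main() {
	c := container{Name: "sleep", KnownStatus: 3, AppliedStatus: 1}
	data, _ := json.Marshal(c)
	// Prints {"Name":"sleep","KnownStatus":3}; AppliedStatus is omitted
	fmt.Println(string(data))
}
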
@@ -241,12 +243,14 @@ func (c *Container) GetKnownStatus() ContainerStatus {
return c.KnownStatusUnsafe
}

// SetKnownStatus sets the known status of the container
// SetKnownStatus sets the known status of the container and updates the container's
// applied status
func (c *Container) SetKnownStatus(status ContainerStatus) {
c.lock.Lock()
defer c.lock.Unlock()

c.KnownStatusUnsafe = status
c.updateAppliedStatusUnsafe(status)
}

// GetDesiredStatus gets the desired status of the container
@@ -548,3 +552,38 @@ func (c *Container) GetHealthStatus() HealthStatus {

return copyHealth
}

// updateAppliedStatusUnsafe updates the container transitioning status
func (c *Container) updateAppliedStatusUnsafe(knownStatus ContainerStatus) {
if c.AppliedStatus == ContainerStatusNone {
return
}

// Check if the container transition has already finished
if c.AppliedStatus <= knownStatus {
Contributor: Would the condition c.AppliedStatus > knownStatus ever be true here? We set the knownStatus only when a transition is complete, right?

Author: If the agent receives duplicate Docker events for some reason, this could happen, since an event can carry a past status of the container.

c.AppliedStatus = ContainerStatusNone
}
}

// SetAppliedStatus sets the applied status of the container and returns whether
// the set succeeded; it returns false when the container is already in a transition
func (c *Container) SetAppliedStatus(status ContainerStatus) bool {
c.lock.Lock()
defer c.lock.Unlock()

if c.AppliedStatus != ContainerStatusNone {
// return false to indicate the set operation failed
Contributor: It's not clear to me why returning false here indicates the set operation failed. Do you mean the SetKnownStatus and then updateAppliedStatusUnsafe path has failed?

Author: No, failed means the container is already in a transition (i.e. its applied status isn't none). This ensures that the agent won't call the same API (pull/create/start/stop) twice for the same container.

return false
}

c.AppliedStatus = status
return true
}

// GetAppliedStatus returns the transitioning status of container
func (c *Container) GetAppliedStatus() ContainerStatus {
c.lock.RLock()
defer c.lock.RUnlock()

return c.AppliedStatus
}
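To tie together the two review threads above, here is a minimal, self-contained sketch of how these accessors are expected to interact, using simplified stand-in types rather than the agent's real implementation: SetAppliedStatus acts as a guard so the engine never issues the same Docker call twice for a container that is already mid-transition, and SetKnownStatus clears the applied status once the transition is observed to complete.

package main

import (
	"fmt"
	"sync"
)

// containerStatus and container are simplified stand-ins mirroring the accessors in this diff.
type containerStatus int

const (
	statusNone containerStatus = iota
	statusPulled
	statusCreated
	statusRunning
)

type container struct {
	lock          sync.RWMutex
	knownStatus   containerStatus
	appliedStatus containerStatus
}

// setAppliedStatus returns false when another transition is already in flight,
// so the caller skips issuing a duplicate pull/create/start/stop call.
func (c *container) setAppliedStatus(status containerStatus) bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.appliedStatus != statusNone {
		return false
	}
	c.appliedStatus = status
	return true
}

// setKnownStatus records the observed status and, as in updateAppliedStatusUnsafe,
// clears the applied status once the transition it tracked has finished.
func (c *container) setKnownStatus(status containerStatus) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.knownStatus = status
	if c.appliedStatus != statusNone && c.appliedStatus <= status {
		c.appliedStatus = statusNone
	}
}

func main() {
	c := &container{}
	fmt.Println(c.setAppliedStatus(statusPulled))  // true: the pull may be issued
	fmt.Println(c.setAppliedStatus(statusPulled))  // false: a pull is already in flight, skip the duplicate call
	c.setKnownStatus(statusPulled)                 // Docker event observed; applied status resets to none
	fmt.Println(c.setAppliedStatus(statusCreated)) // true: the next transition may start
}
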
148 changes: 145 additions & 3 deletions agent/engine/docker_task_engine_test.go
@@ -290,7 +290,7 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
sleepTask.Containers[0].TransitionDependencySet.ContainerDependencies = []api.ContainerDependency{
{
ContainerName: "pause",
SatisfiedStatus: api.ContainerRunning,
SatisfiedStatus: api.ContainerResourcesProvisioned,
DependentStatus: api.ContainerPulled,
}}
sleepContainer := sleepTask.Containers[0]
@@ -432,6 +432,7 @@ func TestRemoveEvents(t *testing.T) {
client.EXPECT().Version()
}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).AnyTimes()
containerName := make(chan string)
go func() {
name := <-containerName
@@ -477,7 +478,6 @@
eventStream <- createDockerEvent(api.ContainerStopped)
}).Return(nil)

client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).AnyTimes()
imageManager.EXPECT().RemoveContainerReferenceFromImageState(gomock.Any())

// This ensures that managedTask.waitForStopReported makes progress
@@ -1750,7 +1750,6 @@ func TestHandleDockerHealthEvent(t *testing.T) {
}

func TestContainerMetadataUpdatedOnRestart(t *testing.T) {

dockerID := "dockerID_created"
labels := map[string]string{
"name": "metadata",
@@ -1871,3 +1870,146 @@ func TestContainerMetadataUpdatedOnRestart(t *testing.T) {
})
}
}

// TestContainerProgressParallize tests that containers in a task can be processed in parallel
func TestContainerProgressParallize(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, testTime, taskEngine, _, imageManager, _ := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

stateChangeEvents := taskEngine.StateChangeEvents()
eventStream := make(chan DockerContainerChangeEvent)
state := taskEngine.(*DockerTaskEngine).State()

fastPullImage := "fast-pull-image"
slowPullImage := "slow-pull-image"

testTask := testdata.LoadTask("sleep5")

containerTwo := &api.Container{
Name: fastPullImage,
Image: fastPullImage,
}

testTask.Containers = append(testTask.Containers, containerTwo)
testTask.Containers[0].Image = slowPullImage
testTask.Containers[0].Name = slowPullImage

var fastContainerDockerName string
var slowContainerDockerName string
fastContainerDockerID := "fast-pull-container-id"
slowContainerDockerID := "slow-pull-container-id"

var waitForFastPullContainer sync.WaitGroup
waitForFastPullContainer.Add(1)

client.EXPECT().Version().Return("17.12.0", nil).AnyTimes()
testTime.EXPECT().Now().Return(time.Now()).AnyTimes()
client.EXPECT().APIVersion().Return(defaultDockerClientAPIVersion, nil).AnyTimes()
imageManager.EXPECT().AddAllImageStates(gomock.Any()).AnyTimes()
imageManager.EXPECT().RecordContainerReference(gomock.Any()).Return(nil).AnyTimes()
imageManager.EXPECT().GetImageStateFromImageName(gomock.Any()).Return(nil).AnyTimes()
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().PullImage(fastPullImage, gomock.Any())
client.EXPECT().PullImage(slowPullImage, gomock.Any()).Do(
func(image interface{}, auth interface{}) {
waitForFastPullContainer.Wait()
})
client.EXPECT().CreateContainer(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(cfg interface{}, hostconfig interface{}, name string, duration interface{}) {
if strings.Contains(name, slowPullImage) {
slowContainerDockerName = name
state.AddContainer(&api.DockerContainer{
DockerID: slowContainerDockerID,
DockerName: slowContainerDockerName,
Container: testTask.Containers[0],
}, testTask)
go func() {
event := createDockerEvent(api.ContainerCreated)
event.DockerID = slowContainerDockerID
eventStream <- event
}()
} else if strings.Contains(name, fastPullImage) {
fastContainerDockerName = name
state.AddTask(testTask)
state.AddContainer(&api.DockerContainer{
DockerID: fastContainerDockerID,
DockerName: fastContainerDockerName,
Container: testTask.Containers[1],
}, testTask)
go func() {
event := createDockerEvent(api.ContainerCreated)
event.DockerID = fastContainerDockerID
eventStream <- event
}()
} else {
t.Fatalf("Got unexpected name for creating container: %s", name)
}
}).Times(2)
client.EXPECT().StartContainer(fastContainerDockerID, gomock.Any()).Do(
func(id string, duration interface{}) {
go func() {
event := createDockerEvent(api.ContainerRunning)
event.DockerID = fastContainerDockerID
eventStream <- event
}()
})
client.EXPECT().StartContainer(slowContainerDockerID, gomock.Any()).Do(
func(id string, duration interface{}) {
go func() {
event := createDockerEvent(api.ContainerRunning)
event.DockerID = slowContainerDockerID
eventStream <- event
}()
})

taskEngine.Init(ctx)
taskEngine.AddTask(testTask)

// Expect the fast-pulled container to be running first
fastPullContainerRunning := false
for event := range stateChangeEvents {
containerEvent, ok := event.(api.ContainerStateChange)
if ok && containerEvent.Status == api.ContainerRunning {
if containerEvent.ContainerName == fastPullImage {
fastPullContainerRunning = true
// The second container should start processing now
waitForFastPullContainer.Done()
continue
}
assert.True(t, fastPullContainerRunning, "got the slower pulled container running events first")
continue
}

taskEvent, ok := event.(api.TaskStateChange)
if ok && taskEvent.Status == api.TaskRunning {
break
}
t.Errorf("Got unexpected task event: %v", taskEvent.String())
}
defer discardEvents(stateChangeEvents)()
// stop and clean up the task
cleanup := make(chan time.Time)
client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).Return(
DockerContainerMetadata{DockerID: fastContainerDockerID}).AnyTimes()
client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).Return(
DockerContainerMetadata{DockerID: slowContainerDockerID}).AnyTimes()
testTime.EXPECT().After(gomock.Any()).Return(cleanup).MinTimes(1)
client.EXPECT().RemoveContainer(gomock.Any(), gomock.Any()).Return(nil).Times(2)
imageManager.EXPECT().RemoveContainerReferenceFromImageState(gomock.Any()).Return(nil).Times(2)

containerStoppedEvent := createDockerEvent(api.ContainerStopped)
containerStoppedEvent.DockerID = slowContainerDockerID
eventStream <- containerStoppedEvent

testTask.SetSentStatus(api.TaskStopped)
cleanup <- time.Now()
for {
tasks, _ := taskEngine.(*DockerTaskEngine).ListTasks()
if len(tasks) == 0 {
break
}
time.Sleep(5 * time.Millisecond)
}
}
10 changes: 9 additions & 1 deletion agent/engine/engine_unix_integ_test.go
@@ -603,17 +603,25 @@ func TestVolumesFromRO(t *testing.T) {
}
testTask.Containers[1].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name, ReadOnly: true}}
testTask.Containers[1].Command = []string{"touch /data/readonly-fs || exit 42"}
// make all three containers non-essential to make sure all of the
// containers can be transitioned to running even if one of them finishes first
testTask.Containers[1].Essential = false
testTask.Containers[2].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name}}
testTask.Containers[2].Command = []string{"touch /data/notreadonly-fs-1 || exit 42"}
testTask.Containers[2].Essential = false
testTask.Containers[3].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name, ReadOnly: false}}
testTask.Containers[3].Command = []string{"touch /data/notreadonly-fs-2 || exit 42"}
testTask.Containers[3].Essential = false

go taskEngine.AddTask(testTask)

verifyTaskIsRunning(stateChangeEvents, testTask)
taskEngine.(*DockerTaskEngine).stopContainer(testTask, testTask.Containers[0])

verifyTaskIsStopped(stateChangeEvents, testTask)

if testTask.Containers[1].GetKnownExitCode() == nil || *testTask.Containers[1].GetKnownExitCode() != 42 {
t.Error("Didn't exit due to failure to touch ro fs as expected: ", *testTask.Containers[1].GetKnownExitCode())
t.Error("Didn't exit due to failure to touch ro fs as expected: ", testTask.Containers[1].GetKnownExitCode())
}
if testTask.Containers[2].GetKnownExitCode() == nil || *testTask.Containers[2].GetKnownExitCode() != 0 {
t.Error("Couldn't touch with default of rw")
71 changes: 35 additions & 36 deletions agent/engine/task_manager.go
@@ -618,8 +618,7 @@ func (mtask *managedTask) progressContainers() {
// We've kicked off one or more transitions, wait for them to
// complete, but keep reading events as we do. in fact, we have to for
// transitions to complete
mtask.waitForContainerTransitions(transitions, transitionChange, transitionChangeContainer)
seelog.Debugf("Managed task [%s]: done transitioning all containers", mtask.Arn)
mtask.waitForContainerTransition(transitions, transitionChange, transitionChangeContainer)

// update the task status
changed := mtask.UpdateStatus()
@@ -663,16 +662,30 @@ func (mtask *managedTask) startContainerTransitions(transitionFunc containerTran
reasons = append(reasons, transition.reason)
continue
}

// If the container is already in a transition, skip
if transition.actionRequired && !cont.SetAppliedStatus(transition.nextState) {
// At least one container is able to be moved forwards, so we're not deadlocked
anyCanTransition = true
continue
}

// At least one container is able to be moved forwards, so we're not deadlocked
anyCanTransition = true

if !transition.actionRequired {
mtask.handleContainerChange(dockerContainerChange{
container: cont,
event: DockerContainerChangeEvent{
Status: transition.nextState,
},
})
// This transition updates the container status without calling any docker API.
// Send it in a goroutine so that it won't block here before waitForContainerTransition
// is called after this function. All the events sent to mtask.dockerMessages
// will be handled by handleContainerChange.
go func(cont *api.Container, status api.ContainerStatus) {
mtask.dockerMessages <- dockerContainerChange{
container: cont,
event: DockerContainerChangeEvent{
Status: status,
},
}
}(cont, transition.nextState)
continue
}
transitions[cont.Name] = transition.nextState
@@ -763,40 +776,26 @@ func (mtask *managedTask) onContainersUnableToTransitionState() {
mtask.emitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason)
// TODO we should probably panic here
} else {
seelog.Criticalf("Managed task [%s]: voving task to stopped due to bad state", mtask.Arn)
seelog.Criticalf("Managed task [%s]: moving task to stopped due to bad state", mtask.Arn)
mtask.handleDesiredStatusChange(api.TaskStopped, 0)
}
}

func (mtask *managedTask) waitForContainerTransitions(transitions map[string]api.ContainerStatus,
transitionChange <-chan struct{},
func (mtask *managedTask) waitForContainerTransition(transitions map[string]api.ContainerStatus,
transition <-chan struct{},
transitionChangeContainer <-chan string) {

for len(transitions) > 0 {
if mtask.waitEvent(transitionChange) {
changedContainer := <-transitionChangeContainer
seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
mtask.Arn, changedContainer)
delete(transitions, changedContainer)
seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
}
if mtask.GetDesiredStatus().Terminal() || mtask.GetKnownStatus().Terminal() {
allWaitingOnPulled := true
for _, desired := range transitions {
if desired != api.ContainerPulled {
allWaitingOnPulled = false
}
}
if allWaitingOnPulled {
Contributor (@sharanyad, Mar 23, 2018): How is this condition handled in the new change?

Author: Previously the pull would block all of the transitions, which is why we wanted to break out of the wait when allWaitingOnPulled was true. The new change makes the pull non-blocking, so that case is handled by default.

// We don't actually care to wait for 'pull' transitions to finish if
// we're just heading to stopped since those resources aren't
// inherently linked to this task anyways for e.g. gc and so on.
seelog.Debugf("Managed task [%s]: all containers are waiting for pulled transition; exiting early: %v",
mtask.Arn, transitions)
break
}
}
// There could be multiple transitions, but we just need to wait for one of them
// to ensure that at least one container can be processed in the next call to
// progressContainers. This is done by waiting for one transition/acs/docker message.
if !mtask.waitEvent(transition) {
seelog.Debugf("Managed task [%s]: received non-transition events", mtask.Arn)
return
}
transitionedContainer := <-transitionChangeContainer
seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
mtask.Arn, transitionedContainer)
delete(transitions, transitionedContainer)
seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
}

func (mtask *managedTask) time() ttime.Time {
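One detail worth spelling out from the comment in startContainerTransitions above: mtask.dockerMessages is drained only later, once waitForContainerTransition runs, so sending the no-API status change synchronously would block. A minimal, self-contained sketch of that pattern, with illustrative names rather than the agent's actual types:

package main

import "fmt"

// statusChange stands in for dockerContainerChange; the names are illustrative.
type statusChange struct {
	container string
	status    string
}

func main() {
	// Unbuffered, like mtask.dockerMessages: a send blocks until someone receives.
	dockerMessages := make(chan statusChange)

	// startContainerTransitions: hand the change off in a goroutine so this
	// function can return and kick off the remaining transitions first.
	go func() {
		dockerMessages <- statusChange{container: "pause", status: "RESOURCES_PROVISIONED"}
	}()

	// waitForContainerTransition / handleContainerChange: runs afterwards and
	// drains the channel, unblocking the goroutine above.
	msg := <-dockerMessages
	fmt.Printf("handled change for %s -> %s\n", msg.container, msg.status)
}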