
Commit 2f169d1

Author: Peng Yin (committed)
engine: make containers progress concurrently
Previously, waitForContainerTransitions waited for all in-flight transitions to complete before any container could move to its next state. This PR renames it to waitForContainerTransition and makes it wait for at most one transition to complete, so the first container to finish its transition does not have to wait for the others.
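
The gist of the change, as a minimal sketch with hypothetical names (the real logic is in agent/engine/task_manager.go below):

package sketch

// waitForAll models the old waitForContainerTransitions: the task's
// progress loop stays blocked until every in-flight transition has
// reported back on the done channel.
func waitForAll(transitions map[string]string, done <-chan string) {
	for len(transitions) > 0 {
		finished := <-done // wait for each container in turn
		delete(transitions, finished)
	}
}

// waitForOne models the new waitForContainerTransition: the loop resumes
// as soon as any one transition finishes, so the finished container can be
// moved to its next state while the others are still in flight.
func waitForOne(transitions map[string]string, done <-chan string) {
	finished := <-done // the first container to finish unblocks the task
	delete(transitions, finished)
}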
1 parent a61fee7 · commit 2f169d1

File tree: 6 files changed (+95, -45)


CHANGELOG.md (+1)

@@ -2,6 +2,7 @@
 
 ## 1.17.3-dev
 * Bug - Fixed a bug where a stale websocket connection could linger [#1310](https://github.com/aws/amazon-ecs-agent/pull/1310)
+* Enhancement - Parallelize the container transitions within the same task [#1305](https://github.com/aws/amazon-ecs-agent/pull/1306)
 
 ## 1.17.2
 * Enhancement - Update the `amazon-ecs-cni-plugins` to `2018.02.0` [#1272](https://github.com/aws/amazon-ecs-agent/pull/1272)

agent/api/container.go (+41, -2)

@@ -153,7 +153,9 @@ type Container struct {
 
 	// AppliedStatus is the status that has been "applied" (e.g., we've called Pull,
 	// Create, Start, or Stop) but we don't yet know that the application was successful.
-	AppliedStatus ContainerStatus
+	// No need to save it in the state file, as the agent will synchronize the container
+	// status on restart; some operations (e.g. pull) have to be performed again anyway.
+	AppliedStatus ContainerStatus `json:"-"`
 	// ApplyingError is an error that occurred trying to transition the container
 	// to its desired state. It is propagated to the backend in the form
 	// 'Name: ErrorString' as the 'reason' field.
@@ -241,12 +243,14 @@ func (c *Container) GetKnownStatus() ContainerStatus {
 	return c.KnownStatusUnsafe
 }
 
-// SetKnownStatus sets the known status of the container
+// SetKnownStatus sets the known status of the container and updates the
+// container's applied status
 func (c *Container) SetKnownStatus(status ContainerStatus) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
 	c.KnownStatusUnsafe = status
+	c.updateAppliedStatusUnsafe(status)
 }
 
 // GetDesiredStatus gets the desired status of the container
@@ -548,3 +552,38 @@ func (c *Container) GetHealthStatus() HealthStatus {
 
 	return copyHealth
 }
+
+// updateAppliedStatusUnsafe updates the container's transitioning status
+func (c *Container) updateAppliedStatusUnsafe(knownStatus ContainerStatus) {
+	if c.AppliedStatus == ContainerStatusNone {
+		return
+	}
+
+	// Check whether the container transition has already finished
+	if c.AppliedStatus <= knownStatus {
+		c.AppliedStatus = ContainerStatusNone
+	}
+}
+
+// SetAppliedStatus sets the applied status of the container and returns
+// whether the container was already in a transition
+func (c *Container) SetAppliedStatus(status ContainerStatus) bool {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	if c.AppliedStatus != ContainerStatusNone {
+		// return false to indicate that the set operation failed
+		return false
+	}
+
+	c.AppliedStatus = status
+	return true
+}
+
+// GetAppliedStatus returns the transitioning status of the container
+func (c *Container) GetAppliedStatus() ContainerStatus {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	return c.AppliedStatus
+}
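
Taken together, these accessors make AppliedStatus a per-container "transition in flight" guard: the first caller claims the transition, later callers back off, and SetKnownStatus releases the guard once the known status catches up. A minimal sketch of the pattern with trimmed-down, hypothetical types (the real code uses api.ContainerStatus and the methods above):

package sketch

import "sync"

type Status int

const (
	StatusNone Status = iota
	StatusPulled
	StatusCreated
	StatusRunning
)

type Container struct {
	lock          sync.Mutex
	knownStatus   Status
	appliedStatus Status
}

// SetAppliedStatus claims the transition; it returns false if another
// transition is already in flight for this container.
func (c *Container) SetAppliedStatus(s Status) bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.appliedStatus != StatusNone {
		return false // someone else already owns the transition
	}
	c.appliedStatus = s
	return true
}

// SetKnownStatus records progress and releases the guard once the known
// status has caught up with the applied status.
func (c *Container) SetKnownStatus(s Status) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.knownStatus = s
	if c.appliedStatus != StatusNone && c.appliedStatus <= s {
		c.appliedStatus = StatusNone
	}
}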

agent/engine/engine_unix_integ_test.go (+9, -1)

@@ -603,17 +603,25 @@ func TestVolumesFromRO(t *testing.T) {
 	}
 	testTask.Containers[1].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name, ReadOnly: true}}
 	testTask.Containers[1].Command = []string{"touch /data/readonly-fs || exit 42"}
+	// make all three containers non-essential to make sure all of them
+	// can be transitioned to running even if one of them finishes first
+	testTask.Containers[1].Essential = false
 	testTask.Containers[2].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name}}
 	testTask.Containers[2].Command = []string{"touch /data/notreadonly-fs-1 || exit 42"}
+	testTask.Containers[2].Essential = false
 	testTask.Containers[3].VolumesFrom = []api.VolumeFrom{{SourceContainer: testTask.Containers[0].Name, ReadOnly: false}}
 	testTask.Containers[3].Command = []string{"touch /data/notreadonly-fs-2 || exit 42"}
+	testTask.Containers[3].Essential = false
 
 	go taskEngine.AddTask(testTask)
 
+	verifyTaskIsRunning(stateChangeEvents, testTask)
+	taskEngine.(*DockerTaskEngine).stopContainer(testTask, testTask.Containers[0])
+
 	verifyTaskIsStopped(stateChangeEvents, testTask)
 
 	if testTask.Containers[1].GetKnownExitCode() == nil || *testTask.Containers[1].GetKnownExitCode() != 42 {
-		t.Error("Didn't exit due to failure to touch ro fs as expected: ", *testTask.Containers[1].GetKnownExitCode())
+		t.Error("Didn't exit due to failure to touch ro fs as expected: ", testTask.Containers[1].GetKnownExitCode())
 	}
 	if testTask.Containers[2].GetKnownExitCode() == nil || *testTask.Containers[2].GetKnownExitCode() != 0 {
 		t.Error("Couldn't touch with default of rw")

agent/engine/task_manager.go (+33, -35)

@@ -618,8 +618,8 @@ func (mtask *managedTask) progressContainers() {
 	// We've kicked off one or more transitions, wait for them to
 	// complete, but keep reading events as we do. In fact, we have to for
 	// transitions to complete.
-	mtask.waitForContainerTransitions(transitions, transitionChange, transitionChangeContainer)
-	seelog.Debugf("Managed task [%s]: done transitioning all containers", mtask.Arn)
+	mtask.waitForContainerTransition(transitions, transitionChange, transitionChangeContainer)
+	seelog.Debugf("Managed task [%s]: done waiting for a container transition", mtask.Arn)
 
 	// update the task status
 	changed := mtask.UpdateStatus()
@@ -663,16 +663,30 @@ func (mtask *managedTask) startContainerTransitions(transitionFunc containerTran
 			reasons = append(reasons, transition.reason)
 			continue
 		}
+
+		// If the container is already in a transition, skip it
+		if transition.actionRequired && !cont.SetAppliedStatus(transition.nextState) {
+			// At least one container is able to be moved forwards, so we're not deadlocked
+			anyCanTransition = true
+			continue
+		}
+
 		// At least one container is able to be moved forwards, so we're not deadlocked
 		anyCanTransition = true
 
 		if !transition.actionRequired {
-			mtask.handleContainerChange(dockerContainerChange{
-				container: cont,
-				event: DockerContainerChangeEvent{
-					Status: transition.nextState,
-				},
-			})
+			// The container status is updated without calling any docker API; send the
+			// event in a goroutine so that the send doesn't block here before
+			// waitForContainerTransition is called after this function. All events
+			// sent to mtask.dockerMessages are handled by handleContainerChange.
+			go func(cont *api.Container, status api.ContainerStatus) {
+				mtask.dockerMessages <- dockerContainerChange{
+					container: cont,
+					event: DockerContainerChangeEvent{
+						Status: status,
+					},
+				}
+			}(cont, transition.nextState)
 			continue
 		}
 		transitions[cont.Name] = transition.nextState
@@ -763,39 +777,23 @@ func (mtask *managedTask) onContainersUnableToTransitionState() {
 		mtask.emitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason)
 		// TODO we should probably panic here
 	} else {
-		seelog.Criticalf("Managed task [%s]: voving task to stopped due to bad state", mtask.Arn)
+		seelog.Criticalf("Managed task [%s]: moving task to stopped due to bad state", mtask.Arn)
 		mtask.handleDesiredStatusChange(api.TaskStopped, 0)
 	}
 }
 
-func (mtask *managedTask) waitForContainerTransitions(transitions map[string]api.ContainerStatus,
+func (mtask *managedTask) waitForContainerTransition(transitions map[string]api.ContainerStatus,
 	transitionChange <-chan struct{},
 	transitionChangeContainer <-chan string) {
-
-	for len(transitions) > 0 {
-		if mtask.waitEvent(transitionChange) {
-			changedContainer := <-transitionChangeContainer
-			seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
-				mtask.Arn, changedContainer)
-			delete(transitions, changedContainer)
-			seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
-		}
-		if mtask.GetDesiredStatus().Terminal() || mtask.GetKnownStatus().Terminal() {
-			allWaitingOnPulled := true
-			for _, desired := range transitions {
-				if desired != api.ContainerPulled {
-					allWaitingOnPulled = false
-				}
-			}
-			if allWaitingOnPulled {
-				// We don't actually care to wait for 'pull' transitions to finish if
-				// we're just heading to stopped since those resources aren't
-				// inherently linked to this task anyways for e.g. gc and so on.
-				seelog.Debugf("Managed task [%s]: all containers are waiting for pulled transition; exiting early: %v",
-					mtask.Arn, transitions)
-				break
-			}
-		}
+	// There could be multiple transitions, but we only need to wait for one of them
+	// to guarantee that at least one container can be processed in the next
+	// progressContainers. This is done by waiting for one transition/acs/docker message.
+	if mtask.waitEvent(transitionChange) {
+		changedContainer := <-transitionChangeContainer
+		seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
+			mtask.Arn, changedContainer)
+		delete(transitions, changedContainer)
+		seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
 	}
 }
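
The goroutine around the channel send above matters because mtask.dockerMessages is an unbuffered channel drained by the same managed-task goroutine that runs startContainerTransitions (note the dockerMessages: make(chan dockerContainerChange) line in the test diff below). A minimal sketch, with hypothetical names, of why a synchronous send would deadlock:

package sketch

type event struct{ name string }

// progress stands in for startContainerTransitions running on the task's
// own goroutine. A direct `messages <- event{...}` here would block
// forever: nothing reads the channel until progress returns to the event
// loop in run. Sending from a fresh goroutine defers delivery until then.
func progress(messages chan event) {
	go func() {
		messages <- event{"created"} // delivered once the loop resumes
	}()
}

// run stands in for the managed task's event loop; it drains messages
// until the channel is closed.
func run() {
	messages := make(chan event) // unbuffered, like mtask.dockerMessages
	progress(messages)
	for ev := range messages {
		_ = ev // the real loop dispatches to handleContainerChange
	}
}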

agent/engine/task_manager_test.go (+8, -6)

@@ -693,6 +693,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) {
 		},
 		stateChangeEvents: stateChangeEvents,
 		containerChangeEventStream: containerChangeEventStream,
+		dockerMessages: make(chan dockerContainerChange),
 	}
 
 	eventsGenerated := sync.WaitGroup{}
@@ -716,6 +717,7 @@
 		eventsGenerated.Done()
 	}()
 
+	go task.waitEvent(nil)
 	canTransition, transitions, _ := task.startContainerTransitions(
 		func(cont *api.Container, nextStatus api.ContainerStatus) {
 			t.Error("Invalid code path. The transition function should not be invoked when transitioning container from CREATED -> STOPPED")
@@ -747,7 +749,7 @@
 
 	// populate the transitions map with transitions for two
 	// containers. We expect two sets of events to be consumed
-	// by `waitForContainerTransitions`
+	// by `waitForContainerTransition`
 	transitions := make(map[string]api.ContainerStatus)
 	transitions[firstContainerName] = api.ContainerRunning
 	transitions[secondContainerName] = api.ContainerRunning
@@ -762,13 +764,13 @@
 		transitionChangeContainer <- firstContainerName
 	}()
 
-	// waitForContainerTransitions will block until it receives events
+	// waitForContainerTransition will block until it receives events
 	// sent by the go routine defined above
-	task.waitForContainerTransitions(transitions, transitionChange, transitionChangeContainer)
+	task.waitForContainerTransition(transitions, transitionChange, transitionChangeContainer)
 }
 
 // TestWaitForContainerTransitionsForTerminalTask verifies that the
-// `waitForContainerTransitions` method doesn't wait for any container
+// `waitForContainerTransition` method doesn't wait for any container
 // transitions when the task's desired status is STOPPED and if all
 // containers in the task are in PULLED state
 func TestWaitForContainerTransitionsForTerminalTask(t *testing.T) {
@@ -796,13 +798,13 @@
 	transitions[secondContainerName] = api.ContainerPulled
 
 	// Even though there are two keys in the transitions map, send
-	// only one event. This tests that `waitForContainerTransitions` doesn't
+	// only one event. This tests that `waitForContainerTransition` doesn't
 	// block to receive two events and will still progress
 	go func() {
 		transitionChange <- struct{}{}
 		transitionChangeContainer <- secondContainerName
 	}()
-	task.waitForContainerTransitions(transitions, transitionChange, transitionChangeContainer)
+	task.waitForContainerTransition(transitions, transitionChange, transitionChangeContainer)
 }
 
 func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T) {

agent/statemanager/state_manager.go (+3, -1)

@@ -59,7 +59,9 @@ const (
 	//    d) Added task cgroup related fields ('CPU', 'Memory', 'MemoryCPULimitsEnabled') to 'api.Task'
 	// 9) Add 'ipToTask' map to state file
 	// 10) Add 'healthCheckType' field in 'api.Container'
-	// 11) Add 'PrivateDNSName' field to 'api.ENI'
+	// 11)
+	//    a) Add 'PrivateDNSName' field to 'api.ENI'
+	//    b) Remove 'AppliedStatus' field from 'api.Container'
 	ECSDataVersion = 11
 
 	// ecsDataFile specifies the filename in the ECS_DATADIR
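
The version bump pairs with the json:"-" tag added in agent/api/container.go: encoding/json skips such a field in both directions, so AppliedStatus disappears from newly written state files and is ignored if present in old ones. A self-contained illustration with a hypothetical, trimmed-down struct:

package main

import (
	"encoding/json"
	"fmt"
)

type Container struct {
	Name          string
	AppliedStatus int `json:"-"` // never marshaled, never unmarshaled
}

func main() {
	out, _ := json.Marshal(Container{Name: "db", AppliedStatus: 3})
	fmt.Println(string(out)) // {"Name":"db"}: the field is not persisted

	var c Container
	_ = json.Unmarshal([]byte(`{"Name":"db","AppliedStatus":3}`), &c)
	fmt.Println(c.AppliedStatus) // 0: ignored even if present in an old state file
}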
