test: add test for parallelizing container progress
Peng Yin committed Mar 27, 2018
1 parent bd28b2e commit 2ee395f
Showing 2 changed files with 152 additions and 9 deletions.
144 changes: 143 additions & 1 deletion agent/engine/docker_task_engine_test.go
@@ -1751,7 +1751,6 @@ func TestHandleDockerHealthEvent(t *testing.T) {
}

func TestContainerMetadataUpdatedOnRestart(t *testing.T) {

dockerID := "dockerID_created"
labels := map[string]string{
"name": "metadata",
@@ -1872,3 +1871,146 @@ func TestContainerMetadataUpdatedOnRestart(t *testing.T) {
})
}
}

// TestContainerProgressParallize tests that containers can be processed in parallel
func TestContainerProgressParallize(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, testTime, taskEngine, _, imageManager, _ := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

stateChangeEvents := taskEngine.StateChangeEvents()
eventStream := make(chan DockerContainerChangeEvent)
state := taskEngine.(*DockerTaskEngine).State()

fastPullImage := "fast-pull-image"
slowPullImage := "slow-pull-image"

testTask := testdata.LoadTask("sleep5")

containerTwo := &api.Container{
Name: fastPullImage,
Image: fastPullImage,
}

testTask.Containers = append(testTask.Containers, containerTwo)
testTask.Containers[0].Image = slowPullImage
testTask.Containers[0].Name = slowPullImage

var fastContainerDockerName string
var slowContainerDockerName string
fastContainerDockerID := "fast-pull-container-id"
slowContainerDockerID := "slow-pull-container-id"

var waitForFastPullContainer sync.WaitGroup
waitForFastPullContainer.Add(1)

client.EXPECT().Version().Return("17.12.0", nil).AnyTimes()
testTime.EXPECT().Now().Return(time.Now()).AnyTimes()
client.EXPECT().APIVersion().Return(defaultDockerClientAPIVersion, nil).AnyTimes()
imageManager.EXPECT().AddAllImageStates(gomock.Any()).AnyTimes()
imageManager.EXPECT().RecordContainerReference(gomock.Any()).Return(nil).AnyTimes()
imageManager.EXPECT().GetImageStateFromImageName(gomock.Any()).Return(nil).AnyTimes()
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
client.EXPECT().PullImage(fastPullImage, gomock.Any())
client.EXPECT().PullImage(slowPullImage, gomock.Any()).Do(
func(image interface{}, auth interface{}) {
waitForFastPullContainer.Wait()
})
client.EXPECT().CreateContainer(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(cfg interface{}, hostconfig interface{}, name string, duration interface{}) {
if strings.Contains(name, slowPullImage) {
slowContainerDockerName = name
state.AddContainer(&api.DockerContainer{
DockerID: slowContainerDockerID,
DockerName: slowContainerDockerName,
Container: testTask.Containers[0],
}, testTask)
go func() {
event := createDockerEvent(api.ContainerCreated)
event.DockerID = slowContainerDockerID
eventStream <- event
}()
} else if strings.Contains(name, fastPullImage) {
fastContainerDockerName = name
state.AddTask(testTask)
state.AddContainer(&api.DockerContainer{
DockerID: fastContainerDockerID,
DockerName: fastContainerDockerName,
Container: testTask.Containers[1],
}, testTask)
go func() {
event := createDockerEvent(api.ContainerCreated)
event.DockerID = fastContainerDockerID
eventStream <- event
}()
} else {
t.Fatalf("Got unexpected name for creating container: %s", name)
}
}).Times(2)
client.EXPECT().StartContainer(fastContainerDockerID, gomock.Any()).Do(
func(id string, duration interface{}) {
go func() {
event := createDockerEvent(api.ContainerRunning)
event.DockerID = fastContainerDockerID
eventStream <- event
}()
})
client.EXPECT().StartContainer(slowContainerDockerID, gomock.Any()).Do(
func(id string, duration interface{}) {
go func() {
event := createDockerEvent(api.ContainerRunning)
event.DockerID = slowContainerDockerID
eventStream <- event
}()
})

taskEngine.Init(ctx)
taskEngine.AddTask(testTask)

// Expect the fast-pulled container to be running first
fastPullContainerRunning := false
for event := range stateChangeEvents {
containerEvent, ok := event.(api.ContainerStateChange)
if ok && containerEvent.Status == api.ContainerRunning {
if containerEvent.ContainerName == fastPullImage {
fastPullContainerRunning = true
// The second container should start processing now
waitForFastPullContainer.Done()
continue
}
assert.True(t, fastPullContainerRunning, "got the slow-pulled container's running event first")
continue
}

taskEvent, ok := event.(api.TaskStateChange)
if ok && taskEvent.Status == api.TaskRunning {
break
}
t.Errorf("Got unexpected event: %v", event)
}
defer discardEvents(stateChangeEvents)()
// stop and clean up the task
cleanup := make(chan time.Time)
client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).Return(
DockerContainerMetadata{DockerID: fastContainerDockerID}).AnyTimes()
client.EXPECT().StopContainer(gomock.Any(), gomock.Any()).Return(
DockerContainerMetadata{DockerID: slowContainerDockerID}).AnyTimes()
testTime.EXPECT().After(gomock.Any()).Return(cleanup).MinTimes(1)
client.EXPECT().RemoveContainer(gomock.Any(), gomock.Any()).Return(nil).Times(2)
imageManager.EXPECT().RemoveContainerReferenceFromImageState(gomock.Any()).Return(nil).Times(2)

containerStoppedEvent := createDockerEvent(api.ContainerStopped)
containerStoppedEvent.DockerID = slowContainerDockerID
eventStream <- containerStoppedEvent

testTask.SetSentStatus(api.TaskStopped)
cleanup <- time.Now()
for {
tasks, _ := taskEngine.(*DockerTaskEngine).ListTasks()
if len(tasks) == 0 {
break
}
time.Sleep(5 * time.Millisecond)
}
}
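
The ordering the test asserts is enforced by the two PullImage stubs above: the slow image's pull blocks on waitForFastPullContainer.Wait() and is released only after the fast container's running event has been seen. Below is a minimal, self-contained sketch of that gating trick, using plain goroutines and hypothetical event strings in place of the gomock stubs:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var waitForFastPull sync.WaitGroup
    waitForFastPull.Add(1)

    events := make(chan string)

    // "Fast" pull: finishes on its own, reports running, then opens the gate.
    go func() {
        events <- "fast container running"
        waitForFastPull.Done()
    }()

    // "Slow" pull: parks until the fast container has run, mirroring the
    // PullImage(slowPullImage, ...) stub that calls Wait().
    go func() {
        waitForFastPull.Wait()
        events <- "slow container running"
    }()

    // With parallel container progress the fast event must arrive first;
    // with serialized pulls this arrangement would simply deadlock.
    fmt.Println(<-events)
    fmt.Println(<-events)
}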
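
The cleanup phase uses a second technique worth noting: testTime.EXPECT().After(gomock.Any()).Return(cleanup) hands the engine a channel the test controls, so the post-stop cleanup delay elapses exactly when the test sends on it. A sketch of the same idea under the assumption of an injectable time source (the clock type and cleanupAfter function here are hypothetical, not the agent's API):

package main

import (
    "fmt"
    "time"
)

// clock lets tests substitute their own channel for time.After.
type clock struct {
    after func(d time.Duration) <-chan time.Time
}

// cleanupAfter waits out the delay on the injected clock, then reports.
func cleanupAfter(c clock, d time.Duration, done chan<- string) {
    <-c.after(d)
    done <- "cleaned up"
}

func main() {
    cleanup := make(chan time.Time)
    done := make(chan string)

    // The test's clock returns our channel instead of starting a real timer.
    c := clock{after: func(time.Duration) <-chan time.Time { return cleanup }}
    go cleanupAfter(c, 3*time.Hour, done)

    cleanup <- time.Now() // fire the "timer" immediately, as the test does
    fmt.Println(<-done)
}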
17 changes: 9 additions & 8 deletions agent/engine/task_manager.go
@@ -656,7 +656,6 @@ func (mtask *managedTask) progressContainers() {
// complete, but keep reading events as we do. in fact, we have to for
// transitions to complete
mtask.waitForContainerTransition(transitions, transitionChange, transitionChangeContainer)
seelog.Debugf("Managed task [%s]: wait for container transition done", mtask.Arn)

// update the task status
changed := mtask.UpdateStatus()
@@ -820,18 +819,20 @@ func (mtask *managedTask) onContainersUnableToTransitionState() {
}

func (mtask *managedTask) waitForContainerTransition(transitions map[string]api.ContainerStatus,
- transitionChange <-chan struct{},
+ transition <-chan struct{},
transitionChangeContainer <-chan string) {
// There could be multiple transitions, but we just need to wait for one of them
// to ensure that at least one container can be processed in the next
// progressContainers call. This is done by waiting for one transition/acs/docker message.
- if mtask.waitEvent(transitionChange) {
- changedContainer := <-transitionChangeContainer
- seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
- mtask.Arn, changedContainer)
- delete(transitions, changedContainer)
- seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
- }
+ if !mtask.waitEvent(transition) {
+ seelog.Debugf("Managed task [%s]: received non-transition events", mtask.Arn)
+ return
+ }
+ transitionedContainer := <-transitionChangeContainer
+ seelog.Debugf("Managed task [%s]: transition for container[%s] finished",
+ mtask.Arn, transitionedContainer)
+ delete(transitions, transitionedContainer)
+ seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions)
}

func (mtask *managedTask) time() ttime.Time {

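The refactored waitForContainerTransition encodes the pattern its comment describes: block until any one of several in-flight transitions completes, then return control to progressContainers instead of draining the rest. A minimal sketch of that wait-for-first pattern with hypothetical container names (channels are buffered so the transitions we stop listening to don't leak blocked goroutines):

package main

import "fmt"

func main() {
    transitions := map[string]string{"db": "PULLED", "web": "PULLED"}
    transition := make(chan struct{}, len(transitions))
    transitionedContainer := make(chan string, len(transitions))

    for name := range transitions {
        go func(name string) {
            // ... drive this container's state transition here ...
            transitionedContainer <- name // announce which container finished
            transition <- struct{}{}      // then wake the waiter
        }(name)
    }

    // One completed transition is enough for the next progressContainers pass.
    <-transition
    finished := <-transitionedContainer
    delete(transitions, finished)
    fmt.Printf("transition for container[%s] finished; still waiting for: %v\n",
        finished, transitions)
}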