Ensure task status is reported before cleanup #705

Merged · 9 commits · Feb 14, 2017
8 changes: 8 additions & 0 deletions agent/engine/docker_image_manager_integ_test.go
@@ -112,6 +112,7 @@ func TestIntegImageCleanupHappyCase(t *testing.T) {

// Verify Task is stopped.
verifyTaskIsStopped(taskEvents, testTask)
testTask.SetSentStatus(api.TaskStopped)

// Allow Task cleanup to occur
time.Sleep(5 * time.Second)
@@ -230,6 +231,7 @@ func TestIntegImageCleanupThreshold(t *testing.T) {

// Verify Task is stopped
verifyTaskIsStopped(taskEvents, testTask)
testTask.SetSentStatus(api.TaskStopped)

// Allow Task cleanup to occur
time.Sleep(2 * time.Second)
@@ -378,6 +380,9 @@ func TestImageWithSameNameAndDifferentID(t *testing.T) {

// Verify Task is stopped
verifyTaskIsStopped(taskEvents, task1, task2, task3)
task1.SetSentStatus(api.TaskStopped)
task2.SetSentStatus(api.TaskStopped)
task3.SetSentStatus(api.TaskStopped)

// Allow Task cleanup to occur
time.Sleep(2 * time.Second)
@@ -501,6 +506,9 @@ func TestImageWithSameIDAndDifferentNames(t *testing.T) {

// Verify Task is stopped
verifyTaskIsStopped(taskEvents, task1, task2, task3)
task1.SetSentStatus(api.TaskStopped)
task2.SetSentStatus(api.TaskStopped)
task3.SetSentStatus(api.TaskStopped)

// Allow Task cleanup to occur
time.Sleep(2 * time.Second)
2 changes: 2 additions & 0 deletions agent/engine/docker_task_engine_test.go
@@ -187,6 +187,7 @@ func TestBatchContainerHappyPath(t *testing.T) {
assert.Equal(t, *cont.ExitCode, 0, "Exit code should be present")
}
assert.Equal(t, (<-taskEvents).Status, api.TaskStopped, "Task is not in STOPPED state")
sleepTask.SetSentStatus(api.TaskStopped)

// Extra events should not block forever; duplicate acs and docker events are possible
go func() { eventStream <- createDockerEvent(api.ContainerStopped) }()
@@ -317,6 +318,7 @@ func TestRemoveEvents(t *testing.T) {
}).Return(nil)

taskEngine.AddTask(sleepTaskStop)
sleepTask.SetSentStatus(api.TaskStopped)
imageManager.EXPECT().RemoveContainerReferenceFromImageState(gomock.Any())
// trigger cleanup
cleanup <- time.Now()
8 changes: 7 additions & 1 deletion agent/engine/engine_integ_test.go
@@ -41,6 +41,11 @@ const (
credentialsIDIntegTest = "credsid"
)

func init() {
// Set this very low for integ tests only
_stoppedSentWaitInterval = 1 * time.Second
}

func createTestTask(arn string) *api.Task {
return &api.Task{
Arn: arn,
@@ -183,8 +188,9 @@ func TestSweepContainer(t *testing.T) {
defer discardEvents(taskEvents)()

// Should be stopped, let's verify it's still listed...
_, ok := taskEngine.(*DockerTaskEngine).State().TaskByArn("testSweepContainer")
task, ok := taskEngine.(*DockerTaskEngine).State().TaskByArn("testSweepContainer")
assert.True(t, ok, "Expected task to be present still, but wasn't")
task.SetSentStatus(api.TaskStopped) // cleanupTask waits for TaskStopped to be sent before cleaning
time.Sleep(1 * time.Minute)
for i := 0; i < 60; i++ {
_, ok = taskEngine.(*DockerTaskEngine).State().TaskByArn("testSweepContainer")
24 changes: 24 additions & 0 deletions agent/engine/task_manager.go
@@ -26,6 +26,8 @@ import (

const (
steadyStateTaskVerifyInterval = 10 * time.Minute
stoppedSentWaitInterval = 30 * time.Second
maxStoppedWaitTimes = 72 * time.Hour / stoppedSentWaitInterval
)

type acsTaskUpdate struct {
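For reference, the two constants added above bound how long cleanup can be deferred: 72 hours of polling at 30-second intervals is 8,640 iterations of the wait loop. A minimal standalone check of that arithmetic (not part of the agent code; identifiers copied from the diff above):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirror the constants added to task_manager.go above.
	const stoppedSentWaitInterval = 30 * time.Second
	const maxStoppedWaitTimes = 72 * time.Hour / stoppedSentWaitInterval

	// 72h / 30s = 8640 polls; this is the worst-case number of iterations
	// the reviewers discuss further down in this diff.
	fmt.Println(int(maxStoppedWaitTimes)) // prints 8640
}
```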
@@ -474,6 +476,9 @@ func (mtask *managedTask) time() ttime.Time {
return mtask._time
}

var _stoppedSentWaitInterval = stoppedSentWaitInterval
var _maxStoppedWaitTimes = int(maxStoppedWaitTimes)

func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) {
cleanupTimeDuration := mtask.GetKnownStatusTime().Add(taskStoppedDuration).Sub(ttime.Now())
// There is a potential deadlock here if cleanupTime is negative. Ignore the computed
@@ -489,8 +494,27 @@ func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) {
cleanupTimeBool <- true
close(cleanupTimeBool)
}()
// wait for the cleanup time to elapse, signalled by cleanupTimeBool
for !mtask.waitEvent(cleanupTimeBool) {
}
stoppedSentBool := make(chan bool)
go func() {
@aaithal (Contributor), Feb 10, 2017:
Can this be broken out into a named method?

Author (Contributor):
Done

Contributor:
For the newly written goroutine, should we start enforcing the rule of always passing a Context to it, so that it supports simple cancellation?

Author (Contributor):
There's no context wired into most of our codebase, as most of it was written before context existed. We could add that to the managedTask, but that's outside the scope of my change here.

I don't think that always passing context is a hard rule we should enforce. Whether or not we should use context depends on what the code is doing.

Contributor:
Without a context, there is no way to stop this potentially long-running new goroutine (which could run for up to 72 hours at worst). If there is another kind of state mismatch between the agent and the backend, where the backend thinks this instance can launch new tasks while the agent is holding these long-running cleanup goroutines, is it possible the agent could run out of memory?

Contributor:
I meant: if the backend service keeps starting new tasks, and these tasks get stuck in the cleanup state for 72 hours, will the agent eventually run out of memory?

Author (Contributor):
This is the desired behavior. A successful submission of task state will result in this goroutine exiting. Unsuccessful submissions will delay cleanup until success or the 72-hour timeout, whichever is sooner. There is no use case for stopping the goroutine other than this.

for i := 0; i < _maxStoppedWaitTimes; i++ {
// ensure that we block until api.TaskStopped is actually sent
sentStatus := mtask.GetSentStatus()
if sentStatus >= api.TaskStopped {
stoppedSentBool <- true
close(stoppedSentBool)
return
}
seelog.Warnf("Blocking cleanup for task %v until the task has been reported stopped. SentStatus: %v (%d/%d)", mtask, sentStatus, i, _maxStoppedWaitTimes)
mtask._time.Sleep(_stoppedSentWaitInterval)
}
}()
// wait for api.TaskStopped to be sent
for !mtask.waitEvent(stoppedSentBool) {
}

log.Info("Cleaning up task's containers and data", "task", mtask.Task)

// For the duration of this, simply discard any task events; this ensures the
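Following up on the review thread above: the author's "Done" refers to factoring the anonymous goroutine out into a named method. The sketch below is a hypothetical version of that extraction, reusing only identifiers that already appear in this diff; the method name waitForStopReported and its signature are illustrative, not necessarily what was merged.

```go
// Hypothetical extraction (would live in task_manager.go): the body mirrors
// the goroutine shown in the diff above, unchanged in behavior.
func (mtask *managedTask) waitForStopReported(stoppedSentBool chan<- bool) {
	for i := 0; i < _maxStoppedWaitTimes; i++ {
		sentStatus := mtask.GetSentStatus()
		// Unblock cleanup once api.TaskStopped has actually been submitted.
		if sentStatus >= api.TaskStopped {
			stoppedSentBool <- true
			close(stoppedSentBool)
			return
		}
		seelog.Warnf("Blocking cleanup for task %v until the task has been reported stopped. SentStatus: %v (%d/%d)",
			mtask, sentStatus, i, _maxStoppedWaitTimes)
		mtask._time.Sleep(_stoppedSentWaitInterval)
	}
}

// Caller side in cleanupTask, replacing the inline `go func() { ... }()`:
//   stoppedSentBool := make(chan bool)
//   go mtask.waitForStopReported(stoppedSentBool)
//   for !mtask.waitEvent(stoppedSentBool) {
//   }
```

If cancellation were ever needed, the reviewer's Context suggestion would amount to adding a context.Context parameter and checking ctx.Done() between sleeps; as discussed above, the author deliberately left that out, since a successful state submission is the only intended exit.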