eventhandler: fix bugs and clean up task handler
This commit fixes a bunch of things in the task handler code:
1. Breaking up the really long submitTaskEvents method: the backoff
callback has been moved out of the in-place anonymous function and
into its own named method (send())
2. The backoff callback method now returns an error when there's an
error submitting a state change to the ECS backend. This fixes a bug
in the old version of the code where the backoff would never happen
on errors, because the error was never propagated all the way up
(see the sketch below)
3. The race condition between startDrainEventsTicker() and
AddStateChangeEvent(), where both try to submit changes for the
same task, is fixed: tasksToEvents is used to group events belonging
to the same task, and its entries are now cleaned up only after the
events are submitted, instead of being deleted before invoking the
submitTaskEvents() routine
aaithal committed Oct 24, 2017
1 parent b5b7001 commit d5c41a2
Showing 2 changed files with 306 additions and 87 deletions.
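
As context for fix 2 above, here is a minimal, self-contained sketch of why the callback must propagate the submission error. The simpleBackoff and retryWithBackoff below are hypothetical stand-ins for the agent's utils.SimpleBackoff and utils.RetryWithBackoff (jitter omitted, growth simplified), not the actual implementations: the retry helper only sleeps and retries while the callback returns a non-nil error, so a callback that swallows errors defeats the backoff entirely.

package main

import (
	"errors"
	"fmt"
	"time"
)

// simpleBackoff is a hypothetical stand-in for the agent's utils.SimpleBackoff;
// only the delay-growth behavior matters here (jitter is omitted).
type simpleBackoff struct {
	current, max time.Duration
	multiple     float64
}

// duration returns the current delay and grows it for the next attempt.
func (b *simpleBackoff) duration() time.Duration {
	d := b.current
	next := time.Duration(float64(b.current) * b.multiple)
	if next > b.max {
		next = b.max
	}
	b.current = next
	return d
}

// retryWithBackoff mirrors the shape of a helper like utils.RetryWithBackoff:
// the callback's returned error drives the sleep-and-retry loop. In the old
// code the submit error never escaped the callback, so the loop saw nil and
// never backed off.
func retryWithBackoff(b *simpleBackoff, fn func() error) {
	for fn() != nil {
		time.Sleep(b.duration())
	}
}

func main() {
	// The same constants the commit introduces: min 1s, max 30s, multiple 1.3.
	b := &simpleBackoff{current: time.Second, max: 30 * time.Second, multiple: 1.3}
	attempts := 0
	retryWithBackoff(b, func() error {
		attempts++
		if attempts < 3 {
			// Propagating the error (fix 2) is what makes the backoff fire.
			return errors.New("transient error submitting state change")
		}
		return nil
	})
	fmt.Printf("succeeded after %d attempts\n", attempts)
}

Fix 3 shows up in the diff below as the deferred removeTaskEvents(taskARN) call: the tasksToEvents entry is now deleted only after submitTaskEvents finishes draining it, so a concurrent AddStateChangeEvent for the same task appends to the existing eventList instead of racing to create a second one.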
170 changes: 164 additions & 6 deletions agent/eventhandler/handler_test.go
@@ -27,6 +27,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/utils/mocks"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@@ -291,7 +292,14 @@ func TestCleanupTaskEventAfterSubmit(t *testing.T) {
handler.AddStateChangeEvent(taskEvent3, client)

wg.Wait()
assert.Len(t, handler.tasksToEvents, 0)

// Wait for task events to be removed from the tasksToEvents map
for {
if handler.getTasksToEventsLen() == 0 {
break
}
time.Sleep(time.Millisecond)
}
}

func containerEvent(arn string) statechange.Event {
@@ -316,11 +324,11 @@ func TestENISentStatusChange(t *testing.T) {
client := mock_api.NewMockECSClient(ctrl)

task := &api.Task{
Arn: "taskarn",
Arn: taskARN,
}

eniAttachment := &api.ENIAttachment{
TaskARN: "taskarn",
TaskARN: taskARN,
AttachStatusSent: false,
ExpiresAt: time.Now().Add(time.Second),
}
@@ -331,7 +339,7 @@ func TestENISentStatusChange(t *testing.T) {

sendableTaskEvent := newSendableTaskEvent(api.TaskStateChange{
Attachment: eniAttachment,
TaskARN: "taskarn",
TaskARN: taskARN,
Status: api.TaskStatusNone,
Task: task,
})
@@ -343,9 +351,9 @@ func TestENISentStatusChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
handler := NewTaskHandler(ctx, statemanager.NewNoopStateManager(), nil, client)
defer cancel()
handler.SubmitTaskEvents(&eventList{
handler.submitTaskEvents(&eventList{
events: events,
}, client)
}, client, taskARN)

assert.True(t, eniAttachment.AttachStatusSent)
}
@@ -371,3 +379,153 @@ func TestGetBatchedContainerEvents(t *testing.T) {
assert.Len(t, events, 1)
assert.Equal(t, "t1", events[0].TaskARN)
}

func TestSenderWhenSendingTaskRunningAfterStopped(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

state := mock_dockerstate.NewMockTaskEngineState(ctrl)
client := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()

handler := &TaskHandler{
state: state,
submitSemaphore: utils.NewSemaphore(concurrentEventCalls),
tasksToEvents: make(map[string]*eventList),
tasksToContainerStates: make(map[string][]api.ContainerStateChange),
stateSaver: stateManager,
client: client,
}

taskEvents := &eventList{events: list.New(),
sending: false,
createdAt: time.Now(),
taskARN: taskARN,
}

backoff := mock_utils.NewMockBackoff(ctrl)
sender := &taskEventsSender{
handler: handler,
backoff: backoff,
client: client,
taskEvents: taskEvents,
}
ok, err := sender.send()
assert.True(t, ok)
assert.NoError(t, err)

task := &api.Task{}
sender.taskEvents.events.PushBack(newSendableTaskEvent(api.TaskStateChange{
TaskARN: taskARN,
Status: api.TaskStopped,
Task: task,
}))
sender.taskEvents.events.PushBack(newSendableTaskEvent(api.TaskStateChange{
TaskARN: taskARN,
Status: api.TaskRunning,
Task: task,
}))
handler.tasksToEvents[taskARN] = sender.taskEvents

var wg sync.WaitGroup
wg.Add(1)
gomock.InOrder(
client.EXPECT().SubmitTaskStateChange(gomock.Any()).Do(func(change api.TaskStateChange) {
assert.Equal(t, api.TaskStopped, change.Status)
}),
backoff.EXPECT().Reset().Do(func() {
wg.Done()
}),
)
ok, err = sender.send()
// We have an unsent event for the TaskRunning transition. Hence, send() returns false
assert.False(t, ok)
assert.NoError(t, err)
wg.Wait()

// The unsent transition is deleted from the task list. send() returns true as it
// does not have any more events to process
ok, err = sender.send()
assert.NoError(t, err)
assert.True(t, ok)
}

func TestSenderWhenSendingTaskStoppedAfterRunning(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

state := mock_dockerstate.NewMockTaskEngineState(ctrl)
client := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()

handler := &TaskHandler{
state: state,
submitSemaphore: utils.NewSemaphore(concurrentEventCalls),
tasksToEvents: make(map[string]*eventList),
tasksToContainerStates: make(map[string][]api.ContainerStateChange),
stateSaver: stateManager,
client: client,
}

taskEvents := &eventList{events: list.New(),
sending: false,
createdAt: time.Now(),
taskARN: taskARN,
}

backoff := mock_utils.NewMockBackoff(ctrl)
sender := &taskEventsSender{
handler: handler,
backoff: backoff,
client: client,
taskEvents: taskEvents,
}
ok, err := sender.send()
assert.True(t, ok)
assert.NoError(t, err)

task := &api.Task{}
sender.taskEvents.events.PushBack(newSendableTaskEvent(api.TaskStateChange{
TaskARN: taskARN,
Status: api.TaskRunning,
Task: task,
}))
sender.taskEvents.events.PushBack(newSendableTaskEvent(api.TaskStateChange{
TaskARN: taskARN,
Status: api.TaskStopped,
Task: task,
}))
handler.tasksToEvents[taskARN] = sender.taskEvents

var wg sync.WaitGroup
wg.Add(1)
gomock.InOrder(
client.EXPECT().SubmitTaskStateChange(gomock.Any()).Do(func(change api.TaskStateChange) {
assert.Equal(t, api.TaskRunning, change.Status)
}),
backoff.EXPECT().Reset().Do(func() {
wg.Done()
}),
)
// We have an unsent event for the TaskStopped transition. Hence, send() returns false
ok, err = sender.send()
assert.False(t, ok)
assert.NoError(t, err)
wg.Wait()

wg.Add(1)
gomock.InOrder(
client.EXPECT().SubmitTaskStateChange(gomock.Any()).Do(func(change api.TaskStateChange) {
assert.Equal(t, api.TaskStopped, change.Status)
}),
backoff.EXPECT().Reset().Do(func() {
wg.Done()
}),
)
// The unsent transition is sent and deleted from the task list. send() returns true as it
// does not have any more events to process
ok, err = sender.send()
assert.True(t, ok)
assert.NoError(t, err)
wg.Wait()
}
223 changes: 142 additions & 81 deletions agent/eventhandler/task_handler.go
@@ -17,6 +17,7 @@ import (
"container/list"
"context"
"errors"
"fmt"
"sync"
"time"

@@ -36,16 +37,12 @@ const (
// drainEventsFrequency is the frequency at which unsent events batched
// by the task handler are sent to the backend
drainEventsFrequency = time.Minute
)

type eventList struct {
// events is a list of *sendableEvents
events *list.List
// sending indicates whether the list is already being handled
sending bool
// eventListLock locks both the list and the sending bool
eventListLock sync.Mutex
}
submitStateBackoffMin = time.Second
submitStateBackoffMax = 30 * time.Second
submitStateBackoffJitterMultiple = 0.20
submitStateBackoffMultiple = 1.3
)

// TaskHandler encapsulates the map of a task arn to task and container events
// associated with said task
@@ -62,7 +59,7 @@ type TaskHandler struct {
// taskHandlerLock is used to safely access the following maps:
// * taskToEvents
// * tasksToContainerStates
taskHandlerLock sync.RWMutex
lock sync.RWMutex

// stateSaver is a statemanager which may be used to save any
// changes to a task or container's SentStatus
@@ -74,6 +71,25 @@ type TaskHandler struct {
ctx context.Context
}

// eventList is used to group all events for a task
type eventList struct {
// events is a list of *sendableEvents
events *list.List
// sending indicates whether the list is already being handled
sending bool
// lock protects both the list and the sending bool
lock sync.Mutex
// createdAt is a timestamp for when the event list was created
createdAt time.Time
// taskARN is the task arn that the event list is associated with
taskARN string
}

func (events *eventList) toStringUnsafe() string {
return fmt.Sprintf("Task event list [taskARN: %s, sending: %t, createdAt: %s]",
events.taskARN, events.sending, events.createdAt.String())
}

// NewTaskHandler returns a pointer to TaskHandler
func NewTaskHandler(ctx context.Context,
stateManager statemanager.Saver,
@@ -120,8 +136,8 @@ func (handler *TaskHandler) startDrainEventsTicker() {
// getBatchedContainerEvents gets a list of task state changes for container events that
// have been batched and not sent beyond the drainEventsFrequency threshold
func (handler *TaskHandler) getBatchedContainerEvents() []api.TaskStateChange {
handler.taskHandlerLock.RLock()
defer handler.taskHandlerLock.RUnlock()
handler.lock.RLock()
defer handler.lock.RUnlock()

var events []api.TaskStateChange
for taskARN, _ := range handler.tasksToContainerStates {
@@ -163,18 +179,18 @@ func (handler *TaskHandler) AddStateChangeEvent(change statechange.Event, client

// batchContainerEvent collects container state change events for a given task arn
func (handler *TaskHandler) batchContainerEvent(event api.ContainerStateChange) {
handler.taskHandlerLock.Lock()
defer handler.taskHandlerLock.Unlock()
handler.lock.Lock()
defer handler.lock.Unlock()

seelog.Infof("TaskHandler, batching container event: %s", event.String())
seelog.Infof("TaskHandler: batching container event: %s", event.String())
handler.tasksToContainerStates[event.TaskArn] = append(handler.tasksToContainerStates[event.TaskArn], event)
}

// flushBatch attaches the task arn's container events to the TaskStateChange event that
// is being submitted to the backend
func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
handler.taskHandlerLock.Lock()
defer handler.taskHandlerLock.Unlock()
handler.lock.Lock()
defer handler.lock.Unlock()

event.Containers = append(event.Containers, handler.tasksToContainerStates[event.TaskARN]...)
delete(handler.tasksToContainerStates, event.TaskARN)
@@ -183,101 +199,146 @@ func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
// addEvent prepares a given event to be sent by adding it to the handler's appropriate
// eventList and removes the entry from the tasksToEvents map
func (handler *TaskHandler) addEvent(change *sendableEvent, client api.ECSClient) {
handler.taskHandlerLock.Lock()
defer handler.taskHandlerLock.Unlock()
seelog.Infof("TaskHandler, Adding event: %s", change.String())

taskEvents := handler.getTaskEventList(change)

taskEvents.eventListLock.Lock()
defer taskEvents.eventListLock.Unlock()
taskEvents.lock.Lock()
defer taskEvents.lock.Unlock()

// Update taskEvent
seelog.Infof("TaskHandler: Adding event: %s", change.String())
taskEvents.events.PushBack(change)

if !taskEvents.sending {
taskEvents.sending = true
go handler.SubmitTaskEvents(taskEvents, client)
go handler.submitTaskEvents(taskEvents, client, change.taskArn())
}

delete(handler.tasksToEvents, change.taskArn())
}

// getTaskEventList gets the eventList from the tasksToEvents map
func (handler *TaskHandler) getTaskEventList(change *sendableEvent) (taskEvents *eventList) {
taskEvents, ok := handler.tasksToEvents[change.taskArn()]
handler.lock.Lock()
defer handler.lock.Unlock()

taskARN := change.taskArn()
taskEvents, ok := handler.tasksToEvents[taskARN]
if !ok {
seelog.Debug("TaskHandler, collecting events for new task ", change)
taskEvents = &eventList{events: list.New(), sending: false}
handler.tasksToEvents[change.taskArn()] = taskEvents
taskEvents = &eventList{events: list.New(),
sending: false,
createdAt: time.Now(),
taskARN: taskARN,
}
handler.tasksToEvents[taskARN] = taskEvents
seelog.Debugf("TaskHandler: collecting events for new task; event: %s; events: %s ",
change.String(), taskEvents.toStringUnsafe())
}

return taskEvents
}

// Continuously retries sending an event until it succeeds, sleeping between each
// attempt
func (handler *TaskHandler) SubmitTaskEvents(taskEvents *eventList, client api.ECSClient) {
backoff := utils.NewSimpleBackoff(1*time.Second, 30*time.Second, 0.20, 1.3)
func (handler *TaskHandler) submitTaskEvents(taskEvents *eventList, client api.ECSClient, taskARN string) {
defer handler.removeTaskEvents(taskARN)

backoff := utils.NewSimpleBackoff(submitStateBackoffMin, submitStateBackoffMax,
submitStateBackoffJitterMultiple, submitStateBackoffMultiple)

// Mirror events.sending, but without the need to lock since this is local
// to our goroutine
done := false
sender := &taskEventsSender{
handler: handler,
taskEvents: taskEvents,
backoff: backoff,
client: client,
}

// TODO: wire in the context here. Else, we have goroutine leaks in tests
for !done {
// If we looped back up here, we successfully submitted an event, but
// we haven't emptied the list so we should keep submitting
backoff.Reset()
utils.RetryWithBackoff(backoff, func() error {
// Lock and unlock within this function, allowing the list to be added
// to while we're not actively sending an event
seelog.Debug("TaskHandler, Waiting on semaphore to send...")
handler.submitSemaphore.Wait()
defer handler.submitSemaphore.Post()
var err error
done, err = sender.send()
return err
})
}
}

seelog.Debug("TaskHandler, Aquiring lock for sending event...")
taskEvents.eventListLock.Lock()
defer taskEvents.eventListLock.Unlock()
seelog.Debug("TaskHandler, Aquired lock!")
func (handler *TaskHandler) removeTaskEvents(taskARN string) {
handler.lock.Lock()
defer handler.lock.Unlock()

var err error
delete(handler.tasksToEvents, taskARN)
}

if taskEvents.events.Len() == 0 {
seelog.Debug("TaskHandler, No events left; not retrying more")
func (handler *TaskHandler) getTasksToEventsLen() int {
handler.lock.RLock()
defer handler.lock.RUnlock()

taskEvents.sending = false
done = true
return nil
}
return len(handler.tasksToEvents)
}

eventToSubmit := taskEvents.events.Front()
event := eventToSubmit.Value.(*sendableEvent)

if event.containerShouldBeSent() {
sendEvent(sendContainerStatusToECS, setContainerChangeSent, "container", client,
eventToSubmit, handler.stateSaver, backoff, taskEvents)
} else if event.taskShouldBeSent() {
sendEvent(sendTaskStatusToECS, setTaskChangeSent, "task", client,
eventToSubmit, handler.stateSaver, backoff, taskEvents)
} else if event.taskAttachmentShouldBeSent() {
sendEvent(sendTaskStatusToECS, setTaskAttachmentSent, "task attachment", client,
eventToSubmit, handler.stateSaver, backoff, taskEvents)
} else {
// Shouldn't be sent as either a task or container change event; must have been already sent
seelog.Infof("TaskHandler, Not submitting redundant event; just removing: %s", event.String())
taskEvents.events.Remove(eventToSubmit)
}
type taskEventsSender struct {
handler *TaskHandler
taskEvents *eventList
backoff utils.Backoff
client api.ECSClient
}

if taskEvents.events.Len() == 0 {
seelog.Debug("TaskHandler, Removed the last element, no longer sending")
taskEvents.sending = false
done = true
return nil
}
func (sender *taskEventsSender) send() (bool, error) {
// Lock and unlock within this function, allowing the list to be added
// to while we're not actively sending an event
seelog.Debug("TaskHandler: Waiting on semaphore to send events...")

handler := sender.handler
handler.submitSemaphore.Wait()
defer handler.submitSemaphore.Post()

seelog.Debug("TaskHandler: Aquiring lock for sending event...")
taskEvents := sender.taskEvents
taskEvents.lock.Lock()
defer taskEvents.lock.Unlock()
seelog.Debugf("TaskHandler: Aquired lock, processing event list: : %s", taskEvents.toStringUnsafe())

if taskEvents.events.Len() == 0 {
seelog.Debug("TaskHandler: No events left; not retrying more")
taskEvents.sending = false
return true, nil
}

return err
})
eventToSubmit := taskEvents.events.Front()
event := eventToSubmit.Value.(*sendableEvent)

if event.containerShouldBeSent() {
if err := sendEvent(sendContainerStatusToECS, setContainerChangeSent, "container",
sender.client, eventToSubmit, handler.stateSaver, sender.backoff, taskEvents); err != nil {
return false, err
}
} else if event.taskShouldBeSent() {
if err := sendEvent(sendTaskStatusToECS, setTaskChangeSent, "task", sender.client,
eventToSubmit, handler.stateSaver, sender.backoff, taskEvents); err != nil {
return false, err
}
} else if event.taskAttachmentShouldBeSent() {
if err := sendEvent(sendTaskStatusToECS, setTaskAttachmentSent, "task attachment",
sender.client, eventToSubmit, handler.stateSaver, sender.backoff, taskEvents); err != nil {
return false, err
}
} else {
// Shouldn't be sent as either a task or container change event; must have been already sent
seelog.Infof("TaskHandler: Not submitting redundant event; just removing: %s", event.String())
taskEvents.events.Remove(eventToSubmit)
}

if taskEvents.events.Len() == 0 {
seelog.Debug("TaskHandler: Removed the last element, no longer sending")
taskEvents.sending = false
return true, nil
}

return false, nil
}

// sendEvent tries to send an event, specified by 'eventToSubmit', of type
@@ -288,27 +349,27 @@ func sendEvent(sendStatusToECS sendStatusChangeToECS,
client api.ECSClient,
eventToSubmit *list.Element,
stateSaver statemanager.Saver,
backoff *utils.SimpleBackoff,
taskEvents *eventList) {

backoff utils.Backoff,
taskEvents *eventList) error {
// Extract the wrapped event from the list element
event := eventToSubmit.Value.(*sendableEvent)
seelog.Infof("TaskHandler, Sending %s change: %s", eventType, event.String())
seelog.Infof("TaskHandler: Sending %s change: %s", eventType, event.String())
// Try submitting the change to ECS
if err := sendStatusToECS(client, event); err != nil {
seelog.Errorf("TaskHandler, Unretriable error submitting %s state change [%s]: %v",
seelog.Errorf("TaskHandler: Unretriable error submitting %s state change [%s]: %v",
eventType, event.String(), err)
return
return err
}
// submitted; ensure we don't retry it
event.setSent()
// Mark event as sent
setChangeSent(event)
// Update the state file
stateSaver.Save()
seelog.Debugf("TaskHandler, Submitted container state change: %s", event.String())
backoff.Reset()
seelog.Debugf("TaskHandler: Submitted container state change: %s", event.String())
taskEvents.events.Remove(eventToSubmit)
backoff.Reset()
return nil
}

// sendStatusChangeToECS defines a function type for invoking the appropriate ECS state change API
