Skip to content

Commit

Permalink
eventhandler: Reduce drain events frequency to 20s
Browse files Browse the repository at this point in the history
  • Loading branch information
aaithal committed Oct 27, 2017
1 parent f363f06 commit 4d9cdf2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 31 deletions.
2 changes: 1 addition & 1 deletion agent/eventhandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestGetBatchedContainerEvents(t *testing.T) {
state.EXPECT().TaskByArn("t1").Return(&api.Task{Arn: "t1", KnownStatusUnsafe: api.TaskRunning}, true)
state.EXPECT().TaskByArn("t2").Return(nil, false)

events := handler.getBatchedContainerEvents()
events := handler.taskStateChangesToSend()
assert.Len(t, events, 1)
assert.Equal(t, "t1", events[0].TaskARN)
}
Expand Down
95 changes: 65 additions & 30 deletions agent/eventhandler/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (

// drainEventsFrequency is the frequency at the which unsent events batched
// by the task handler are sent to the backend
drainEventsFrequency = time.Minute
drainEventsFrequency = 20 * time.Second

submitStateBackoffMin = time.Second
submitStateBackoffMax = 30 * time.Second
Expand Down Expand Up @@ -72,9 +72,12 @@ type TaskHandler struct {

// taskSendableEvents is used to group all events for a task
type taskSendableEvents struct {
// events is a list of *sendableEvents
// events is a list of *sendableEvents. We treat this as queue, where
// new events are added to the back of the queue and old events are
// drained from the front. `sendChange` pushes an event to the back of
// the queue. An event is removed from the queue in `submitFirstEvent`
events *list.List
// sending will check whether the list is already being handlerd
// sending will check whether the list is already being handled
sending bool
//eventsListLock locks both the list and sending bool
lock sync.Mutex
Expand Down Expand Up @@ -105,7 +108,12 @@ func NewTaskHandler(ctx context.Context,
return taskHandler
}

// AddStateChangeEvent queues up a state change for sending using the given client.
// AddStateChangeEvent queues up the state change event to be sent to ECS.
// If the event is for a container state change, it just gets added to the
// handler.tasksToContainerStates map.
// If the event is for task state change, it triggers the non-blocking
// handler.submitTaskEvents method to submit the batched container state
// changes and the task state change to ECS
func (handler *TaskHandler) AddStateChangeEvent(change statechange.Event, client api.ECSClient) error {
handler.lock.Lock()
defer handler.lock.Unlock()
Expand All @@ -116,6 +124,9 @@ func (handler *TaskHandler) AddStateChangeEvent(change statechange.Event, client
if !ok {
return errors.New("eventhandler: unable to get task event from state change event")
}
// Task event: gather all the container events and send them
// to ECS by invoking the async submitTaskEvents method from
// the sendable event list object
handler.flushBatchUnsafe(&event, client)
return nil

Expand Down Expand Up @@ -143,24 +154,35 @@ func (handler *TaskHandler) startDrainEventsTicker() {
ticker.Stop()
return
case <-ticker.C:
for _, event := range handler.getBatchedContainerEvents() {
// Gather a list of task state changes to send. This list is
// constructed from the tasksToEvents map based on the task
// arns of containers that haven't been sent to ECS yet.
for _, taskEvent := range handler.taskStateChangesToSend() {
seelog.Infof(
"TaskHandler: Adding a state change event to send batched container events: %s",
event.String())
handler.AddStateChangeEvent(event, handler.client)
taskEvent.String())
// Force start the the task state change submission
// workflow by calling AddStateChangeEvent method.
handler.AddStateChangeEvent(taskEvent, handler.client)
}
}
}
}

// getBatchedContainerEvents gets a list task state changes for container events that
// taskStateChangesToSend gets a list task state changes for container events that
// have been batched and not sent beyond the drainEventsFrequency threshold
func (handler *TaskHandler) getBatchedContainerEvents() []api.TaskStateChange {
func (handler *TaskHandler) taskStateChangesToSend() []api.TaskStateChange {
handler.lock.RLock()
defer handler.lock.RUnlock()

var events []api.TaskStateChange
for taskARN := range handler.tasksToContainerStates {
// An entry for the task in tasksToContainerStates means that there
// is at least 1 container event for that task that hasn't been sent
// to ECS (has been batched).
// Make sure that the engine's task state knows about this task (as a
// safety mechanism) and add it to the list of task state changes
// that need to be sent to ECS
if task, ok := handler.state.TaskByArn(taskARN); ok {
events = append(events, api.TaskStateChange{
TaskARN: taskARN,
Expand All @@ -178,25 +200,34 @@ func (handler *TaskHandler) batchContainerEventUnsafe(event api.ContainerStateCh
handler.tasksToContainerStates[event.TaskArn] = append(handler.tasksToContainerStates[event.TaskArn], event)
}

// flushBatchUnsafe attaches the task arn's container events to TaskStateChange event that
// is being submittied to the backend and sends it to the backend
func (handler *TaskHandler) flushBatchUnsafe(event *api.TaskStateChange, client api.ECSClient) {
event.Containers = append(event.Containers, handler.tasksToContainerStates[event.TaskARN]...)
delete(handler.tasksToContainerStates, event.TaskARN)

// Prepare a given event to be sent by adding it to the handler's appropriate
// eventList and remove the entry in tasksToEvents map
change := newSendableTaskEvent(*event)
taskEvents := handler.getTaskEventsUnsafe(change)

taskEvents.sendChange(change, client, handler)
// flushBatchUnsafe attaches the task arn's container events to TaskStateChange event
// by creating the sendable event list. It then submits this event to ECS asynchronously
func (handler *TaskHandler) flushBatchUnsafe(taskStateChange *api.TaskStateChange, client api.ECSClient) {
taskStateChange.Containers = append(taskStateChange.Containers,
handler.tasksToContainerStates[taskStateChange.TaskARN]...)
// All container events for the task have now been copied to the
// task state change object. Remove them from the map
delete(handler.tasksToContainerStates, taskStateChange.TaskARN)

// Prepare a given event to be sent by adding it to the handler's
// eventList
event := newSendableTaskEvent(*taskStateChange)
taskEvents := handler.getTaskEventsUnsafe(event)

// Add the event to the sendable events queue for the task and
// start sending it asynchronously if possible
taskEvents.sendChange(event, client, handler)
}

// getTaskEventsUnsafe gets the eventList from taskToEvent map
func (handler *TaskHandler) getTaskEventsUnsafe(change *sendableEvent) *taskSendableEvents {
taskARN := change.taskArn()
// getTaskEventsUnsafe gets the event list for the task arn in the sendableEvent
// from taskToEvent map
func (handler *TaskHandler) getTaskEventsUnsafe(event *sendableEvent) *taskSendableEvents {
taskARN := event.taskArn()
taskEvents, ok := handler.tasksToEvents[taskARN]

if !ok {
// We are not tracking this task arn in the tasksToEvents map. Create
// a new entry
taskEvents = &taskSendableEvents{
events: list.New(),
sending: false,
Expand All @@ -205,7 +236,7 @@ func (handler *TaskHandler) getTaskEventsUnsafe(change *sendableEvent) *taskSend
}
handler.tasksToEvents[taskARN] = taskEvents
seelog.Debugf("TaskHandler: collecting events for new task; event: %s; events: %s ",
change.toString(), taskEvents.toStringUnsafe())
event.toString(), taskEvents.toStringUnsafe())
}

return taskEvents
Expand Down Expand Up @@ -248,25 +279,29 @@ func (handler *TaskHandler) removeTaskEvents(taskARN string) {
delete(handler.tasksToEvents, taskARN)
}

// sendChange adds the change to the sendable events list. If the
// event for the task is in the process of being sent, it doesn't trigger
// the async submit state api method
// sendChange adds the change to the sendable events queue. It triggers
// the handler's submitTaskEvents async method to submit this change if
// there's no go routines already sending changes for this event list
func (taskEvents *taskSendableEvents) sendChange(change *sendableEvent,
client api.ECSClient,
handler *TaskHandler) {

taskEvents.lock.Lock()
defer taskEvents.lock.Unlock()

// Update taskEvent
// Add event to the queue
seelog.Infof("TaskHandler: Adding event: %s", change.toString())
taskEvents.events.PushBack(change)

if !taskEvents.sending {
// If a send event is not already in progress, trigger the
// submitTaskEvents to start sending changes to ECS
taskEvents.sending = true
go handler.submitTaskEvents(taskEvents, client, change.taskArn())
} else {
seelog.Debugf("TaskHandler: Not submitting change as the task is already being sent: %s", change.toString())
seelog.Debugf(
"TaskHandler: Not submitting change as the task is already being sent: %s",
change.toString())
}
}

Expand Down

0 comments on commit 4d9cdf2

Please sign in to comment.