Non essential container state change bug fix #1026
Conversation
code flow looks good, some comments below.
agent/eventhandler/task_handler.go (outdated)
event.taskChange.Task.SetSentStatus(event.taskChange.Status)
}
for _, container := range event.taskChange.Containers {
	container.Container.SetSentStatus(event.taskChange.Status.ContainerStatus(container.Container.GetSteadyStateStatus()))
Why not just `container.Status` instead of `event.taskChange.Status.ContainerStatus(container.Container.GetSteadyStateStatus())`?

I missed that we have this in `ContainerStateChange`. I've modified this.
Looks like this is still not fixed, did you miss this in the commit?

Seems so. Working on it.
	return false // redundant event
}
if tevent.Task != nil && tevent.Task.GetSentStatus() >= tevent.Status {
for _, containerStateChange := range tevent.Containers {
	container := containerStateChange.Container
Please add some comments here to make it easier to read. This part should also be covered by a unit test.
agent/eventhandler/task_handler.go (outdated)
@@ -188,57 +254,14 @@ func (handler *TaskHandler) SubmitTaskEvents(taskEvents *eventList, client api.E
	event := eventToSubmit.Value.(*sendableEvent)

	if event.containerShouldBeSent() {
We probably don't need this part of the code now; that can be addressed in another PR.
agent/eventhandler/task_handler.go (outdated)
var events []api.TaskStateChange
for taskARN, _ := range handler.tasksToContainerStates {
	if task, ok := handler.state.TaskByArn(taskARN); ok {
I'm not sure the `else` case can actually happen, but since you are checking for it here, we may want to remove the entry from the `handler.tasksToContainerStates` map when it does. A log line would probably help too.
agent/eventhandler/task_handler.go (outdated)
// getBatchedContainerEvents gets a list of task state changes for container events that
// have been batched and not sent beyond the drainEventsFrequency threshold
func (handler *TaskHandler) getBatchedContainerEvents() []api.TaskStateChange {
	handler.taskHandlerLock.RLock()
There may be a race condition where we are sending from the ticker while also receiving a task state change. Can you move this lock inside the ticker (line 110)?
Good catch. As per offline discussion, I don't think moving the lock itself resolves the race condition. However, moving https://github.com/aws/amazon-ecs-agent/blob/dev/agent/eventhandler/task_handler.go#L114 to https://github.com/aws/amazon-ecs-agent/blob/dev/agent/eventhandler/task_handler.go#L154 does resolve it I think. I'll modify that.
@@ -58,16 +67,73 @@ type TaskHandler struct {
	// stateSaver is a statemanager which may be used to save any
	// changes to a task or container's SentStatus
	stateSaver statemanager.Saver

	drainEventsFrequency time.Duration
Looks like this needs gofmt.
Why do you think so? It is formatted code.

I...have no idea what I was looking at. Please ignore.
agent/eventhandler/task_handler.go (outdated)
	ticker.Stop()
	return
case <-ticker.C:
	seelog.Info("TaskHandler tick")
Should this be `Debug` level? (And potentially line 105 too?) If we keep it at `Info`, I think we should make it more descriptive.
That's a miss on my part. It should be removed.
agent/eventhandler/task_handler.go (outdated)
case <-ticker.C:
	seelog.Info("TaskHandler tick")
	for _, event := range handler.getBatchedContainerEvents() {
		seelog.Infof(
Same question as above. Should this be `Debug`?
It makes sense for this to be `Info`, as it's a trigger for submitting state changes to the backend and we'd want to know about it without having to elevate the log level to debug.
agent/eventhandler/task_handler.go (outdated)
// Extract the wrapped event from the list element
event := eventToSubmit.Value.(*sendableEvent)
seelog.Infof("TaskHandler, Sending %s change: %s", eventType, event.String())
This changed from `Debug` to `Info` compared to the prior if blocks. I like this increase in verbosity better than some of the other `Info` level additions. However, I might flip the level to `Debug` and log only the success message (line 309) as `Info`.
Force-pushed from d5c41a2 to b96d157
agent/eventhandler/task_handler.go (outdated)
defer handler.lock.RUnlock()

var events []api.TaskStateChange
for taskARN, _ := range handler.tasksToContainerStates {
If only the keys are required, you can simply use `for taskARN := range handler.tasksToContainerStates`.
Do you want to modify this?
}
go taskHandler.startDrainEventsTicker() |
Can we explicitly call this when starting the event handler in the agent/app/agent.go code? Calling it from within a constructor seems confusing.
I think we have this pattern elsewhere as well. Having the object-creation method do everything needed to complete the object's creation doesn't seem like a bad thing to me. I don't have strong opinions here; I can change it if you really want me to.
agent/eventhandler/task_handler.go (outdated)
@@ -117,142 +198,218 @@ func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
// addEvent prepares a given event to be sent by adding it to the handler's appropriate
// eventList and removes the entry in the tasksToEvents map
func (handler *TaskHandler) addEvent(change *sendableEvent, client api.ECSClient) {
	handler.taskHandlerLock.Lock()
I think there is another race case here; we may want to move this lock into AddStateChangeEvent. But I need to confirm with you.
Yeah, that's a valid race. Please look at the latest revision where this has been fixed as well.
agent/eventhandler/task_handler.go (outdated)
event.taskChange.Task.SetSentStatus(event.taskChange.Status)
}
for _, container := range event.taskChange.Containers {
	container.Container.SetSentStatus(event.taskChange.Status.ContainerStatus(container.Container.GetSteadyStateStatus()))
Looks like this is still not fixed, did you miss this in the commit?
Force-pushed from e04e7f9 to 814b098
LGTM as long as all the tests pass. Some minor comments on the structure
agent/eventhandler/task_handler.go (outdated)
	defer handler.taskHandlerLock.Unlock()
// startDrainEventsTicker starts a ticker that periodically drains the events queue
// by submitting state change events to the ECS backend
func (handler *TaskHandler) startDrainEventsTicker() {
nit: I'd love to move this function before `AddStateChangeEvent`.
I wanted the exported functions to be at the top of the file as that's a reasonable way to organize the code.
agent/eventhandler/task_handler.go (outdated)
defer handler.lock.RUnlock()

var events []api.TaskStateChange
for taskARN, _ := range handler.tasksToContainerStates {
Do you want to modify this?
agent/eventhandler/task_handler.go (outdated)
// sendChange adds the change to the sendable events list. If the
// event for the task is in the process of being sent, it doesn't trigger
// the async submit state api method
func (taskEvents *taskSendableEvents) sendChange(change *sendableEvent,
Same as above, can you move this above `submitTaskEvents`?
The order is `TaskHandler` methods and then `taskEvents` methods (exported first, non-exported afterwards), which is again an organizational structure we've followed everywhere in the code base.
This commit refactors the 'SubmitTaskEvents' method to consolidate the duplicate code in event submission logic into a single method.
Fixed a bug where upon submitting the task state change, the container sent state was not being updated. This would cause spurious container state changes in the submission queue of task handler upon agent restart.
Added a ticker to drain the event queue every minute. If there are any container state changes not submitted, the ticker ensures that they do get submitted.
This commit fixes a number of things in the task handler code:
1. Breaks up the really long submitTaskEvents method: the callback for the backoff has been moved out of the in-place anonymous method (send()) into its own named method.
2. The backoff callback method now returns an error when there's an error submitting a state change to the ECS backend. This fixes a bug in the old version of the code, where the backoff would never happen on errors because the error was never propagated all the way up.
3. The race condition between startDrainEventsTicker() and AddStateChangeEvent(), when both are trying to submit changes for the same task, is fixed: tasksToEvents is used to group events belonging to the same task and is only cleaned up when events are submitted, instead of being cleaned up before invoking the submitTaskEvents() routine.
Force-pushed from 814b098 to 2f3ce92
There was another race condition: if an event was already marked for submission, the batched container changes would be lost when another submission trigger came in for the same task from the ticker. This commit makes the two maps in the task handler update within a single lock scope to avoid that race. It also refactors the code to get rid of most methods that were not tied to any struct; instead, each struct has its own methods for sending state changes to ECS.
Force-pushed from 2f3ce92 to d959f31
Code lgtm, thanks for cleaning this up. Just a few questions for my understanding.
// eventsListLock locks both the list and the sending bool
lock sync.Mutex
// createdAt is a timestamp for when the event list was created
createdAt time.Time
Why do we need a `createdAt` field?
Both the `createdAt` and `taskARN` fields are there to help with debugging/logging.
agent/eventhandler/task_handler.go (outdated)
func (handler *TaskHandler) batchContainerEvent(event api.ContainerStateChange) {
	handler.taskHandlerLock.Lock()
	defer handler.taskHandlerLock.Unlock()
// startDrainEventsTicker starts a ticker that periodically drains the events queue
nit: maybe add to the comment that the intended use is to call it during object creation.
agent/eventhandler/task_handler.go (outdated)
func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
	handler.taskHandlerLock.Lock()
	defer handler.taskHandlerLock.Unlock()
// batchContainerEventUnsafe collects container state change events for a given task arn
So I understand: why are we getting rid of the locks in `batchContainerEventUnsafe` and `flushBatchUnsafe`?
We're moving them up to `AddStateChangeEvent`. More details in the commit message: d959f31
LGTM 🚀
agent/eventhandler/task_handler.go (outdated)
}
// drainEventsFrequency is the frequency at which unsent events batched
// by the task handler are sent to the backend
drainEventsFrequency = time.Minute
Should we go faster than a minute? A minute seems pretty slow to me.
As per offline conversation, I'll change this to 20s.
agent/eventhandler/task_handler.go (outdated)
// sendChange adds the change to the sendable events list. If the
// event for the task is in the process of being sent, it doesn't trigger
// the async submit state api method
func (taskEvents *taskSendableEvents) sendChange(change *sendableEvent,
Can you add a bit to this comment here explaining that the actual task submission (API request) occurs asynchronously, so that this function doesn't end up causing the lock to be held for a long time?
}
go taskHandler.startDrainEventsTicker() |
I'd like a comment somewhere explaining the flow for batched events. This is roughly what I got through reading this:

startDrainEventsTicker ->
    getBatchedContainerEvents ->
        reads from tasksToContainerStates and returns a TaskEvent
AddStateChangeEvent ->
    flushBatchUnsafe

The part that took me a while is that `AddStateChangeEvent` also calls `batchContainerUnsafe` for `ContainerEvent`s, and the key difference is that `getBatchedContainerEvents` returns a `TaskEvent` even though it has "ContainerEvents" in the name.
I'll rename `getBatchedContainerEvents` -> `taskStateChangesToSend`. Will add some comments in `startDrainEventsTicker` as well.
// Mirror events.sending, but without the need to lock since this is local
// to our goroutine
done := false

// TODO: wire in the context here. Else, we have go routine leaks in tests
Is this TODO expected to be fixed before merging? goroutine leaks in tests can cause flaky tests.
Not with this PR :(
agent/eventhandler/task_handler.go (outdated)
	taskEvents.sending = true
	go handler.submitTaskEvents(taskEvents, client, change.taskArn())
} else {
	seelog.Debugf("TaskHandler: Not submitting change as the task is already being sent: %s", change.toString())
So I think there's a race here, but the impact of the race is probably only a delay of `drainEventsFrequency`. If we evaluate `taskEvents.sending` as true, we won't cause a new goroutine to be created for `handler.submitTaskEvents`. However, if the existing goroutine is just about done sending, it's possible it won't pick up the new event that we just added on line 263. I think this is a reasonable tradeoff here, but it would be worth recording somewhere that the maximum length of time we can have an unsubmitted change is effectively `2*drainEventsFrequency`.
@samuelkarp can you take a look at the latest commit? I think I've addressed your comments. Thanks!
Summary
Fixes the bug wherein non-essential container state changes were not being submitted to ECS
Implementation details
Added a ticker to drain the event queue every minute. If
there are any container state changes not submitted,
the ticker ensures that they do get submitted.
Testing
- Builds on Linux (`make release`)
- Builds on Windows (`go build -out amazon-ecs-agent.exe ./agent`)
- Unit tests on Linux (`make test`) pass
- Unit tests on Windows (`go test -timeout=25s ./agent/...`) pass
- Integration tests on Linux (`make run-integ-tests`) pass
- Integration tests on Windows (`.\scripts\run-integ-tests.ps1`) pass
- Functional tests on Linux (`make run-functional-tests`) pass
- Functional tests on Windows (`.\scripts\run-functional-tests.ps1`) pass

New tests cover the changes: Yes
Description for the changelog
Will add an entry in CHANGELOG.md
Licensing
This contribution is under the terms of the Apache 2.0 License: Yes