Make event acknowledgment asynchronous in shipper output #32785

rdner merged 7 commits into elastic:main from
Conversation
This lets us keep publishing batches instead of blocking until a single batch is acknowledged.
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
```go
s.pendingMutex.Lock()
lastProcessed := 0
for _, p := range s.pending {
	if p.serverID != indexReply.Uuid {
```
Going along with the initialization comment above: the cleanup from a mismatched uuid should happen in Publish and/or Close -- the uuid will never change during an active connection, so after the first iteration this would just be caught by the err != nil check above and none of the outstanding batches would be cancelled. (It might also be nice for readability to move this into a standalone cancelAllBatches helper.)
That makes sense, but in that case can we move the cleanup so it happens in Connect when we set s.serverID? It will still never change over the course of a connection, so this check could be skipped -- keeping one-time initialization separate makes the logic of ackLoop clearer.
+1 to trying to keep the cleanup in Connect to simplify the rest of the logic.
It's not going to simplify the rest of the logic; in fact, it would add one more lock on the pending queue in a different place, which could cause a deadlock with the client lock. I would also have to copy items in two places. It's more robust to keep it the way it is, and I don't see any reason to move it. Unless there is a good argument why the current state of the code does not work as intended, I'm going to keep it.
TL;DR: I think having only one place where we lock and mutate this pending slice is cleaner and safer, and moving this code gains no obvious benefit.
CI failures (metricbeat) are unrelated to changes in the PR.
cmacknz left a comment:
I spotted one small issue (I think), otherwise LGTM.
…ackaging

* upstream/main: (109 commits)
  Add cap_net_raw requirements to heartbeat docs (elastic#32816)
  apply a quick hotfix for having main working properly (elastic#32934)
  action: checks for x-pack/libbeat and libbeat (elastic#32754)
  Update to Go 1.18 in go.mod. (elastic#32940)
  [heartbeat] disable browser code on windows via build tags (elastic#32939)
  action: checks for heartbeat and x-pack/heartbeat (elastic#32749)
  Make event acknowledgment asynchronous in shipper output (elastic#32785)
  [Automation] Update elastic stack version to 8.5.0-fedc3e60 for testing (elastic#32930)
  Preallocate memory to reduce GC load (elastic#32905)
  [Automation] Update elastic stack version to 8.5.0-440e0896 for testing (elastic#32919)
  Skip broken ceph tests. (elastic#32912)
  Use non-deprecated docker image for testing jolokia (elastic#32885)
  update ironbank image product name (elastic#32867)
  ci: pre-commit stage within Jenkins (elastic#32839)
  Fix a couple of bugs in the logic for how AWS metric periods are calculated (elastic#32724)
  [Filebeat] [httpjson] Add support for single string containing multiple relation-types in getRFC5988Link (elastic#32811)
  [Heartbeat] Update HB k8s template to use <Mi> metric (elastic#32801)
  action: checks for metricbeat and x-pack/metricbeat (elastic#32748)
  action: checks for filebeat and x-pack/filebeat (elastic#32746)
  allow for json/ndjson content type with charset (elastic#32767)
  ...
This lets us keep publishing batches instead of blocking until a single batch is acknowledged. Also updated the config documentation.
What does this PR do?
Implements an asynchronous approach to acknowledging event batches, replacing the previous blocking approach.
Why is it important?
The event pipeline is no longer blocked on a single batch and keeps publishing events to the shipper.
Checklist
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

Related issues