Add support to combine update operations #25976
Conversation
```go
	return nil, err
}
resource.pendingCursor = cursor
resource.pendingUpdate = updates
```
We do not apply the update directly anymore. Instead we store the most recent delta with the resource. The delta is applied lazily when the current pendingCursor state is actually required. This removes an 'expensive' operation from the hot path.
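A minimal sketch of this lazy-merge idea (type and method names here are assumptions for illustration, not the actual implementation): the ACK path only records the newest delta, and the merge runs on the first real read of the pending state.

```go
package registry

import "sync"

// resource is a stand-in for the real cursor resource; only the fields
// relevant to the lazy merge are shown.
type resource struct {
	mu            sync.Mutex
	pendingCursor map[string]interface{} // in-memory cursor state
	pendingUpdate map[string]interface{} // newest delta, not yet applied
}

// onACK is the cheap hot-path operation: it only stores the delta.
func (r *resource) onACK(delta map[string]interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pendingUpdate = delta
}

// cursorState applies the stored delta on first real use, keeping the
// expensive merge off the ACK path (e.g. it runs when the background
// writer serializes the resource).
func (r *resource) cursorState() map[string]interface{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.pendingUpdate != nil {
		if r.pendingCursor == nil {
			r.pendingCursor = map[string]interface{}{}
		}
		for k, v := range r.pendingUpdate {
			r.pendingCursor[k] = v
		}
		r.pendingUpdate = nil
	}
	return r.pendingCursor
}
```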
```go
// Set overwrites key value pair in the pending update operations.
func (w *updateWriter) Schedule(op *updateOp, n uint) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
```
Schedule is a point of contention here, but it is still better than blocking while waiting for serialization and IO to finish.
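A sketch of that trade-off (simplified types, not the real code): the critical section only merges the operation into a map; serialization and disk IO happen later in the writer goroutine.

```go
package registry

import "sync"

type updateOp struct {
	key   string      // resource the op belongs to
	delta interface{} // newest cursor delta for this resource
	n     uint        // number of ACKed events the op represents
}

type updateWriter struct {
	mutex   sync.Mutex
	pending map[string]*updateOp // at most one merged op per resource
	wakeup  chan struct{}        // capacity 1, nudges the worker
}

// Schedule merges op with a still-pending op for the same resource: the
// newest delta wins, the event counts accumulate. The lock is held only
// for the map update.
func (w *updateWriter) Schedule(op *updateOp, n uint) {
	w.mutex.Lock()
	if old, ok := w.pending[op.key]; ok {
		n += old.n
	}
	op.n = n
	w.pending[op.key] = op
	w.mutex.Unlock()

	// Non-blocking wakeup: one queued signal is enough, the worker picks
	// up every merged op on its next pass.
	select {
	case w.wakeup <- struct{}{}:
	default:
	}
}
```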
💚 Build Succeeded
💚 Flaky test report: Tests succeeded.
/test
kvch left a comment
I like the general direction of the PR, hence I approved it. I am glad `loginp.Cursor` no longer requires a store instance. \o/
But please provide tests for your changes.
- don't pass the complete store to the acker, but only a channel (only pass required functionality)
- separate the update writer into a worker and a channel to ease testing (a rough sketch follows below)
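A rough sketch of how such a split could look (all names hypothetical): the acker only sees the send side of a channel, and the worker is a plain loop over the receive side, so both halves can be tested without a store instance.

```go
package registry

// scheduledOp is a placeholder for whatever the acker hands to the writer.
type scheduledOp struct {
	key   string
	delta interface{}
}

// newAcker builds the ACK callback for the pipeline. It needs no store
// reference, only the channel.
func newAcker(out chan<- scheduledOp) func(scheduledOp) {
	return func(op scheduledOp) { out <- op }
}

// runWriter is the worker half: it drains the channel and commits ops.
// In tests, commit can simply record calls instead of touching disk.
func runWriter(in <-chan scheduledOp, commit func(scheduledOp)) {
	for op := range in {
		commit(op)
	}
}

func example() {
	ops := make(chan scheduledOp, 16)
	done := make(chan struct{})
	go func() {
		runWriter(ops, func(op scheduledOp) { /* write to registry */ })
		close(done)
	}()
	ack := newAcker(ops)
	ack(scheduledOp{key: "file::/var/log/syslog", delta: int64(1024)})
	close(ops) // lets the worker drain the remaining ops and stop
	<-done
}
```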
Pinging @elastic/agent (Team:Agent)
Added tests. PR is now ready for review.
(cherry picked from commit 1c71c94)
Co-authored-by: Steffen Siering <[email protected]>
What does this PR do?
With this change the update operations emitted by the filestream input are not directly applied upon ACK, but are scheduled for application. A background goroutine picks up all updates from all inputs that are or have been active and applies the changes to the registry. This removes the cursor update and the write to the persistent store from the ACK handler path, which, when blocked, could produce back pressure on the outputs, processors, or inputs.
When another set of state updates is ACKed by the output while the writer is still busy with past updates, we merge the new and the other pending updates into a single update operation. This reduces the number of in-memory and on-disk updates we need to execute if the system comes under pressure.
TODO: performance tests for the logs input, master, and these changes.
Why is it important?
With this change backpressure from the registry writes is mostly nullified, which removes the need for a `registry.flush` setting, as we have globally for the logs input.
Limitations: We combine as many updates as we can. If the registry can't be updated for a long time, there is a chance of publishing duplicates when Filebeat is restarted after a crash. The same issue exists with the `filebeat.registry.flush` setting.
On clean shutdown we still serialize all pending updates for already ACKed events.
Benchmarks
I ran some benchmarks with Filebeat master and this PR using the console output. I'm including the complete output, as many subsystems are involved that can impact overall performance.
Test cases:
- exclude_lines
- drop_event processor
- exclude_lines, 100 log files processed concurrently
- drop_event processor, 100 log files processed concurrently

Note: For the test cases with events being dropped, the output event rate does not exactly tell us how many events have been processed in total. We estimate the assumed event rate of the inputs by computing rate_in = rate_out * 100 / (100 - filtered_percentage).
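For example, taking the EL100 sample run below: with a 95% drop rate and an observed output rate of 12.8k events/s, the estimated input rate is 12.8k * 100 / (100 - 95) = 256k events/s.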
Inputs: log (the logs input), fs (the filestream input), and fsack (the filestream input with ACK merging).

[Per-scenario results for SL, L100, EL, DL, EL100, and DL100 were collapsed in the original; one sample run from the EL100 logs case:]

```
$ ./scripts/run_console.zsh master filebeat -E max_procs=2 -c input/dir_remove.yml
CPU=194% User=28,86s Sys=0,40s Total=15,04s MaxRSS=108 InOps=0 OutOps=6848 [12,8k/s] [12,8k/s]
rate_in: 256k events/s
```
=>
In general, the new filestream input seems to be faster than the logs input. Especially the exclude_lines configuration is almost twice as fast as the logs input. The input rate achieved with exclude_lines always outperforms the non-filtered case.
Note: The tests have been run without any real output that can produce backpressure. The exclude_lines implementation of the logs input produces a status update per dropped event, which still must be processed by the registry. Slow outputs can therefore still slow down inputs with lines to be filtered. The filestream input, on the other hand, just ignores the line without producing a status update for the registry, which allows it to keep a higher rate in case the output is unresponsive.
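A hedged illustration of this difference (all names made up, not the actual input code):

```go
package input

// handleLine contrasts the two behaviors on a filtered line: the log
// input still emits a state-only update that the registrar must process,
// while filestream only advances its in-memory offset.
func handleLine(line []byte, offset int64,
	excluded func([]byte) bool,
	publish func([]byte, int64), // event plus its new cursor offset
	stateOnly func(int64), // registry-only update (log input behavior)
	legacyLogInput bool,
) {
	if !excluded(line) {
		publish(line, offset)
		return
	}
	if legacyLogInput {
		stateOnly(offset) // one registry op per dropped line
	}
	// filestream: nothing to do; the cursor of the next published and
	// ACKed event already covers the skipped line
}
```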
Filtering events via drop_event + when clause is rather costly. In comparison to exclude_lines, a full event must be produced; the event is then 'normalized', which results in a deep copy with many small allocations; the when clause must be evaluated; and finally the event is dropped via ACK handlers. Next, the produced but dropped event might eventually be shipped to the registrar with or without pending ACKs. At worst this can produce an ACK event per dropped event (very likely at a 95% drop rate). By writing an entry per dropped event to the registry file, we produce additional overhead, which can result in backpressure and slow down the publishing of events.
The full processing generates measurable overhead in all setups. In the case of the logs input, the line read rate is lower than when simply publishing all lines. Due to the architecture of the logs input, this can also lead to a very long startup phase, which is intermixed with event publishing. The filestream input can better keep up the rate here, and with ACK merging support (the fsack tests), the read rate is still higher than for the non-filtered case.
Checklist
- I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.