Add log throttling per file #2702
Conversation
Commits (Signed-off-by: Anthony Comtois <[email protected]>): Remove debug log; …otify
Force-pushed from a4ec3f2 to 8ab733c
Force-pushed from 8ab733c to 416693c
The test fails, so could you fix the test first?
@@ -201,7 +204,15 @@ def start
      end

      refresh_watchers unless @skip_refresh_on_startup
      timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))

      @threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|
      @threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.

      @threads.each { |thr|
        thr.join
If it blocks here, all the code after this point is blocked.
@threads is a Hash, so thr here is an Array (a [key, value] pair).
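To illustrate the point (standalone Ruby, not code from this PR):

```ruby
# Iterating a Hash with a single block parameter yields [key, value] Arrays.
threads = { 'a.log' => Thread.new { 1 + 1 } }
threads.each { |thr| puts thr.class }          # => Array
threads.each { |_path, thread| thread.join }   # destructure to reach the Thread
```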
      log.debug "Thread refresh_watchers"
      @threads.each { |thr|
        log.debug "Thread #{thr[0]} #{thr[1].status}"
indent
@@ -356,6 +392,7 @@ def update_watcher(path, pe)
      end
    end
    rotated_tw = @tails[path]
unnecessary
    }
  end

  def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
    paths.each { |path|
      tw = remove_watcher ? @tails.delete(path) : @tails[path]
      if remove_watcher
        @threads[path].exit
Thread#exit is dangerous. Could you finish this thread in a proper way?
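One conventional graceful-shutdown pattern looks roughly like this (a sketch only; the TailLoop name and the running flag are illustrative, not this PR's code):

```ruby
# Ask the thread to stop at a safe point and join it, instead of killing
# it with Thread#exit (which can abandon half-processed state).
class TailLoop
  def start
    @running = true
    @thread = Thread.new do
      while @running
        # ... read any new lines from the file here ...
        sleep 1 # wait for the next interval
      end
    end
  end

  def stop
    @running = false            # tell the loop to finish its current pass
    @thread.wakeup rescue nil   # cut the sleep short; raises if already dead
    @thread.join                # wait for in-flight work to complete
  end
end
```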
          if @lines.size >= @watcher.read_lines_limit

          number_bytes_read += bytes_to_read
          limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
Suggested change:
- limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
+ limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second && @watcher.read_bytes_limit_per_second > 0)
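For context, `and` binds more loosely than `&&` in Ruby, which matters most around assignment:

```ruby
x = true and false  # parsed as (x = true) and false, so x == true
y = true && false   # parsed as y = (true && false), so y == false
```

With the parentheses in place the two spellings behave the same here, but `&&` is the conventional operator in boolean expressions.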
          # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
          time_spent_reading = Time.new - start_reading
          @watcher.log.debug("time_spent_reading: #{time_spent_reading} #{@watcher.path}")
          if (time_spent_reading < 1)
Suggested change:
- if (time_spent_reading < 1)
+ if time_spent_reading < 1
          log.warn "Skip #{path} because unexpected setup error happens: #{e}"
          next
        begin
          tw = setup_watcher(path, pe)
This can be a race condition: L334 should be called before passing pe to setup_watcher, but the current code does not ensure that.
        end
      end
      if @threads[path].nil?
        log.debug "Add Thread #{path}"
        @threads[path] = Thread.new(path) do |path|
Why did you change this code to run on a new thread?
            @fifo.read_lines(@lines)
            if @lines.size >= @watcher.read_lines_limit

            number_bytes_read += bytes_to_read
IO#readpartial does not always read bytes_to_read bytes. Is this code OK?
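A sketch of the safer accounting (io, bytes_limit, and buffer are illustrative names; the point is counting the actual return size):

```ruby
# IO#readpartial may return fewer bytes than requested, so count the
# bytes it actually returned instead of assuming the requested size.
io = File.open('/var/log/app.log') # illustrative path
bytes_limit = 32 * 1024
buffer = +''
number_bytes_read = 0
begin
  while number_bytes_read < bytes_limit
    data = io.readpartial(8192)        # may return anywhere from 1 to 8192 bytes
    number_bytes_read += data.bytesize # count what actually arrived
    buffer << data
  end
rescue EOFError
  # end of file; nothing more to read until the next notification
end
```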
Hello, I'm going to spend more time adding more tests and reviewing your comments.
OK. I like this feature :) But creating a thread per file is not acceptable, because if there are 1000 files to monitor, it creates 1000 threads. https://github.com/fluent/fluentd/pull/2702/files#diff-1da710c9dcc8d0fc57996df7a9d39695R331
Hi @ganmacs,
That was one of my worries in terms of performance. I've been testing with 200 files and a high number of bytes per file, and the number of threads hasn't been an issue, but I agree that one thread per file is not ideal. I was thinking about using a thread pool (a rough sketch of that idea follows below), but that would need a bigger re-architecture, and I'm not sure inotify would work as expected.
I agree. However, we are more interested in throttling logs per container in a Kubernetes environment, which means per file, so as not to affect containers that are sending at a decent rate. In the commit, I've implemented log throttling per file, but this implementation only works nicely when you use the timer, because it stops reading, and if the watcher never gets notified again some logs might never be read.
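For what it's worth, a fixed-size worker pool over a Queue is one way to cap the thread count; a minimal sketch (sizes and names are illustrative, not a design from this PR):

```ruby
# A fixed pool of workers drains a queue of file paths, so monitoring
# 1000 files never means 1000 threads.
POOL_SIZE = 4
jobs = Queue.new

workers = Array.new(POOL_SIZE) do
  Thread.new do
    while (path = jobs.pop)     # nil is the shutdown sentinel
      puts "processing #{path}" # stand-in for reading new lines from path
    end
  end
end

%w[a.log b.log c.log].each { |p| jobs << p }
POOL_SIZE.times { jobs << nil } # one sentinel per worker
workers.each(&:join)
```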
This PR has been automatically marked as stale because it has been open for 90 days with no activity. Remove the stale label or comment, or this PR will be closed in 30 days.
cosmo0920 is now working on this feature on #3185 with newer in_tail code.
What this PR does / why we need it:
When running in a big cluster with a high volume of logs, it would be nice to throttle log shipping to avoid network saturation and make it easier to calculate the maximum throughput per node, for example in a Kubernetes cluster.
The tail plugin watches files and, every second, reads from the last pointer to the end of the file.
This change allows reading of a file to stop after X log lines have been read, while still updating the pointer in the pos file as usual.
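In loop form, the line limit amounts to something like this (a sketch; emit is a stand-in for the output path, and a real pos file would persist the offset):

```ruby
# Stop after read_lines_limit lines; the recorded offset lets the next
# timer tick resume exactly where this one stopped.
read_lines_limit = 1000
emit = ->(line) { print line }     # stand-in for the real emit pipeline
io = File.open('/var/log/app.log') # illustrative path

lines_read = 0
while lines_read < read_lines_limit && (line = io.gets)
  emit.call(line)
  lines_read += 1
end
resume_offset = io.pos # a pos file would persist this for the next read
```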
This commit adds log throttling in bytes for each file; it works only when watch_timer is enabled and the stat_watcher (inotify) is disabled. In order to have this feature for any watch configuration (timer or inotify), I've added a sleep once the read-bytes limit is reached. Since that sleep would block the process and affect ingestion of other files, I've added a basic thread array to get multi-threaded ingestion.
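The throttle itself boils down to "if the byte budget for this second is spent early, sleep out the rest of the second"; roughly (illustrative names, not the PR's exact code):

```ruby
# Enforce read_bytes_limit_per_second: once the per-second byte budget
# is used up, sleep until the second is over before reading again.
read_bytes_limit_per_second = 8192
number_bytes_read = 0
start_reading = Time.now

# ... after each read: number_bytes_read += data.bytesize ...

if number_bytes_read >= read_bytes_limit_per_second
  time_spent_reading = Time.now - start_reading
  sleep(1 - time_spent_reading) if time_spent_reading < 1 # burn off the rest of the second
  number_bytes_read = 0
  start_reading = Time.now # open a fresh one-second budget
end
```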
However, I've noticed you are relying on cool.io, and I was wondering if I should use this library instead. Would you be interested in this feature?
Some discussions before submitting the PR.
Docs Changes:
Adds read_lines_limit_per_notify, which defaults to -1, so no throttling is involved by default.
Release Note: