Add log throttling per file #2702

Closed · wants to merge 5 commits
101 changes: 80 additions & 21 deletions lib/fluent/plugin/in_tail.rb
@@ -49,6 +49,7 @@ def to_s
def initialize
super
@paths = []
@threads = {}
@tails = {}
@pf_file = nil
@pf = nil
@@ -74,6 +75,8 @@ def initialize
config_param :refresh_interval, :time, default: 60
desc 'The number of reading lines at each IO.'
config_param :read_lines_limit, :integer, default: 1000
desc 'The maximum number of bytes to read per second (-1 means no limit)'
config_param :read_bytes_limit_per_second, :integer, default: -1
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
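For context, the new read_bytes_limit_per_second parameter is meant to be set alongside read_lines_limit in an in_tail source block. A hypothetical configuration sketch follows; the path, pos_file, and tag values are placeholders, not taken from this PR:

<source>
  @type tail
  path /var/log/app/access.log
  pos_file /var/log/td-agent/access.log.pos
  tag app.access
  read_lines_limit 1000
  read_bytes_limit_per_second 8192
  <parse>
    @type none
  </parse>
</source>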
@@ -201,7 +204,15 @@ def start
end

refresh_watchers unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))

@threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
@threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.

@threads.each { |thr|
thr.join
Member: If it blocks here, all the code after this point is blocked.

Member: @threads is a Hash, so thr here is a [key, value] Array.
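A minimal sketch of what the reviewer means, with the block arguments destructured so join is called on the Thread object rather than on the [key, value] pair (illustrative only, not part of the PR):

# Iterating a Hash yields [key, value] pairs; destructure them in the block.
@threads.each do |name, thr|
  thr.join   # name stays available for logging if needed
end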

}
end

def stop
Expand Down Expand Up @@ -279,11 +290,16 @@ def refresh_watchers

stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?

log.debug "Thread refresh_watchers"
@threads.each { |thr|
log.debug "Thread #{thr[0]} #{thr[1].status}"
Member: Please fix the indentation here.

}
end

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, @read_bytes_limit_per_second, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
@@ -302,31 +318,49 @@ def setup_watcher(path, pe)

def start_watchers(paths)
paths.each { |path|
pe = nil
if @pf
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
end
unless @threads[path].nil?
log.debug "Check Thread #{path} #{@threads[path].status}"
if @threads[path].status != "sleep" and @threads[path].status != "run"
log.debug "Stopping Thread #{path} #{@threads[path].status}"
@threads[path].exit
@threads.delete(path)
end
end
if @threads[path].nil?
log.debug "Add Thread #{path}"
@threads[path] = Thread.new(path) do |path|
Member: Why did you change this code to run on a new thread?

pe = nil
if @pf
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
end
end
end

begin
tw = setup_watcher(path, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
begin
tw = setup_watcher(path, pe)
Member: This can be a race condition: before pe is passed to setup_watcher, L334 should be called, but the current code does not ensure that.
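One generic way to make that ordering explicit, purely illustrative and not what this PR does, is to serialize the position-entry setup across the per-path threads with a Mutex (@pf_mutex is a hypothetical name, not in the codebase):

# Hypothetical sketch: guard shared position-file state across per-path threads.
@pf_mutex ||= Mutex.new
pe = nil
@pf_mutex.synchronize do
  if @pf
    pe = @pf[path]
    if @read_from_head && pe.read_inode.zero?
      begin
        pe.update(Fluent::FileWrapper.stat(path).ino, 0)
      rescue Errno::ENOENT
        log.warn "#{path} not found. Continuing without tailing it."
      end
    end
  end
end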

rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
end
@tails[path] = tw
end
end
@tails[path] = tw
}
end

def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
if remove_watcher
@threads[path].exit
Member: Thread#exit is dangerous. Could you finish this thread in a proper way instead?
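A sketch of one conventional pattern for ending a thread cooperatively instead of killing it: a per-path flag the thread checks, followed by a join. Names such as @running are hypothetical and not part of this PR:

# Hypothetical cooperative shutdown instead of Thread#exit.
@running ||= {}                 # path => true while the thread should keep working

@running[path] = true
@threads[path] = Thread.new(path) do |p|
  while @running[p]
    # ... per-path tail work ...
    sleep 1
  end
end

# Later, instead of @threads[path].exit:
@running[path] = false
@threads[path].join
@threads.delete(path)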

@threads.delete(path)
end

if tw
tw.unwatched = unwatched
if immediate
@@ -340,6 +374,8 @@ def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)

def close_watcher_handles
@tails.keys.each do |path|
@threads[path].exit
@threads.delete(path)
tw = @tails.delete(path)
if tw
tw.close
Expand All @@ -356,6 +392,7 @@ def update_watcher(path, pe)
end
end
rotated_tw = @tails[path]

Member: This added blank line is unnecessary.

@tails[path] = setup_watcher(path, pe)
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw
end
@@ -494,14 +531,15 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, read_bytes_limit_per_second, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@enable_stat_watcher = enable_stat_watcher
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@update_watcher = update_watcher

@@ -519,7 +557,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
end

attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :log, :pe, :read_lines_limit, :read_bytes_limit_per_second, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
@@ -747,16 +785,37 @@ def on_notify
def handle_notify
with_io do |io|
begin
bytes_to_read = 8192
number_bytes_read = 0
start_reading = Time.new
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@fifo << io.readpartial(8192, @iobuf)
@fifo << io.readpartial(bytes_to_read, @iobuf)
@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit

number_bytes_read += bytes_to_read
Member: IO#readpartial does not always read bytes_to_read bytes. Is this code OK?
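Since IO#readpartial may return fewer bytes than requested, one way to count what was actually read, shown here only as an illustrative sketch rather than the PR's code:

# Count the bytes actually returned, not the requested chunk size.
chunk = io.readpartial(bytes_to_read, @iobuf)
number_bytes_read += chunk.bytesize
@fifo << chunk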

limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
Member: Suggested change (use && instead of and):
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second && @watcher.read_bytes_limit_per_second > 0)


@watcher.log.debug("reading file: #{ @watcher.path}")
if @lines.size >= @watcher.read_lines_limit or limit_bytes_per_second_reached
# not to use too much memory in case the file is very large
read_more = true

if limit_bytes_per_second_reached
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Time.new - start_reading
@watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if (time_spent_reading < 1)
Member: Suggested change (drop the redundant parentheses):
if time_spent_reading < 1

debug_time = 1 - time_spent_reading
@watcher.log.debug("sleep: #{debug_time}")
sleep(1 - time_spent_reading)
end
start_reading = Time.new
end

break
end
end
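In outline, the throttling added to handle_notify requests 8192 bytes at a time, counts the bytes read in the current one-second window, and sleeps off the remainder of the window once the per-second budget is spent. A standalone sketch of that pattern, under the same assumption that a limit of zero or below means unlimited (method and variable names are hypothetical, not the PR's code):

# Minimal sketch of a per-second byte throttle for reading a file; not a drop-in replacement.
def throttled_read(io, limit_bytes_per_second, chunk_size = 8192)
  window_start = Time.now
  bytes_in_window = 0
  loop do
    chunk = io.readpartial(chunk_size)
    yield chunk
    bytes_in_window += chunk.bytesize
    next unless limit_bytes_per_second > 0 && bytes_in_window >= limit_bytes_per_second

    elapsed = Time.now - window_start
    sleep(1 - elapsed) if elapsed < 1   # wait out the rest of the one-second window
    window_start = Time.now
    bytes_in_window = 0
  end
rescue EOFError
  # readpartial raises EOFError once the file is fully consumed
end

Usage would look something like throttled_read(File.open(path), 8192) { |chunk| buffer << chunk }.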
34 changes: 32 additions & 2 deletions test/plugin/test_in_tail.rb
@@ -73,6 +73,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal 2, d.instance.rotate_wait
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal -1, d.instance.read_bytes_limit_per_second
assert_equal false, d.instance.ignore_repeated_permission_error
end

@@ -212,6 +213,35 @@ def test_emit_with_read_lines_limit(data)
assert_equal(num_events, d.emit_count)
end

data('flat 1' => [:flat, 100, 8192, 10],
'flat 10' => [:flat, 100, (8192 * 10), 20],
'parse 1' => [:parse, 100, (8192 * 3), 10],
'parse 10' => [:parse, 100, (8192 * 10), 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for x in 0..20
f.puts msg
end
}
end

events = d.events
assert_equal(true, events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
end

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_read_from_head(data)
@@ -1061,7 +1091,7 @@ def test_z_refresh_watchers
Timecop.freeze(2010, 1, 2, 3, 4, 5) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
EX_PATHS.each do |path|
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') { |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
@@ -1079,7 +1109,7 @@ def test_z_refresh_watchers

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') do |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times