Skip to content

Commit

Permalink
Add log throttling per file
Browse files Browse the repository at this point in the history
Remove debug log
  • Loading branch information
Anthony Comtois committed Nov 4, 2019
1 parent 0ac4f42 commit f9eef2b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
18 changes: 14 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def initialize
config_param :refresh_interval, :time, default: 60
desc 'The number of reading lines at each IO.'
config_param :read_lines_limit, :integer, default: 1000
desc 'The number of reading lines per notify'
config_param :read_lines_limit_per_notify, :integer, default: -1
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
Expand Down Expand Up @@ -283,7 +285,7 @@ def refresh_watchers

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, @read_lines_limit_per_notify, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
Expand Down Expand Up @@ -494,14 +496,15 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, read_lines_limit_per_notify, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@enable_stat_watcher = enable_stat_watcher
@read_lines_limit = read_lines_limit
@read_lines_limit_per_notify = read_lines_limit_per_notify
@receive_lines = receive_lines
@update_watcher = update_watcher

Expand All @@ -519,7 +522,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
end

attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :log, :pe, :read_lines_limit, :read_lines_limit_per_notify, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
Expand Down Expand Up @@ -754,11 +757,18 @@ def handle_notify
while true
@fifo << io.readpartial(8192, @iobuf)
@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit
limit_per_notify = @lines.size >= @watcher.read_lines_limit_per_notify and @watcher.read_lines_limit_per_notify > 0
if limit_per_notify
# stop reading files when we reach the read lines limit per notify, to throttle the log ingestion
read_more = false
elsif @lines.size >= @watcher.read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
end
if @lines.size >= @watcher.read_lines_limit or limit_per_notify
break
end

end
rescue EOFError
end
Expand Down
34 changes: 32 additions & 2 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal 2, d.instance.rotate_wait
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal -1, d.instance.read_lines_limit_per_notify
assert_equal false, d.instance.ignore_repeated_permission_error
end

Expand Down Expand Up @@ -212,6 +213,35 @@ def test_emit_with_read_lines_limit(data)
assert_equal(num_events, d.emit_count)
end

data('flat 1' => [:flat, 100, 1, 10],
'flat 10' => [:flat, 100, 10, 20],
'parse 1' => [:parse, 100, 3, 10],
'parse 10' => [:parse, 100, 10, 20])
def test_emit_with_read_lines_limit_per_notify(data)
config_style, limit, limit_per_notify, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_lines_limit_per_notify" => limit_per_notify })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_lines_limit_per_notify" => limit_per_notify }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for x in 0..20
f.puts msg
end
}
end

events = d.events
assert_equal(true, events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
end

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_read_from_head(data)
Expand Down Expand Up @@ -1061,7 +1091,7 @@ def test_z_refresh_watchers
Timecop.freeze(2010, 1, 2, 3, 4, 5) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
EX_PATHS.each do |path|
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') { |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand All @@ -1079,7 +1109,7 @@ def test_z_refresh_watchers

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') do |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand Down

0 comments on commit f9eef2b

Please sign in to comment.