From f5bb69b72d5b9608d2505fb046a6fd0748563df7 Mon Sep 17 00:00:00 2001 From: Anthony Comtois Date: Tue, 29 Oct 2019 10:06:49 +0000 Subject: [PATCH] Add log throttling per file --- lib/fluent/plugin/in_tail.rb | 19 +++++++++++++++---- test/plugin/test_in_tail.rb | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6a226a544d..bbc7befe76 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -74,6 +74,8 @@ def initialize config_param :refresh_interval, :time, default: 60 desc 'The number of reading lines at each IO.' config_param :read_lines_limit, :integer, default: 1000 + desc 'The number of reading lines per notify' + config_param :read_lines_limit_per_notify, :integer, default: -1 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil desc 'Enable the option to emit unmatched lines.' @@ -283,7 +285,7 @@ def refresh_watchers def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil - tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) + tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, @read_lines_limit_per_notify, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) tw.attach do |watcher| event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger @@ -494,7 +496,7 @@ def parse_multilines(lines, tail_watcher) end class TailWatcher - def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) + def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, read_lines_limit_per_notify, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new @@ -502,6 +504,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e @enable_watch_timer = enable_watch_timer @enable_stat_watcher = enable_stat_watcher @read_lines_limit = read_lines_limit + @read_lines_limit_per_notify = read_lines_limit_per_notify @receive_lines = receive_lines @update_watcher = update_watcher @@ -519,7 +522,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e end attr_reader :path - attr_reader :log, :pe, :read_lines_limit, :open_on_every_update + attr_reader :log, :pe, :read_lines_limit, :read_lines_limit_per_notify, :open_on_every_update attr_reader :from_encoding, :encoding attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher attr_accessor :timer_trigger @@ -754,11 +757,19 @@ def handle_notify while true @fifo << io.readpartial(8192, @iobuf) @fifo.read_lines(@lines) - if @lines.size >= @watcher.read_lines_limit + @watcher.log.debug(" #{@lines.size} #{@watcher.read_lines_limit} #{@watcher.read_lines_limit_per_notify} debug log") + limit_per_notify = @lines.size >= @watcher.read_lines_limit_per_notify and @watcher.read_lines_limit_per_notify > 0 + if limit_per_notify + # stop reading files when we reach the read lines limit per notify, to throttle the log ingestion + read_more = false + elsif @lines.size >= @watcher.read_lines_limit # not to use too much memory in case the file is very large read_more = true + end + if @lines.size >= @watcher.read_lines_limit or limit_per_notify break end + end rescue EOFError end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d67cbdce3c..e634fe7095 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -73,6 +73,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal 2, d.instance.rotate_wait assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit + assert_equal -1, d.instance.read_lines_limit_per_notify assert_equal false, d.instance.ignore_repeated_permission_error end @@ -212,6 +213,35 @@ def test_emit_with_read_lines_limit(data) assert_equal(num_events, d.emit_count) end + data('flat 1' => [:flat, 100, 1, 10], + 'flat 10' => [:flat, 100, 10, 20], + 'parse 1' => [:parse, 100, 3, 10], + 'parse 10' => [:parse, 100, 10, 20]) + def test_emit_with_read_lines_limit_per_notify(data) + config_style, limit, limit_per_notify, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_lines_limit_per_notify" => limit_per_notify }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_lines_limit_per_notify" => limit_per_notify }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for x in 0..20 + f.puts msg + end + } + end + + events = d.events + assert_equal(true, events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + end + data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) @@ -1061,7 +1091,7 @@ def test_z_refresh_watchers Timecop.freeze(2010, 1, 2, 3, 4, 5) do flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| EX_PATHS.each do |path| - watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do + watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do flexmock('TailWatcher') { |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -1079,7 +1109,7 @@ def test_z_refresh_watchers Timecop.freeze(2010, 1, 2, 3, 4, 6) do flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| - watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do + watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, -1, any, any, any, any, any, any).once.and_return do flexmock('TailWatcher') do |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times