diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 953c6cbab4..5b92d955bc 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode) if @follow_inodes && new_inode.nil? # nil inode means the file disappeared, so we only need to stop it. @tails.delete(tail_watcher.path) - # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 # Because of this problem, log duplication can occur during `rotate_wait`. # Need to set `rotate_wait 0` for a workaround. # Duplication will occur if `refresh_watcher` is called during the `rotate_wait`. @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size @was_long_line = false + @has_skipped_line = false @log = log end @@ -1047,6 +1048,7 @@ def convert(s) def read_lines(lines) idx = @buffer.index(@eol) + @has_skipped_line = false until idx.nil? # Using freeze and slice is faster than slice! @@ -1064,6 +1066,7 @@ def read_lines(lines) @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } @was_long_line = false + @has_skipped_line = true next end @@ -1077,12 +1080,17 @@ def read_lines(lines) if is_long_line @buffer.clear @was_long_line = true + @has_skipped_line = true end end def bytesize @buffer.bytesize end + + def has_skipped_line? + @has_skipped_line + end end class IOHandler @@ -1181,6 +1189,7 @@ def handle_notify with_io do |io| begin read_more = false + has_skipped_line = false if !io.nil? && @lines.empty? begin @@ -1195,6 +1204,7 @@ def handle_notify n_lines_before_read = @lines.size @fifo.read_lines(@lines) + has_skipped_line = @fifo.has_skipped_line? group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) group_watcher_limit = group_watcher&.limit_lines_reached?(@path) @@ -1217,6 +1227,10 @@ def handle_notify end end + if @lines.empty? && has_skipped_line + @watcher.pe.update_pos(io.pos - @fifo.bytesize) + end + unless @lines.empty? if @receive_lines.call(@lines, @watcher) @watcher.pe.update_pos(io.pos - @fifo.bytesize) diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 84d39ceaae..9397006763 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -146,36 +146,35 @@ def create_watcher assert_equal 5, returned_lines[0].size assert_equal 3, returned_lines[1].size end + end - test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do - t = 'line' * (8192 / 8) - text = "#{t}\n" * 8 - t = 'line' * 8 - text += "#{t}\n" * 8 + test 'does not call receive_lines when line_size exceeds max_line_size' do + t = 'x' * (8192) + text = "#{t}\n" - @file.write(text) - @file.close + max_line_size = 8192 - update_pos = 0 + @file.write(text) + @file.close - watcher = create_watcher - stub(watcher).pe do - pe = 'position_file' - stub(pe).read_pos {0} - stub(pe).update_pos { |val| update_pos = val } - pe - end + update_pos = 0 - returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| - returned_lines << lines.dup - true - end + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end - r.on_notify - assert_equal text.bytesize, update_pos - assert_equal 1, returned_lines.size - assert_equal 8, returned_lines[0].size + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 0, returned_lines.size end end