diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index dd94560d0b..8861cc015e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size @skip_current_line = false + @skipping_current_line_bytesize = 0 @log = log end @@ -1064,8 +1065,9 @@ def read_lines(lines) if is_long_line @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } - @skip_current_line = false has_skipped_line = true + @skip_current_line = false + @skipping_current_line_bytesize = 0 next end @@ -1077,14 +1079,16 @@ def read_lines(lines) ) if is_long_current_line - @buffer.clear @skip_current_line = true + @skipping_current_line_bytesize += @buffer.bytesize + @buffer.clear end return has_skipped_line end - def bytesize + def reading_bytesize + return @skipping_current_line_bytesize if @skip_current_line @buffer.bytesize end end @@ -1223,10 +1227,10 @@ def handle_notify end if @lines.empty? - @watcher.pe.update_pos(io.pos - @fifo.bytesize) if has_skipped_line + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line else if @receive_lines.call(@lines, @watcher) - @watcher.pe.update_pos(io.pos - @fifo.bytesize) + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) @lines.clear else read_more = false @@ -1238,12 +1242,12 @@ def handle_notify def open io = Fluent::FileWrapper.open(@path) - io.seek(@watcher.pe.read_pos + @fifo.bytesize) + io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize) @metrics.opened.inc io rescue RangeError io.close if io - raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" + raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}" rescue Errno::EACCES => e @log.warn "#{e}" nil diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index ebe0729d49..ebead83b92 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -124,7 +124,7 @@ class IntailFIFO < Test::Unit::TestCase fifo << text.force_encoding(Encoding::ASCII_8BIT) fifo.read_lines(lines) # The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size. - assert { fifo.bytesize <= max_line_size } + assert { fifo.buffer.bytesize <= max_line_size } end assert_equal expected, lines @@ -142,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase end end - sub_test_case '#bytesize' do - test 'reutrns buffer size' do + sub_test_case '#reading_bytesize' do + test 'returns buffer size' do fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text - assert_equal text.bytesize, fifo.bytesize + assert_equal text.bytesize, fifo.reading_bytesize lines = [] fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n"], lines - assert_equal 'test'.bytesize, fifo.bytesize + assert_equal 'test'.bytesize, fifo.reading_bytesize fifo << "2\n" fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines - assert_equal 0, fifo.bytesize + assert_equal 0, fifo.reading_bytesize + end + + test 'returns the entire line size even if the size is over max_line_size' do + max_line_size = 20 + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + lines = [] + + text = "long line still not having EOL" + fifo << text + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize, fifo.reading_bytesize + + text2 = " following texts" + fifo << text2 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize + + text3 = " end of the line\n" + fifo << text3 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal 0, fifo.reading_bytesize end end end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 9397006763..5174ff9cfb 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -148,33 +148,144 @@ def create_watcher end end - test 'does not call receive_lines when line_size exceeds max_line_size' do - t = 'x' * (8192) - text = "#{t}\n" + sub_test_case 'max_line_size' do + test 'does not call receive_lines when line_size exceeds max_line_size' do + t = 'x' * (8192) + text = "#{t}\n" - max_line_size = 8192 + max_line_size = 8192 - @file.write(text) - @file.close + @file.write(text) + @file.close - update_pos = 0 + update_pos = 0 - watcher = create_watcher - stub(watcher).pe do - pe = 'position_file' - stub(pe).read_pos {0} - stub(pe).update_pos { |val| update_pos = val } - pe + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end + + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true + end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 0, returned_lines.size end - returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| - returned_lines << lines.dup - true + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'manage pos correctly if a long line not having EOL occurs' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + assert_equal short_line.bytesize, pos + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + expected_size = short_line.bytesize + long_lines[0..1].map{|l| l.bytesize}.sum + assert_equal expected_size, pos + + io_handler.close end - r.on_notify - assert_equal text.bytesize, update_pos - assert_equal 0, returned_lines.size + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'discards a subsequent data in a long line even if restarting occurs between' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + end end end