diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fc5761f917..6a83075cc5 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode) if @follow_inodes && new_inode.nil? # nil inode means the file disappeared, so we only need to stop it. @tails.delete(tail_watcher.path) - # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 # Because of this problem, log duplication can occur during `rotate_wait`. # Need to set `rotate_wait 0` for a workaround. # Duplication will occur if `refresh_watcher` is called during the `rotate_wait`. @@ -696,14 +696,6 @@ def flush_buffer(tw, buf) # @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError def receive_lines(lines, tail_watcher) - lines = lines.reject do |line| - skip_line = @max_line_size ? line.bytesize > @max_line_size : false - if skip_line - log.warn "received line length is longer than #{@max_line_size}" - log.debug "skipped line: #{line.chomp}" - end - skip_line - end es = @receive_handler.call(lines, tail_watcher) unless es.empty? tag = if @tag_prefix || @tag_suffix @@ -819,6 +811,7 @@ def io_handler(watcher, path) from_encoding: @from_encoding, encoding: @encoding, metrics: @metrics, + max_line_size: @max_line_size, &method(:receive_lines) ) end @@ -1011,15 +1004,19 @@ def swap_state(pe) end class FIFO - def initialize(from_encoding, encoding) + def initialize(from_encoding, encoding, log, max_line_size=nil) @from_encoding = from_encoding @encoding = encoding @need_enc = from_encoding != encoding @buffer = ''.force_encoding(from_encoding) @eol = "\n".encode(from_encoding).freeze + @max_line_size = max_line_size + @skip_current_line = false + @skipping_current_line_bytesize = 0 + @log = log end - attr_reader :from_encoding, :encoding, :buffer + attr_reader :from_encoding, :encoding, :buffer, :max_line_size def <<(chunk) # Although "chunk" is most likely transient besides String#force_encoding itself @@ -1051,6 +1048,7 @@ def convert(s) def read_lines(lines) idx = @buffer.index(@eol) + has_skipped_line = false until idx.nil? # Using freeze and slice is faster than slice! @@ -1059,11 +1057,47 @@ def read_lines(lines) rbuf = @buffer.slice(0, idx + 1) @buffer = @buffer.slice(idx + 1, @buffer.size) idx = @buffer.index(@eol) + + is_long_line = @max_line_size && ( + @skip_current_line || rbuf.bytesize > @max_line_size + ) + + if is_long_line + @log.warn "received line length is longer than #{@max_line_size}" + if @skip_current_line + @log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp } + else + @log.debug("skipped line: ") { convert(rbuf).chomp } + end + has_skipped_line = true + @skip_current_line = false + @skipping_current_line_bytesize = 0 + next + end + lines << convert(rbuf) end + + is_long_current_line = @max_line_size && ( + @skip_current_line || @buffer.bytesize > @max_line_size + ) + + if is_long_current_line + @log.debug( + "The continuing current line length is longer than #{@max_line_size}." + + " The received data will be discarded until this line is finished." + + " Discarded data: " + ) { convert(@buffer).chomp } + @skip_current_line = true + @skipping_current_line_bytesize += @buffer.bytesize + @buffer.clear + end + + return has_skipped_line end - def bytesize + def reading_bytesize + return @skipping_current_line_bytesize if @skip_current_line @buffer.bytesize end end @@ -1074,14 +1108,14 @@ class IOHandler attr_accessor :shutdown_timeout - def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update - @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) + @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil @@ -1164,6 +1198,7 @@ def handle_notify with_io do |io| begin read_more = false + has_skipped_line = false if !io.nil? && @lines.empty? begin @@ -1177,7 +1212,7 @@ def handle_notify @fifo << data n_lines_before_read = @lines.size - @fifo.read_lines(@lines) + has_skipped_line = @fifo.read_lines(@lines) || has_skipped_line group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) group_watcher_limit = group_watcher&.limit_lines_reached?(@path) @@ -1200,9 +1235,11 @@ def handle_notify end end - unless @lines.empty? + if @lines.empty? + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line + else if @receive_lines.call(@lines, @watcher) - @watcher.pe.update_pos(io.pos - @fifo.bytesize) + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) @lines.clear else read_more = false @@ -1214,12 +1251,12 @@ def handle_notify def open io = Fluent::FileWrapper.open(@path) - io.seek(@watcher.pe.read_pos + @fifo.bytesize) + io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize) @metrics.opened.inc io rescue RangeError io.close if io - raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" + raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}" rescue Errno::EACCES => e @log.warn "#{e}" nil diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index 86345d8bb9..ebead83b92 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -5,7 +5,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#read_line' do test 'returns lines spliting per `\n`' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'concant line when line is separated' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'returns lines which convert encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reads lines as from_encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case 'when it includes multi byte chars' do test 'handles it as ascii_8bit' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'replaces character with ? when convert error happens' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reutrns nothing when buffer is empty' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) lines = [] fifo.read_lines(lines) assert_equal [], lines @@ -86,11 +86,54 @@ class IntailFIFO < Test::Unit::TestCase fifo.read_lines(lines) assert_equal [], lines end + + data('bigger than max_line_size', [ + ["test test test\n" * 3], + [], + ]) + data('less than or equal to max_line_size', [ + ["test\n" * 2], + ["test\n", "test\n"], + ]) + data('mix', [ + ["test test test\ntest\ntest test test\ntest\ntest test test\n"], + ["test\n", "test\n"], + ]) + data('mix and multiple', [ + [ + "test test test\ntest\n", + "test", + " test test\nt", + "est\nt" + ], + ["test\n", "test\n"], + ]) + data('remaining data bigger than max_line_size should be discarded', [ + [ + "test\nlong line still not having EOL", + "following texts to the previous long line\ntest\n", + ], + ["test\n", "test\n"], + ]) + test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)| + max_line_size = 5 + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + lines = [] + + input_texts.each do |text| + fifo << text.force_encoding(Encoding::ASCII_8BIT) + fifo.read_lines(lines) + # The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size. + assert { fifo.buffer.bytesize <= max_line_size } + end + + assert_equal expected, lines + end end sub_test_case '#<<' do test 'does not make any change about encoding to an argument' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) assert_equal Encoding::UTF_8, text.encoding @@ -99,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase end end - sub_test_case '#bytesize' do - test 'reutrns buffer size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + sub_test_case '#reading_bytesize' do + test 'returns buffer size' do + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text - assert_equal text.bytesize, fifo.bytesize + assert_equal text.bytesize, fifo.reading_bytesize lines = [] fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n"], lines - assert_equal 'test'.bytesize, fifo.bytesize + assert_equal 'test'.bytesize, fifo.reading_bytesize fifo << "2\n" fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines - assert_equal 0, fifo.bytesize + assert_equal 0, fifo.reading_bytesize + end + + test 'returns the entire line size even if the size is over max_line_size' do + max_line_size = 20 + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + lines = [] + + text = "long line still not having EOL" + fifo << text + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize, fifo.reading_bytesize + + text2 = " following texts" + fifo << text2 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize + + text3 = " end of the line\n" + fifo << text3 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal 0, fifo.reading_bytesize end end end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 647c4a55a1..5174ff9cfb 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -147,4 +147,145 @@ def create_watcher assert_equal 3, returned_lines[1].size end end + + sub_test_case 'max_line_size' do + test 'does not call receive_lines when line_size exceeds max_line_size' do + t = 'x' * (8192) + text = "#{t}\n" + + max_line_size = 8192 + + @file.write(text) + @file.close + + update_pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end + + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true + end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 0, returned_lines.size + end + + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'manage pos correctly if a long line not having EOL occurs' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + assert_equal short_line.bytesize, pos + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + expected_size = short_line.bytesize + long_lines[0..1].map{|l| l.bytesize}.sum + assert_equal expected_size, pos + + io_handler.close + end + + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'discards a subsequent data in a long line even if restarting occurs between' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + end + end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d30595ce9a..bb16c1ab8e 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2011,41 +2011,6 @@ def test_tag_prefix_and_suffix_ignore mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - - data( - small: ["128", 128], - KiB: ["1k", 1024] - ) - test 'max_line_size' do |(label, size)| - config = config_element("", "", { - "tag" => "max_line_size", - "path" => "#{@tmp_dir}/with_long_lines.txt", - "format" => "none", - "read_from_head" => true, - "max_line_size" => label, - "log_level" => "debug" - }) - Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| - f.puts "foo" - f.puts "x" * size # 'x' * size + \n > @max_line_size - f.puts "bar" - end - d = create_driver(config, false) - timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") - Timecop.freeze(timestamp) - d.run(expect_records: 2) - assert_equal([ - [{"message" => "foo"},{"message" => "bar"}], - [ - "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", - "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" - ] - ], - [ - d.events.collect { |event| event.last }, - d.logs[-2..] - ]) - end end # Ensure that no fatal exception is raised when a file is missing and that @@ -3426,4 +3391,39 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait ) end end + + data( + small: ["128", 128], + KiB: ["1k", 1024] + ) + test 'max_line_size' do |(label, size)| + config = config_element("", "", { + "tag" => "max_line_size", + "path" => "#{@tmp_dir}/with_long_lines.txt", + "format" => "none", + "read_from_head" => true, + "max_line_size" => label, + "log_level" => "debug" + }) + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + f.puts "foo" + f.puts "x" * size # 'x' * size + \n > @max_line_size + f.puts "bar" + end + d = create_driver(config, false) + timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") + Timecop.freeze(timestamp) + d.run(expect_records: 2) + assert_equal([ + [{"message" => "foo"},{"message" => "bar"}], + [ + "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", + "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" + ] + ], + [ + d.events.collect { |event| event.last }, + d.logs[-2..] + ]) + end end