From 7082a957aa5281840936ea1164577173bb16624e Mon Sep 17 00:00:00 2001 From: "been.zino" Date: Fri, 10 May 2024 19:25:25 +0900 Subject: [PATCH] Fix line skipping issue in receive_lines method Before this patch, long lines could cause breakdowns in fluentd, potentially posing a vulnerability. With this patch, max_line_size will be integrated into the FIFO, enabling the system to skip lines exceeding the maximum size before executing receive_lines. Co-authored-by: yugeeklab Co-authored-by: moggaa Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 42 +++++++++++----- test/plugin/in_tail/test_fifo.rb | 38 ++++++++++---- test/plugin/in_tail/test_io_handler.rb | 31 ++++++++++++ test/plugin/test_in_tail.rb | 70 +++++++++++++------------- 4 files changed, 124 insertions(+), 57 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fc5761f917..6846d10e83 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -696,14 +696,6 @@ def flush_buffer(tw, buf) # @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError def receive_lines(lines, tail_watcher) - lines = lines.reject do |line| - skip_line = @max_line_size ? line.bytesize > @max_line_size : false - if skip_line - log.warn "received line length is longer than #{@max_line_size}" - log.debug "skipped line: #{line.chomp}" - end - skip_line - end es = @receive_handler.call(lines, tail_watcher) unless es.empty? tag = if @tag_prefix || @tag_suffix @@ -819,6 +811,7 @@ def io_handler(watcher, path) from_encoding: @from_encoding, encoding: @encoding, metrics: @metrics, + max_line_size: @max_line_size, &method(:receive_lines) ) end @@ -1011,15 +1004,18 @@ def swap_state(pe) end class FIFO - def initialize(from_encoding, encoding) + def initialize(from_encoding, encoding, log, max_line_size=nil) @from_encoding = from_encoding @encoding = encoding @need_enc = from_encoding != encoding @buffer = ''.force_encoding(from_encoding) @eol = "\n".encode(from_encoding).freeze + @max_line_size = max_line_size + @was_long_line = false + @log = log end - attr_reader :from_encoding, :encoding, :buffer + attr_reader :from_encoding, :encoding, :buffer, :max_line_size def <<(chunk) # Although "chunk" is most likely transient besides String#force_encoding itself @@ -1059,7 +1055,27 @@ def read_lines(lines) rbuf = @buffer.slice(0, idx + 1) @buffer = @buffer.slice(idx + 1, @buffer.size) idx = @buffer.index(@eol) - lines << convert(rbuf) + + is_long_line = !@max_line_size.nil? && ( + rbuf.bytesize > @max_line_size || @was_long_line + ) + + if !is_long_line + lines << convert(rbuf) + else + @log.warn "received line length is longer than #{@max_line_size}" + @log.debug "skipped line" + @was_long_line = false + end + end + + is_long_line = !@max_line_size.nil? && ( + @buffer.bytesize > @max_line_size || @was_long_line + ) + + if is_long_line + @buffer = ''.force_encoding(@from_encoding) + @was_long_line = true end end @@ -1074,14 +1090,14 @@ class IOHandler attr_accessor :shutdown_timeout - def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update - @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) + @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index 86345d8bb9..acd6f9e51d 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -5,7 +5,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#read_line' do test 'returns lines spliting per `\n`' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'concant line when line is separated' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'returns lines which convert encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reads lines as from_encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case 'when it includes multi byte chars' do test 'handles it as ascii_8bit' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'replaces character with ? when convert error happens' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reutrns nothing when buffer is empty' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) lines = [] fifo.read_lines(lines) assert_equal [], lines @@ -86,11 +86,31 @@ class IntailFIFO < Test::Unit::TestCase fifo.read_lines(lines) assert_equal [], lines end + + test 'return empty line when line size is bigger than max_line_size' do + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) + t = "test" * 3 + text = ("#{t}\n" * 3).force_encoding(Encoding::ASCII_8BIT) + fifo << text + lines = [] + fifo.read_lines(lines) + assert_equal [], lines + end + + test 'return lines when line size is less than or equal to max_line_size' do + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) + t = "test" + text = ("#{t}\n" * 2).force_encoding(Encoding::ASCII_8BIT) + fifo << text + lines = [] + fifo.read_lines(lines) + assert_equal ["test\n", "test\n"], lines + end end sub_test_case '#<<' do test 'does not make any change about encoding to an argument' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) assert_equal Encoding::UTF_8, text.encoding @@ -101,7 +121,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#bytesize' do test 'reutrns buffer size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 647c4a55a1..84d39ceaae 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -146,5 +146,36 @@ def create_watcher assert_equal 5, returned_lines[0].size assert_equal 3, returned_lines[1].size end + + test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do + t = 'line' * (8192 / 8) + text = "#{t}\n" * 8 + t = 'line' * 8 + text += "#{t}\n" * 8 + + @file.write(text) + @file.close + + update_pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end + + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true + end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 1, returned_lines.size + assert_equal 8, returned_lines[0].size + end end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d30595ce9a..819bb37aaf 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2011,41 +2011,6 @@ def test_tag_prefix_and_suffix_ignore mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - - data( - small: ["128", 128], - KiB: ["1k", 1024] - ) - test 'max_line_size' do |(label, size)| - config = config_element("", "", { - "tag" => "max_line_size", - "path" => "#{@tmp_dir}/with_long_lines.txt", - "format" => "none", - "read_from_head" => true, - "max_line_size" => label, - "log_level" => "debug" - }) - Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| - f.puts "foo" - f.puts "x" * size # 'x' * size + \n > @max_line_size - f.puts "bar" - end - d = create_driver(config, false) - timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") - Timecop.freeze(timestamp) - d.run(expect_records: 2) - assert_equal([ - [{"message" => "foo"},{"message" => "bar"}], - [ - "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", - "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" - ] - ], - [ - d.events.collect { |event| event.last }, - d.logs[-2..] - ]) - end end # Ensure that no fatal exception is raised when a file is missing and that @@ -3426,4 +3391,39 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait ) end end + + data( + small: ["128", 128], + KiB: ["1k", 1024] + ) + test 'max_line_size' do |(label, size)| + config = config_element("", "", { + "tag" => "max_line_size", + "path" => "#{@tmp_dir}/with_long_lines.txt", + "format" => "none", + "read_from_head" => true, + "max_line_size" => label, + "log_level" => "debug" + }) + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + f.puts "foo" + f.puts "x" * size # 'x' * size + \n > @max_line_size + f.puts "bar" + end + d = create_driver(config, false) + timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") + Timecop.freeze(timestamp) + d.run(expect_records: 2) + assert_equal([ + [{"message" => "foo"},{"message" => "bar"}], + [ + "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", + "2021-11-29 11:22:33 +0000 [debug]: skipped line\n" + ] + ], + [ + d.events.collect { |event| event.last }, + d.logs[-2..] + ]) + end end