Skip to content

Commit

Permalink
Fix line skipping issue in receive_lines method
Browse files Browse the repository at this point in the history
Before this patch, long lines could cause breakdowns in fluentd, potentially posing a vulnerability. With this patch, max_line_size will be integrated into the FIFO, enabling the system to skip lines exceeding the maximum size before executing receive_lines.

Co-authored-by: yugeeklab <[email protected]>
Signed-off-by: been.zino <[email protected]>
  • Loading branch information
been-zino and yugeeklab committed May 10, 2024
1 parent 236d87d commit cd9affb
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 57 deletions.
42 changes: 29 additions & 13 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -696,14 +696,6 @@ def flush_buffer(tw, buf)

# @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
def receive_lines(lines, tail_watcher)
lines = lines.reject do |line|
skip_line = @max_line_size ? line.bytesize > @max_line_size : false
if skip_line
log.warn "received line length is longer than #{@max_line_size}"
log.debug "skipped line: #{line.chomp}"
end
skip_line
end
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
tag = if @tag_prefix || @tag_suffix
Expand Down Expand Up @@ -819,6 +811,7 @@ def io_handler(watcher, path)
from_encoding: @from_encoding,
encoding: @encoding,
metrics: @metrics,
max_line_size: @max_line_size,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -1011,15 +1004,18 @@ def swap_state(pe)
end

class FIFO
def initialize(from_encoding, encoding)
def initialize(from_encoding, encoding, log, max_line_size=nil)
@from_encoding = from_encoding
@encoding = encoding
@need_enc = from_encoding != encoding
@buffer = ''.force_encoding(from_encoding)
@eol = "\n".encode(from_encoding).freeze
@max_line_size = max_line_size
@was_long_line = false
@log = log
end

attr_reader :from_encoding, :encoding, :buffer
attr_reader :from_encoding, :encoding, :buffer, :max_line_size

def <<(chunk)
# Although "chunk" is most likely transient besides String#force_encoding itself
Expand Down Expand Up @@ -1059,7 +1055,27 @@ def read_lines(lines)
rbuf = @buffer.slice(0, idx + 1)
@buffer = @buffer.slice(idx + 1, @buffer.size)
idx = @buffer.index(@eol)
lines << convert(rbuf)

is_long_line = !@max_line_size.nil? && (
rbuf.bytesize > @max_line_size || @was_long_line
)

if !is_long_line
lines << convert(rbuf)
else
@log.warn "received line length is longer than #{@max_line_size}"
@log.debug "skipped line"
@was_long_line = false
end
end

is_long_line = !@max_line_size.nil? && (
@buffer.bytesize > @max_line_size || @was_long_line
)

if is_long_line
@buffer = ''.force_encoding(@from_encoding)
@was_long_line = true
end
end

Expand All @@ -1074,14 +1090,14 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size)
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
@io = nil
Expand Down
38 changes: 29 additions & 9 deletions test/plugin/in_tail/test_fifo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class IntailFIFO < Test::Unit::TestCase
sub_test_case '#read_line' do
test 'returns lines spliting per `\n`' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'concant line when line is separated' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'returns lines which convert encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reads lines as from_encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case 'when it includes multi byte chars' do
test 'handles it as ascii_8bit' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'replaces character with ? when convert error happens' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reutrns nothing when buffer is empty' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
lines = []
fifo.read_lines(lines)
assert_equal [], lines
Expand All @@ -86,11 +86,31 @@ class IntailFIFO < Test::Unit::TestCase
fifo.read_lines(lines)
assert_equal [], lines
end

test 'return empty line when line size is bigger than max_line_size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size: 5)
t = "test" * 3
text = ("#{t}\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
fifo.read_lines(lines)
assert_equal [], lines
end

test 'return lines when line size is less than or equal to max_line_size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size: 5)
t = "test"
text = ("#{t}\n" * 2).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
fifo.read_lines(lines)
assert_equal ["test\n", "test\n"], lines
end
end

sub_test_case '#<<' do
test 'does not make any change about encoding to an argument' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)

assert_equal Encoding::UTF_8, text.encoding
Expand All @@ -101,7 +121,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case '#bytesize' do
test 'reutrns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = "test\n" * 3 + 'test'
fifo << text

Expand Down
31 changes: 31 additions & 0 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,36 @@ def create_watcher
assert_equal 5, returned_lines[0].size
assert_equal 3, returned_lines[1].size
end

test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do
t = 'line' * (8192 / 8)
text = "#{t}\n" * 8
t = 'line' * 8
text += "#{t}\n" * 8

@file.write(text)
@file.close

update_pos = 0

watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos {0}
stub(pe).update_pos { |val| update_pos = val }
pe
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end

r.on_notify
assert_equal text.bytesize, update_pos
assert_equal 1, returned_lines.size
assert_equal 8, returned_lines[0].size
end
end
end
70 changes: 35 additions & 35 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2011,41 +2011,6 @@ def test_tag_prefix_and_suffix_ignore
mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once
plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log'))
end

data(
small: ["128", 128],
KiB: ["1k", 1024]
)
test 'max_line_size' do |(label, size)|
config = config_element("", "", {
"tag" => "max_line_size",
"path" => "#{@tmp_dir}/with_long_lines.txt",
"format" => "none",
"read_from_head" => true,
"max_line_size" => label,
"log_level" => "debug"
})
Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f|
f.puts "foo"
f.puts "x" * size # 'x' * size + \n > @max_line_size
f.puts "bar"
end
d = create_driver(config, false)
timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021")
Timecop.freeze(timestamp)
d.run(expect_records: 2)
assert_equal([
[{"message" => "foo"},{"message" => "bar"}],
[
"2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n",
"2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n"
]
],
[
d.events.collect { |event| event.last },
d.logs[-2..]
])
end
end

# Ensure that no fatal exception is raised when a file is missing and that
Expand Down Expand Up @@ -3426,4 +3391,39 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait
)
end
end

data(
small: ["128", 128],
KiB: ["1k", 1024]
)
test 'max_line_size' do |(label, size)|
config = config_element("", "", {
"tag" => "max_line_size",
"path" => "#{@tmp_dir}/with_long_lines.txt",
"format" => "none",
"read_from_head" => true,
"max_line_size" => label,
"log_level" => "debug"
})
Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f|
f.puts "foo"
f.puts "x" * size # 'x' * size + \n > @max_line_size
f.puts "bar"
end
d = create_driver(config, false)
timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021")
Timecop.freeze(timestamp)
d.run(expect_records: 2)
assert_equal([
[{"message" => "foo"},{"message" => "bar"}],
[
"2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n",
"2021-11-29 11:22:33 +0000 [debug]: skipped line\n"
]
],
[
d.events.collect { |event| event.last },
d.logs[-2..]
])
end
end

0 comments on commit cd9affb

Please sign in to comment.