Skip to content

Commit

Permalink
Merge pull request fluent#4530 from yugeeklab/master
Browse files Browse the repository at this point in the history
Fix line skipping issue in receive_lines method
  • Loading branch information
ashie authored Jul 14, 2024
2 parents d4729ef + 955cee5 commit 64595e2
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 68 deletions.
75 changes: 56 additions & 19 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode)
if @follow_inodes && new_inode.nil?
# nil inode means the file disappeared, so we only need to stop it.
@tails.delete(tail_watcher.path)
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Need to set `rotate_wait 0` for a workaround.
# Duplication will occur if `refresh_watcher` is called during the `rotate_wait`.
Expand Down Expand Up @@ -696,14 +696,6 @@ def flush_buffer(tw, buf)

# @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
def receive_lines(lines, tail_watcher)
lines = lines.reject do |line|
skip_line = @max_line_size ? line.bytesize > @max_line_size : false
if skip_line
log.warn "received line length is longer than #{@max_line_size}"
log.debug "skipped line: #{line.chomp}"
end
skip_line
end
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
tag = if @tag_prefix || @tag_suffix
Expand Down Expand Up @@ -819,6 +811,7 @@ def io_handler(watcher, path)
from_encoding: @from_encoding,
encoding: @encoding,
metrics: @metrics,
max_line_size: @max_line_size,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -1011,15 +1004,19 @@ def swap_state(pe)
end

class FIFO
def initialize(from_encoding, encoding)
def initialize(from_encoding, encoding, log, max_line_size=nil)
@from_encoding = from_encoding
@encoding = encoding
@need_enc = from_encoding != encoding
@buffer = ''.force_encoding(from_encoding)
@eol = "\n".encode(from_encoding).freeze
@max_line_size = max_line_size
@skip_current_line = false
@skipping_current_line_bytesize = 0
@log = log
end

attr_reader :from_encoding, :encoding, :buffer
attr_reader :from_encoding, :encoding, :buffer, :max_line_size

def <<(chunk)
# Although "chunk" is most likely transient besides String#force_encoding itself
Expand Down Expand Up @@ -1051,6 +1048,7 @@ def convert(s)

def read_lines(lines)
idx = @buffer.index(@eol)
has_skipped_line = false

until idx.nil?
# Using freeze and slice is faster than slice!
Expand All @@ -1059,11 +1057,47 @@ def read_lines(lines)
rbuf = @buffer.slice(0, idx + 1)
@buffer = @buffer.slice(idx + 1, @buffer.size)
idx = @buffer.index(@eol)

is_long_line = @max_line_size && (
@skip_current_line || rbuf.bytesize > @max_line_size
)

if is_long_line
@log.warn "received line length is longer than #{@max_line_size}"
if @skip_current_line
@log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp }
else
@log.debug("skipped line: ") { convert(rbuf).chomp }
end
has_skipped_line = true
@skip_current_line = false
@skipping_current_line_bytesize = 0
next
end

lines << convert(rbuf)
end

is_long_current_line = @max_line_size && (
@skip_current_line || @buffer.bytesize > @max_line_size
)

if is_long_current_line
@log.debug(
"The continuing current line length is longer than #{@max_line_size}." +
" The received data will be discarded until this line is finished." +
" Discarded data: "
) { convert(@buffer).chomp }
@skip_current_line = true
@skipping_current_line_bytesize += @buffer.bytesize
@buffer.clear
end

return has_skipped_line
end

def bytesize
def reading_bytesize
return @skipping_current_line_bytesize if @skip_current_line
@buffer.bytesize
end
end
Expand All @@ -1074,14 +1108,14 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size)
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
@io = nil
Expand Down Expand Up @@ -1164,6 +1198,7 @@ def handle_notify
with_io do |io|
begin
read_more = false
has_skipped_line = false

if !io.nil? && @lines.empty?
begin
Expand All @@ -1177,7 +1212,7 @@ def handle_notify
@fifo << data

n_lines_before_read = @lines.size
@fifo.read_lines(@lines)
has_skipped_line = @fifo.read_lines(@lines) || has_skipped_line
group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read)

group_watcher_limit = group_watcher&.limit_lines_reached?(@path)
Expand All @@ -1200,9 +1235,11 @@ def handle_notify
end
end

unless @lines.empty?
if @lines.empty?
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line
else
if @receive_lines.call(@lines, @watcher)
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize)
@lines.clear
else
read_more = false
Expand All @@ -1214,12 +1251,12 @@ def handle_notify

def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize)
@metrics.opened.inc
io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}"
rescue Errno::EACCES => e
@log.warn "#{e}"
nil
Expand Down
98 changes: 84 additions & 14 deletions test/plugin/in_tail/test_fifo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class IntailFIFO < Test::Unit::TestCase
sub_test_case '#read_line' do
test 'returns lines spliting per `\n`' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'concant line when line is separated' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'returns lines which convert encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reads lines as from_encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case 'when it includes multi byte chars' do
test 'handles it as ascii_8bit' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'replaces character with ? when convert error happens' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reutrns nothing when buffer is empty' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
lines = []
fifo.read_lines(lines)
assert_equal [], lines
Expand All @@ -86,11 +86,54 @@ class IntailFIFO < Test::Unit::TestCase
fifo.read_lines(lines)
assert_equal [], lines
end

data('bigger than max_line_size', [
["test test test\n" * 3],
[],
])
data('less than or equal to max_line_size', [
["test\n" * 2],
["test\n", "test\n"],
])
data('mix', [
["test test test\ntest\ntest test test\ntest\ntest test test\n"],
["test\n", "test\n"],
])
data('mix and multiple', [
[
"test test test\ntest\n",
"test",
" test test\nt",
"est\nt"
],
["test\n", "test\n"],
])
data('remaining data bigger than max_line_size should be discarded', [
[
"test\nlong line still not having EOL",
"following texts to the previous long line\ntest\n",
],
["test\n", "test\n"],
])
test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)|
max_line_size = 5
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

input_texts.each do |text|
fifo << text.force_encoding(Encoding::ASCII_8BIT)
fifo.read_lines(lines)
# The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size.
assert { fifo.buffer.bytesize <= max_line_size }
end

assert_equal expected, lines
end
end

sub_test_case '#<<' do
test 'does not make any change about encoding to an argument' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)

assert_equal Encoding::UTF_8, text.encoding
Expand All @@ -99,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase
end
end

sub_test_case '#bytesize' do
test 'reutrns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
sub_test_case '#reading_bytesize' do
test 'returns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = "test\n" * 3 + 'test'
fifo << text

assert_equal text.bytesize, fifo.bytesize
assert_equal text.bytesize, fifo.reading_bytesize
lines = []
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n"], lines

assert_equal 'test'.bytesize, fifo.bytesize
assert_equal 'test'.bytesize, fifo.reading_bytesize
fifo << "2\n"
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines

assert_equal 0, fifo.bytesize
assert_equal 0, fifo.reading_bytesize
end

test 'returns the entire line size even if the size is over max_line_size' do
max_line_size = 20
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

text = "long line still not having EOL"
fifo << text
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize, fifo.reading_bytesize

text2 = " following texts"
fifo << text2
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize

text3 = " end of the line\n"
fifo << text3
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal 0, fifo.reading_bytesize
end
end
end
Loading

0 comments on commit 64595e2

Please sign in to comment.