Skip to content

Commit

Permalink
fix to commit the correct pos to continue processing correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Jun 12, 2024
1 parent 6c03717 commit fdf74e5
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 33 deletions.
18 changes: 11 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil)
@eol = "\n".encode(from_encoding).freeze
@max_line_size = max_line_size
@skip_current_line = false
@skipping_current_line_bytesize = 0
@log = log
end

Expand Down Expand Up @@ -1064,8 +1065,9 @@ def read_lines(lines)
if is_long_line
@log.warn "received line length is longer than #{@max_line_size}"
@log.debug("skipped line: ") { convert(rbuf).chomp }
@skip_current_line = false
has_skipped_line = true
@skip_current_line = false
@skipping_current_line_bytesize = 0
next
end

Expand All @@ -1077,14 +1079,16 @@ def read_lines(lines)
)

if is_long_current_line
@buffer.clear
@skip_current_line = true
@skipping_current_line_bytesize += @buffer.bytesize
@buffer.clear
end

return has_skipped_line
end

def bytesize
def reading_bytesize
return @skipping_current_line_bytesize if @skip_current_line
@buffer.bytesize
end
end
Expand Down Expand Up @@ -1223,10 +1227,10 @@ def handle_notify
end

if @lines.empty?
@watcher.pe.update_pos(io.pos - @fifo.bytesize) if has_skipped_line
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line
else
if @receive_lines.call(@lines, @watcher)
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize)
@lines.clear
else
read_more = false
Expand All @@ -1238,12 +1242,12 @@ def handle_notify

def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize)
@metrics.opened.inc
io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}"
rescue Errno::EACCES => e
@log.warn "#{e}"
nil
Expand Down
39 changes: 33 additions & 6 deletions test/plugin/in_tail/test_fifo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class IntailFIFO < Test::Unit::TestCase
fifo << text.force_encoding(Encoding::ASCII_8BIT)
fifo.read_lines(lines)
# The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size.
assert { fifo.bytesize <= max_line_size }
assert { fifo.buffer.bytesize <= max_line_size }
end

assert_equal expected, lines
Expand All @@ -142,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase
end
end

sub_test_case '#bytesize' do
test 'reutrns buffer size' do
sub_test_case '#reading_bytesize' do
test 'returns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = "test\n" * 3 + 'test'
fifo << text

assert_equal text.bytesize, fifo.bytesize
assert_equal text.bytesize, fifo.reading_bytesize
lines = []
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n"], lines

assert_equal 'test'.bytesize, fifo.bytesize
assert_equal 'test'.bytesize, fifo.reading_bytesize
fifo << "2\n"
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines

assert_equal 0, fifo.bytesize
assert_equal 0, fifo.reading_bytesize
end

# Checks FIFO#reading_bytesize while a line longer than max_line_size is being
# received in several chunks: the internal buffer is expected to stay empty,
# while reading_bytesize is expected to report every byte of the over-long line
# seen so far, and to drop back to 0 once the line's EOL arrives.
test 'returns the entire line size even if the size is over max_line_size' do
max_line_size = 20
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

# First chunk: 30 bytes with no EOL, i.e. already longer than max_line_size.
text = "long line still not having EOL"
fifo << text
fifo.read_lines(lines)
# No line is emitted and the buffer holds nothing, yet the chunk is still counted.
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize, fifo.reading_bytesize

# Second chunk of the same line (still no EOL): the count accumulates.
text2 = " following texts"
fifo << text2
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize

# Final chunk carries the EOL: the whole line is expected to be dropped
# (nothing appended to lines) and both sizes are expected to be 0 again.
text3 = " end of the line\n"
fifo << text3
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal 0, fifo.reading_bytesize
end
end
end
151 changes: 131 additions & 20 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,33 +148,144 @@ def create_watcher
end
end

test 'does not call receive_lines when line_size exceeds max_line_size' do
t = 'x' * (8192)
text = "#{t}\n"
sub_test_case 'max_line_size' do
test 'does not call receive_lines when line_size exceeds max_line_size' do
t = 'x' * (8192)
text = "#{t}\n"

max_line_size = 8192
max_line_size = 8192

@file.write(text)
@file.close
@file.write(text)
@file.close

update_pos = 0
update_pos = 0

watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos {0}
stub(pe).update_pos { |val| update_pos = val }
pe
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos {0}
stub(pe).update_pos { |val| update_pos = val }
pe
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end

r.on_notify
assert_equal text.bytesize, update_pos
assert_equal 0, returned_lines.size
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
# Runs the test below once per entry, passing the value as the block argument
# (used here as the IOHandler's open_on_every_update option).
data(
"open_on_every_update false" => false,
"open_on_every_update true" => true,
)
# Checks the position reported through pe.update_pos when a line longer than
# max_line_size is written in two parts, with a notification after each part.
test 'manage pos correctly if a long line not having EOL occurs' do |open_on_every_update|
max_line_size = 20
returned_lines = []
pos = 0

# Replace the watcher's position entry with a stub backed by the local
# variable pos, so the test can observe every position update.
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { pos }
stub(pe).update_pos { |val| pos = val }
pe
end

# The block records each batch of lines handed over by the handler and
# returns true.
# NOTE(review): returning true presumably signals that the lines were
# accepted — confirm against IOHandler.
io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(
watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1,
max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update,
metrics: @metrics
) do |lines, _watcher|
returned_lines << lines.dup
true
end

# long_lines holds one over-long line split into a head without EOL and a
# tail that ends with EOL.
short_line = "short line\n"
long_lines = [
"long line still not having EOL",
" end of the line\n",
]

# Step 1: a complete short line followed by the unterminated head of the long line.
@file.write(short_line)
@file.write(long_lines[0])
@file.flush
io_handler.on_notify

# Only the short line is delivered, and the position is expected to cover
# the short line only (not the unterminated head).
assert_equal [[short_line]], returned_lines
assert_equal short_line.bytesize, pos

# Step 2: the rest of the long line, including its EOL.
@file.write(long_lines[1])
@file.flush
io_handler.on_notify

# The long line is never delivered, but the position is expected to have
# advanced past all bytes written so far.
assert_equal [[short_line]], returned_lines
expected_size = short_line.bytesize + long_lines[0..1].map{|l| l.bytesize}.sum
assert_equal expected_size, pos

io_handler.close
end

r.on_notify
assert_equal text.bytesize, update_pos
assert_equal 0, returned_lines.size
# Runs the test below once per entry, passing the value as the block argument
# (used here as the IOHandler's open_on_every_update option).
data(
"open_on_every_update false" => false,
"open_on_every_update true" => true,
)
# Checks that the remainder of a line longer than max_line_size is not
# delivered when the IOHandler is closed and a new one is created (sharing the
# same stubbed position) between the two writes that make up the line.
test 'discards a subsequent data in a long line even if restarting occurs between' do |open_on_every_update|
max_line_size = 20
returned_lines = []
pos = 0

# Replace the watcher's position entry with a stub backed by the local
# variable pos; both handlers below read and update this same value.
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { pos }
stub(pe).update_pos { |val| pos = val }
pe
end

# First handler: records each batch of lines it hands over.
io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(
watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1,
max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update,
metrics: @metrics
) do |lines, _watcher|
returned_lines << lines.dup
true
end

# long_lines holds one over-long line split into a head without EOL and a
# tail that ends with EOL.
short_line = "short line\n"
long_lines = [
"long line still not having EOL",
" end of the line\n",
]

# Before the restart: a complete short line plus the unterminated head.
@file.write(short_line)
@file.write(long_lines[0])
@file.flush
io_handler.on_notify

assert_equal [[short_line]], returned_lines

# Simulate a restart: discard the first handler and build a fresh one with
# the same arguments, appending to the same returned_lines array.
io_handler.close
io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(
watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1,
max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update,
metrics: @metrics
) do |lines, _watcher|
returned_lines << lines.dup
true
end

# After the restart: the tail of the long line, including its EOL.
@file.write(long_lines[1])
@file.flush
io_handler.on_notify

# The tail must not show up as a line of its own; only the short line has
# ever been delivered.
assert_equal [[short_line]], returned_lines

io_handler.close
end
end
end

0 comments on commit fdf74e5

Please sign in to comment.