Skip to content

Commit

Permalink
in_tail: add option to skip long lines (maximum_line_limit_size)
Browse files Browse the repository at this point in the history
There is a case that processing unexpected long line causes a
BufferChunkOverflow exception.

With #3562 (Fluentd v1.14.3), it can just ignore only problematic long
record during processing buffer chunk, but it always raise an
exception. Thus it has performance regression.

In this fix, maximum_line_limit_size option enables to just skip processing
event further more in in_tail plugin side.

Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed Dec 1, 2021
1 parent 438a82a commit 80459aa
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def initialize
config_param :path_timezone, :string, default: nil
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
config_param :follow_inodes, :bool, default: false
desc 'Maximum length of line. The longer line is just skipped.'
config_param :maximum_line_limit_size, :size, default: nil

config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
config_argument :usage, :string, default: 'in_tail_parser'
Expand Down Expand Up @@ -594,6 +596,14 @@ def flush_buffer(tw, buf)

# @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
def receive_lines(lines, tail_watcher)
lines = lines.reject do |line|
skip_line = @maximum_line_limit_size ? line.bytesize > @maximum_line_limit_size : false
if skip_line
log.warn "received line length is longer than #{@maximum_line_limit_size}"
log.debug "skipped line: #{line.chomp}"
end
skip_line
end
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
tag = if @tag_prefix || @tag_suffix
Expand Down
35 changes: 35 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1707,6 +1707,41 @@ def test_tag_prefix_and_suffix_ignore
mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once
plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log'))
end

data(
small: ["128", 128],
KiB: ["1k", 1024]
)
test 'maximum_line_limit_size' do |(label, size)|
config = config_element("", "", {
"tag" => "maximum_line_limit_size",
"path" => "#{TMP_DIR}/with_long_lines.txt",
"format" => "none",
"read_from_head" => true,
"maximum_line_limit_size" => label,
"log_level" => "debug"
})
File.open("#{TMP_DIR}/with_long_lines.txt", "w+") do |f|
f.puts "foo"
f.puts "x" * size # 'x' * size + \n > @skip_long_lines
f.puts "bar"
end
d = create_driver(config, false)
timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021")
Timecop.freeze(timestamp)
d.run(expect_records: 2)
assert_equal([
[{"message" => "foo"},{"message" => "bar"}],
[
"2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n",
"2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n"
]
],
[
d.events.collect { |event| event.last },
d.logs[-2..]
])
end
end

# Ensure that no fatal exception is raised when a file is missing and that
Expand Down

0 comments on commit 80459aa

Please sign in to comment.