Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the data loss in in_tail when emit_unmatched_lines is enabled. #2837

Merged
merged 4 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ def configure(conf)
parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
end

parser_config['unmatched_lines'] = conf['emit_unmatched_lines']

super

if !@enable_watch_timer && !@enable_stat_watcher
Expand Down Expand Up @@ -447,6 +449,16 @@ def flush_buffer(tw, buf)
record[@path_key] ||= tw.path unless @path_key.nil?
router.emit(tag, time, record)
else
# With emit_unmatched_lines enabled, the leftover buffer that failed to parse is
# still forwarded as an { 'unmatched_line' => buf } record instead of being dropped.
if @emit_unmatched_lines
  record = { 'unmatched_line' => buf }
  # The watcher in this method is `tw` (see the tw.tag / tw.path uses around this
  # branch); the previous `tail_watcher` name is not defined here and raised
  # NameError as soon as path_key was configured.
  record[@path_key] ||= tw.path unless @path_key.nil?
  tag = if @tag_prefix || @tag_suffix
          @tag_prefix + tw.tag + @tag_suffix
        else
          @tag
        end
  # No timestamp could be parsed from the line, so the current time is used.
  router.emit(tag, Fluent::EventTime.now, record)
end
log.warn "got incomplete line at shutdown from #{tw.path}: #{buf.inspect}"
end
}
Expand Down
50 changes: 48 additions & 2 deletions lib/fluent/plugin/parser_multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,33 @@ class MultilineParser < Parser

desc 'Specify regexp pattern for start line of multiple lines'
config_param :format_firstline, :string, default: nil
# When set, parse yields input that does not match the formats as
# { 'unmatched_line' => text } instead of yielding (nil, nil).
# NOTE(review): this is declared as :string and only tested for truthiness in
# parse, so any non-nil value — presumably including the string "false" when
# in_tail forwards conf['emit_unmatched_lines'] — would enable it. Confirm whether
# :bool was intended.
desc 'Enable an option returning line as unmatched_line'
config_param :unmatched_lines, :string, default: nil

FORMAT_MAX_NUM = 20

# RegexpParser variant whose #parse also hands the MatchData back to the caller.
class MultilineRegexpParser < Fluent::Plugin::RegexpParser
  # Matches +text+ against @expression.
  #
  # On a miss it yields (nil, nil) and returns nil. On a hit it collects the
  # named captures that have a value, runs them through the inherited
  # parse_time / convert_values helpers, yields the resulting (time, record)
  # and returns the MatchData.
  def parse(text)
    matched = @expression.match(text)
    if matched.nil?
      yield nil, nil
      return matched
    end

    # Keep only the named groups that actually captured something.
    captures = matched.names.each_with_object({}) do |key, acc|
      captured = matched[key]
      acc[key] = captured if captured
    end

    time, record = convert_values(parse_time(captures), captures)
    yield(time, record)

    matched
  end
end

def configure(conf)
ganmacs marked this conversation as resolved.
Show resolved Hide resolved
super

Expand All @@ -37,7 +61,7 @@ def configure(conf)
raise "No named captures"
end
regexp_conf = Fluent::Config::Element.new("", "", { "expression" => "/#{formats}/m" }, [])
@parser = Fluent::Plugin::RegexpParser.new
@parser = Fluent::Plugin::MultilineParser::MultilineRegexpParser.new
@parser.configure(conf + regexp_conf)
rescue => e
raise Fluent::ConfigError, "Invalid regexp '#{formats}': #{e}"
Expand All @@ -50,7 +74,29 @@ def configure(conf)
end

# Parses +text+, which may hold several concatenated multiline events.
#
# The underlying parser is called repeatedly: after each successful match the
# remainder of the input (MatchData#post_match, minus one leading newline) is
# parsed again, until nothing is left or the remainder no longer matches.
#
# When @unmatched_lines is set, a pass in which the parser yields no time or
# record is reported as (Fluent::EventTime.now, { 'unmatched_line' => text });
# otherwise the parser's yield is forwarded to the caller's block unchanged.
#
# @param text [String] buffered input to parse
# @yield [time, record] once per parsed (or unmatched) chunk
# @return [nil]
def parse(text, &block)
  # Each pass goes through the parser exactly once; a stray extra call ahead of
  # the loop used to emit the first pass's records twice.
  loop do
    m =
      if @unmatched_lines
        @parser.call(text) do |time, record|
          if time && record
            yield(time, record)
          else
            yield(Fluent::EventTime.now, { 'unmatched_line' => text })
          end
        end
      else
        @parser.call(text, &block)
      end

    return if m.nil?

    rest = m.post_match
    rest = rest[1..-1] if rest.start_with?("\n")

    # Stop when the input is used up, or when an empty match at the very start
    # consumed nothing: re-parsing the same text would never terminate.
    return if rest.empty? || rest.length == text.length

    text = rest
  end
end

def has_firstline?
Expand Down
43 changes: 43 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ def cleanup_directory(path)
})
])

# Flat-style multiline settings: format1 captures an "s ..." first line as
# message1, optionally followed by up to two newline-separated "f ..." lines
# captured as message2 and message3. format_firstline marks lines starting
# with "s" as the beginning of an event.
# NOTE(review): the message3 group is `.[^\\n]+` while message2 is `[^\\n]+`;
# the extra leading `.` looks unintentional — confirm before relying on it.
MULTILINE_CONFIG_WITH_NEWLINE = config_element(
"", "", {
"format" => "multiline",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.[^\\n]+))?/",
"format_firstline" => "/^[s]/"
})
# Same format1 / format_firstline patterns, supplied through a nested "parse"
# element with "@type" => "multiline" instead of flat parameters.
PARSE_MULTILINE_CONFIG_WITH_NEWLINE = config_element(
"", "", {},
[config_element("parse", "", {
"@type" => "multiline",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.[^\\n]+))?/",
"format_firstline" => "/^[s]/"
})
])

def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
config = use_common_conf ? COMMON_CONFIG + conf : conf
Fluent::Test::Driver::Input.new(Fluent::Plugin::TailInput).configure(config)
Expand Down Expand Up @@ -741,6 +756,34 @@ def test_multiline_with_emit_unmatched_lines_true(data)
assert_equal({"message1" => "test8"}, events[4][2])
end

# With emit_unmatched_lines enabled, a stray "f ..." line sitting between two
# complete multiline events must come out as its own 'unmatched_line' record,
# and the events on either side must still be parsed in full.
data(
  flat: MULTILINE_CONFIG_WITH_NEWLINE,
  parse: PARSE_MULTILINE_CONFIG_WITH_NEWLINE)
def test_multiline_with_emit_unmatched_lines2(data)
  tail_path = "#{TMP_DIR}/tail.txt"
  unmatched_on = config_element("", "", { "emit_unmatched_lines" => true })
  config = data + unmatched_on

  # Start from an empty file.
  File.open(tail_path, "wb") { |_f| }

  driver = create_driver(config)
  driver.run(expect_emits: 0, timeout: 1) do
    File.open(tail_path, "ab") do |f|
      # first complete event
      f.puts "s test0", "f test1", "f test2"
      # continuation line with no preceding first line
      f.puts "f test3"
      # second complete event
      f.puts "s test4", "f test5", "f test6"
    end
  end

  emitted = driver.events
  expected_records = [
    {"message1" => "test0", "message2" => "test1", "message3" => "test2"},
    { 'unmatched_line' => "f test3" },
    {"message1" => "test4", "message2" => "test5", "message3" => "test6"},
  ]
  expected_records.each_with_index do |expected, idx|
    assert_equal(expected, emitted[idx][2])
  end
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_flush_interval(data)
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_parser_multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,15 @@ def test_parse_with_keep_time_key
assert_equal text, record['time']
}
end

# With 'unmatched_lines' enabled, input that holds one matching line followed
# by one non-matching line yields the parsed record and then an
# 'unmatched_line' record carrying the leftover text.
def test_parse_unmatched_lines
  driver = Fluent::Test::Driver::Parser.new(Fluent::Plugin::MultilineParser)
  driver = driver.configure(
    'format1' => '/^message (?<message_id>\d)/',
    'unmatched_lines' => true
  )

  records = []
  driver.instance.parse("message 1\nmessage a") do |_time, record|
    records << record
  end

  assert_equal [{ 'message_id' => '1' }, { 'unmatched_line' => 'message a'}], records
end
end