Skip to content

Commit

Permalink
return unmatch_line to avoid data loss
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Feb 25, 2020
1 parent 7809c17 commit 67f4055
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 4 deletions.
2 changes: 2 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def configure(conf)
parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
end

parser_config['unmatched_lines'] = conf['emit_unmatched_lines']

super

if !@enable_watch_timer && !@enable_stat_watcher
Expand Down
49 changes: 47 additions & 2 deletions lib/fluent/plugin/parser_multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,32 @@ class MultilineParser < Parser

# Optional pattern identifying the first line of a multi-line record.
desc 'Specify regexp pattern for start line of multiple lines'
config_param :format_firstline, :string, default: nil
# NOTE(review): this option is declared as :string but is consumed as a plain
# truthiness check (`if @unmatched_lines`), so any non-nil value enables it --
# presumably including the string "false". Confirm how callers forward the
# value, or consider declaring it as a boolean.
desc 'Enable an option returning line as unmatched_line'
config_param :unmatched_lines, :string, default: nil

# Presumably the upper bound on numbered "formatN" keys; its use is not
# visible in this excerpt -- TODO confirm.
FORMAT_MAX_NUM = 20

# RegexpParser variant whose #parse hands the MatchData back to the caller,
# so the caller can inspect what follows the matched portion of the text.
class MultilineRegexpParser < Fluent::Plugin::RegexpParser
  # Matches +text+ against @expression and yields the parsed event.
  #
  # Yields (nil, nil) when the expression does not match; otherwise yields
  # the (time, record) pair produced by convert_values from the non-nil
  # named captures.
  #
  # Returns the MatchData on a match, nil otherwise.
  def parse(text)
    matched = @expression.match(text)

    if matched.nil?
      yield nil, nil
      return nil
    end

    # Keep only the named groups that actually captured something.
    captures = matched.names.each_with_object({}) do |key, acc|
      captured = matched[key]
      acc[key] = captured if captured
    end

    time, record = convert_values(parse_time(captures), captures)
    yield time, record

    matched
  end
end
def configure(conf)
super

Expand All @@ -37,7 +60,7 @@ def configure(conf)
raise "No named captures"
end
regexp_conf = Fluent::Config::Element.new("", "", { "expression" => "/#{formats}/m" }, [])
@parser = Fluent::Plugin::RegexpParser.new
@parser = Fluent::Plugin::MultilineParser::MultilineRegexpParser.new
@parser.configure(conf + regexp_conf)
rescue => e
raise Fluent::ConfigError, "Invalid regexp '#{formats}': #{e}"
Expand All @@ -50,7 +73,29 @@ def configure(conf)
end

# Parses +text+, which may hold several consecutive records, by applying
# @parser repeatedly to whatever is left after each match.
#
# For every application the given block receives (time, record). When
# @unmatched_lines is set and the parser reports no match, the block instead
# receives (Fluent::EventTime.now, { 'unmatched_line' => remaining_text }) so
# the unparsed remainder is not silently dropped. When @unmatched_lines is not
# set, the block is passed straight through to @parser and receives whatever
# the parser yields.
#
# NOTE(review): @unmatched_lines is only tested for truthiness, so any
# non-nil value (presumably including the string "false") enables the
# unmatched_line behaviour -- confirm how the option is populated.
#
# @parser.call is expected to return the MatchData on success and nil when
# nothing matched.
#
# Always returns nil.
def parse(text, &block)
  loop do
    m =
      if @unmatched_lines
        @parser.call(text) do |time, record|
          if time && record
            yield(time, record)
          else
            yield(Fluent::EventTime.now, { 'unmatched_line' => text })
          end
        end
      else
        @parser.call(text, &block)
      end

    return if m.nil?

    # Continue with the text after the match, dropping the newline that
    # separates it from the record just parsed.
    rest = m.post_match
    rest = rest[1..-1] if rest.start_with?("\n")

    return if rest.empty?

    # A zero-length match at the very start consumes nothing; without this
    # guard the same text would be parsed (and yielded) forever.
    return if rest.size >= text.size

    text = rest
  end
end

def has_firstline?
Expand Down
32 changes: 30 additions & 2 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ def cleanup_directory(path)
# Flat-style tail configuration (no nested section) for the multiline format:
# a record starts on a line beginning with "s" and may be followed by up to
# two "f ..." lines captured as message2 and message3.
# NOTE(review): two "format1" entries are visible below; in a Ruby hash
# literal the later entry overrides the earlier one.
# NOTE(review): the later entry captures message3 with `.[^\\n]+`, while
# PARSE_MULTILINE_CONFIG uses `[^\\n]+` -- confirm whether the leading `.` is
# intended.
MULTILINE_CONFIG = config_element(
"", "", {
"format" => "multiline",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.*))?/",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.[^\\n]+))?/",
"format_firstline" => "/^[s]/"
})
# Same multiline setup expressed through a nested "parse" element with
# "@type" => "multiline": a record starts on a line beginning with "s" and may
# be followed by up to two "f ..." lines captured as message2 and message3.
# NOTE(review): two "format1" entries are visible below; in a Ruby hash
# literal the later entry overrides the earlier one.
PARSE_MULTILINE_CONFIG = config_element(
"", "", {},
[config_element("parse", "", {
"@type" => "multiline",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.*))?/",
"format1" => "/^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>[^\\n]+))?/",
"format_firstline" => "/^[s]/"
})
])
Expand Down Expand Up @@ -741,6 +741,34 @@ def test_multiline_with_emit_unmatched_lines_true(data)
assert_equal({"message1" => "test8"}, events[4][2])
end

data(
# flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
# With emit_unmatched_lines enabled, a line that does not fit the multiline
# pattern ("f test3") is expected to surface as an 'unmatched_line' record
# between the two regular records.
def test_multiline_with_emit_unmatched_lines2(data)
  tail_path = "#{TMP_DIR}/tail.txt"
  config = data + config_element("", "", { "emit_unmatched_lines" => true })

  # Start from an empty file.
  File.open(tail_path, "wb") { |f| }

  written_lines = [
    "s test0",
    "f test1",
    "f test2",
    "f test3",
    "s test4",
    "f test5",
    "f test6"
  ]

  d = create_driver(config)
  d.run(expect_emits: 0, timeout: 1) do
    File.open(tail_path, "ab") do |f|
      written_lines.each { |line| f.puts line }
    end
  end

  events = d.events
  expected_records = [
    {"message1" => "test0", "message2" => "test1", "message3" => "test2"},
    { 'unmatched_line' => "\nf test3" },
    {"message1" => "test4", "message2" => "test5", "message3" => "test6"}
  ]
  expected_records.each_with_index do |expected, index|
    assert_equal(expected, events[index][2])
  end
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_flush_interval(data)
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_parser_multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,15 @@ def test_parse_with_keep_time_key
assert_equal text, record['time']
}
end

# The multiline parser, configured with unmatched_lines, should emit the
# matching first line as a normal record and the non-matching second line as
# an 'unmatched_line' record.
def test_parse_unmatched_lines
  settings = {
    'format1' => '/^message (?<message_id>\d)/',
    'unmatched_lines' => true
  }
  parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::MultilineParser).configure(settings)

  records = []
  parser.instance.parse("message 1\nmessage a") do |_time, record|
    records.push(record)
  end

  expected = [{ 'message_id' => '1' }, { 'unmatched_line' => 'message a'}]
  assert_equal expected, records
end
end

0 comments on commit 67f4055

Please sign in to comment.