Skip to content

Commit

Permalink
Merge pull request #1421 from jitran/emit-unmatched-lines
Browse files Browse the repository at this point in the history
Capture unmatched lines
  • Loading branch information
repeatedly committed Jan 26, 2017
1 parent 60b6bd9 commit f57a6aa
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
10 changes: 10 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def initialize
config_param :read_lines_limit, :integer, default: 1000
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
config_param :emit_unmatched_lines, :bool, default: false
desc 'Enable the additional watch timer.'
config_param :enable_watch_timer, :bool, default: true
desc 'The encoding after conversion of the input.'
Expand Down Expand Up @@ -315,6 +317,11 @@ def convert_line_to_event(line, es, tail_watcher)
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(time, record)
else
if @emit_unmatched_lines
record = {'unmatched_line' => line}
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(::Fluent::Engine.now, record)
end
log.warn "pattern not match: #{line.inspect}"
end
}
Expand Down Expand Up @@ -345,6 +352,9 @@ def parse_multilines(lines, tail_watcher)
lb = line
else
if lb.nil?
if @emit_unmatched_lines
convert_line_to_event(line, es, tail_watcher)
end
log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
else
lb << line
Expand Down
67 changes: 66 additions & 1 deletion test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def test_emit
}

d = create_driver

d.run do
sleep 1

Expand All @@ -115,6 +114,34 @@ def test_emit
assert_equal(1, d.emit_streams.size)
end

def test_emit_with_emit_unmatched_lines_true
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver(%[
format /^(?<message>test.*)/
emit_unmatched_lines true
])
d.run do
sleep 1

File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "test line 1"
f.puts "test line 2"
f.puts "bad line 1"
f.puts "test line 3"
}

sleep 1
end

events = d.emits
assert_equal(4, events.length)
assert_equal({"message" => "test line 1"}, events[0][2])
assert_equal({"message" => "test line 2"}, events[1][2])
assert_equal({"unmatched_line" => "bad line 1"}, events[2][2])
assert_equal({"message" => "test line 3"}, events[3][2])
end

data('1' => [1, 2], '10' => [10, 1])
def test_emit_with_read_lines_limit(data)
limit, num_emits = data
Expand Down Expand Up @@ -419,6 +446,44 @@ def test_multiline
assert_equal({"message1" => "test8"}, emits[3][2])
end

def test_multiline_with_emit_unmatched_lines_true
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver %[
format multiline
format1 /^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.*))?/
format_firstline /^[s]/
emit_unmatched_lines true
]
d.run do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "f test1"
f.puts "s test2"
f.puts "f test3"
f.puts "f test4"
f.puts "s test5"
f.puts "s test6"
f.puts "f test7"
f.puts "s test8"
}
sleep 1

events = d.emits
assert_equal(4, events.length)
assert_equal({"unmatched_line" => "f test1"}, events[0][2])
assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[1][2])
assert_equal({"message1" => "test5"}, events[2][2])
assert_equal({"message1" => "test6", "message2" => "test7"}, events[3][2])

sleep 3
assert_equal(4, d.emits.length)
end

emits = d.emits
assert_equal(5, emits.length)
assert_equal({"message1" => "test8"}, emits[4][2])
end

def test_multiline_with_flush_interval
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

Expand Down

0 comments on commit f57a6aa

Please sign in to comment.