Skip to content

Commit

Permalink
Merge pull request #1421 from jitran/emit-unmatched-lines
Browse files Browse the repository at this point in the history
Capture unmatched lines
  • Loading branch information
repeatedly authored Jan 25, 2017
2 parents 26ab1bd + d874db1 commit 19eb856
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def initialize
config_param :read_lines_limit, :integer, default: 1000
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
config_param :emit_unmatched_lines, :bool, default: false
desc 'Enable the additional watch timer.'
config_param :enable_watch_timer, :bool, default: true
desc 'The encoding after conversion of the input.'
Expand Down Expand Up @@ -345,6 +347,11 @@ def convert_line_to_event(line, es, tail_watcher)
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(time, record)
else
if @emit_unmatched_lines
record = {'unmatched_line' => line}
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(Fluent::EventTime.now, record)
end
log.warn "pattern not match: #{line.inspect}"
end
}
Expand Down Expand Up @@ -375,6 +382,9 @@ def parse_multilines(lines, tail_watcher)
lb = line
else
if lb.nil?
if @emit_unmatched_lines
convert_line_to_event(line, es, tail_watcher)
end
log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
else
lb << line
Expand Down
51 changes: 51 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,28 @@ def test_emit(data)
assert_equal(1, d.emit_count)
end

def test_emit_with_emit_unmatched_lines_true
config = config_element("", "", { "format" => "/^(?<message>test.*)/", "emit_unmatched_lines" => true })
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver(config)
d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test line 1"
f.puts "test line 2"
f.puts "bad line 1"
f.puts "test line 3"
}
end

events = d.events
assert_equal(4, events.length)
assert_equal({"message" => "test line 1"}, events[0][2])
assert_equal({"message" => "test line 2"}, events[1][2])
assert_equal({"unmatched_line" => "bad line 1"}, events[2][2])
assert_equal({"message" => "test line 3"}, events[3][2])
end

data('flat 1' => [:flat, 1, 2],
'flat 10' => [:flat, 10, 1],
'parse 1' => [:parse, 1, 2],
Expand Down Expand Up @@ -555,6 +577,35 @@ def test_multiline(data)
assert_equal({"message1" => "test8"}, events[3][2])
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_emit_unmatched_lines_true(data)
config = data + config_element("", "", { "emit_unmatched_lines" => true })
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver(config)
d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "f test1"
f.puts "s test2"
f.puts "f test3"
f.puts "f test4"
f.puts "s test5"
f.puts "s test6"
f.puts "f test7"
f.puts "s test8"
}
end

events = d.events
assert_equal(5, events.length)
assert_equal({"unmatched_line" => "f test1"}, events[0][2])
assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[1][2])
assert_equal({"message1" => "test5"}, events[2][2])
assert_equal({"message1" => "test6", "message2" => "test7"}, events[3][2])
assert_equal({"message1" => "test8"}, events[4][2])
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_flush_interval(data)
Expand Down

0 comments on commit 19eb856

Please sign in to comment.