Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture unmatched lines #1421

Merged
merged 4 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def initialize
config_param :read_lines_limit, :integer, default: 1000
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
config_param :emit_unmatched_lines, :bool, default: false
desc 'Enable the additional watch timer.'
config_param :enable_watch_timer, :bool, default: true
desc 'The encoding after conversion of the input.'
Expand Down Expand Up @@ -357,6 +359,11 @@ def convert_line_to_event(line, es, tail_watcher)
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(time, record)
else
if @emit_unmatched_lines
record = {'unmatched_line' => line}
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(Fluent::EventTime.now, record)
end
log.warn "pattern not match: #{line.inspect}"
end
}
Expand Down Expand Up @@ -387,6 +394,9 @@ def parse_multilines(lines, tail_watcher)
lb = line
else
if lb.nil?
if @emit_unmatched_lines
convert_line_to_event(line, es, tail_watcher)
end
log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
else
lb << line
Expand Down
51 changes: 51 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ def test_emit(data)
assert_equal(1, d.emit_count)
end

def test_emit_with_emit_unmatched_lines_true
config = config_element("", "", { "format" => "/^(?<message>test.*)/", "emit_unmatched_lines" => true })
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver(config)
d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test line 1"
f.puts "test line 2"
f.puts "bad line 1"
f.puts "test line 3"
}
end

events = d.events
assert_equal(4, events.length)
assert_equal({"message" => "test line 1"}, events[0][2])
assert_equal({"message" => "test line 2"}, events[1][2])
assert_equal({"unmatched_line" => "bad line 1"}, events[2][2])
assert_equal({"message" => "test line 3"}, events[3][2])
end

data('flat 1' => [:flat, 1, 2],
'flat 10' => [:flat, 10, 1],
'parse 1' => [:parse, 1, 2],
Expand Down Expand Up @@ -516,6 +538,35 @@ def test_multiline(data)
assert_equal({"message1" => "test8"}, events[3][2])
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_emit_unmatched_lines_true(data)
config = data + config_element("", "", { "emit_unmatched_lines" => true })
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver(config)
d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "f test1"
f.puts "s test2"
f.puts "f test3"
f.puts "f test4"
f.puts "s test5"
f.puts "s test6"
f.puts "f test7"
f.puts "s test8"
}
end

events = d.events
assert_equal(5, events.length)
assert_equal({"unmatched_line" => "f test1"}, events[0][2])
assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[1][2])
assert_equal({"message1" => "test5"}, events[2][2])
assert_equal({"message1" => "test6", "message2" => "test7"}, events[3][2])
assert_equal({"message1" => "test8"}, events[4][2])
end

data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
def test_multiline_with_flush_interval(data)
Expand Down