Skip to content

Commit

Permalink
Merge pull request #2553 from dg-i/feature_add_source_to_unmatched
Browse files Browse the repository at this point in the history
add support for source_hostname_key and source_address_key on umatched syslog messages
  • Loading branch information
repeatedly authored Aug 14, 2019
2 parents 785fe3d + 22ebe9b commit f2b24f5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
13 changes: 10 additions & 3 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,21 @@ def start_tcp_server(tls: false)

private

def emit_unmatched(data, sock)
record = {"unmatched_line" => data}
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key
emit("#{@tag}.unmatched", Fluent::EventTime.now, record)
end

def message_handler(data, sock)
pri = nil
text = data
unless @parser_parse_priority
m = SYSLOG_REGEXP.match(data)
unless m
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => data})
emit_unmatched(data, sock)
end
log.warn "invalid syslog message: #{data.dump}"
return
Expand All @@ -218,7 +225,7 @@ def message_handler(data, sock)
@parser.parse(text) do |time, record|
unless time && record
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => text})
emit_unmatched(data, sock)
end
log.warn "failed to parse message", data: data
return
Expand All @@ -238,7 +245,7 @@ def message_handler(data, sock)
end
rescue => e
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => text})
emit_unmatched(data, sock)
end
log.error "invalid input", data: data, error: e
log.error_backtrace
Expand Down
62 changes: 56 additions & 6 deletions test/plugin/test_in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,31 @@ def create_test_case(large_message: false)
end
end

def test_emit_unmatched_lines
d = create_driver([CONFIG, 'emit_unmatched_lines true'].join("\n"))
tests = [
def create_unmatched_lines_test_case
[
# valid message
{'msg' => '<6>Sep 10 00:00:00 localhost logger: xxx', 'expected' => {'host'=>'localhost', 'ident'=>'logger', 'message'=>'xxx'}},
# missing priority
{'msg' => 'hello world', 'expected' => {'unmatched_line' => 'hello world'}},
# timestamp parsing failure
{'msg' => '<6>ZZZ 99 99:99:99 localhost logger: xxx', 'expected' => {'unmatched_line' => '<6>ZZZ 99 99:99:99 localhost logger: xxx'}},
]
end

def compare_unmatched_lines_test_result(events, tests, options = {})
events.each_index { |i|
tests[i]['expected'].each { |k,v|
assert_equal v, events[i][2][k], "No key <#{k}> in response or value mismatch"
}
assert_equal('syslog.unmatched', events[i][0], 'tag does not match syslog.unmatched') unless i==0
assert_equal(options[:address], events[i][2]['source_address'], 'response has no source_address or mismatch') if options[:address]
assert_equal(options[:hostname], events[i][2]['source_hostname'], 'response has no source_hostname or mismatch') if options[:hostname]
}
end

def test_emit_unmatched_lines
d = create_driver([CONFIG, 'emit_unmatched_lines true'].join("\n"))
tests = create_unmatched_lines_test_case

d.run(expect_emits: 3) do
u = UDPSocket.new
Expand All @@ -383,9 +398,44 @@ def test_emit_unmatched_lines
end

assert_equal tests.size, d.events.size
tests.size.times do |i|
assert_equal tests[i]['expected'], d.events[i][2]
assert_equal 'syslog.unmatched', d.events[i][0] unless i==0
compare_unmatched_lines_test_result(d.events, tests)
end

def test_emit_unmatched_lines_with_hostname
d = create_driver([CONFIG, 'emit_unmatched_lines true', 'source_hostname_key source_hostname'].join("\n"))
tests = create_unmatched_lines_test_case

hostname = nil
d.run(expect_emits: 3) do
u = UDPSocket.new
u.do_not_reverse_lookup = false
u.connect('127.0.0.1', PORT)
hostname = u.peeraddr[2]
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert_equal tests.size, d.events.size
compare_unmatched_lines_test_result(d.events, tests, {hostname: hostname})
end

def test_emit_unmatched_lines_with_address
d = create_driver([CONFIG, 'emit_unmatched_lines true', 'source_address_key source_address'].join("\n"))
tests = create_unmatched_lines_test_case

address = nil
d.run(expect_emits: 3) do
u = UDPSocket.new
u.do_not_reverse_lookup = false
u.connect('127.0.0.1', PORT)
address = u.peeraddr[3]
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert_equal tests.size, d.events.size
compare_unmatched_lines_test_result(d.events, tests, {address: address})
end
end

0 comments on commit f2b24f5

Please sign in to comment.