Skip to content

Commit

Permalink
Merge pull request #2399 from ganmacs/support-tls-in-syslog
Browse files Browse the repository at this point in the history
Support tls in syslog
  • Loading branch information
repeatedly authored May 17, 2019
2 parents f3a6066 + bf319e9 commit 3ac6966
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
18 changes: 12 additions & 6 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class SyslogInput < Input
desc 'The prefix of the tag. The tag itself is generated by the tag prefix, facility level, and priority.'
config_param :tag, :string
desc 'The transport protocol used to receive logs.(udp, tcp)'
config_param :protocol_type, :enum, list: [:tcp, :udp], default: :udp
config_param :protocol_type, :enum, list: [:tcp, :udp], default: nil, deprecated: "use transport directive"
desc 'The message frame type.(traditional, octet_count)'
config_param :frame_type, :enum, list: [:traditional, :octet_count], default: :traditional

Expand Down Expand Up @@ -107,6 +107,11 @@ class SyslogInput < Input
config_param :with_priority, :bool, default: true
end

# overwrite server plugin to change default to :udp
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:tcp, :udp, :tls], default: :udp
end

def configure(conf)
compat_parameters_convert(conf, :parser)

Expand Down Expand Up @@ -141,12 +146,13 @@ def multi_workers_ready?
def start
super

log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
case @protocol_type
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type || @transport_config.protocol}"
case @protocol_type || @transport_config.protocol
when :udp then start_udp_server
when :tcp then start_tcp_server
when :tls then start_tcp_server(tls: true)
else
raise "BUG: invalid protocol_type value:#{@protocol_type}"
raise "BUG: invalid transport value: #{@protocol_type || @transport_config.protocol}"
end
end

Expand All @@ -156,12 +162,12 @@ def start_udp_server
end
end

def start_tcp_server
def start_tcp_server(tls: false)
octet_count_frame = @frame_type == :octet_count

delimiter = octet_count_frame ? " " : @delimiter
delimiter_size = delimiter.size
server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn|
server_create_connection(tls ? :in_syslog_tls_server : :in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn|
conn.data do |data|
buffer = conn.buffer
buffer << data
Expand Down
37 changes: 33 additions & 4 deletions test/plugin/test_in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ def test_configure_resolve_hostname(param)
end
end

data('Use protocol_type' => ['protocol_type tcp', :tcp, :udp],
'Use transport' => ["<transport tcp>\n </transport>", nil, :tcp],
'Use transport and protocol' => ["protocol_type udp\n<transport tcp>\n </transport>", :udp, :tcp])
def test_configure_protocol(param)
conf, proto_type, transport_proto_type = *param
d = create_driver([CONFIG, conf].join("\n"))

assert_equal(d.instance.protocol_type, proto_type)
assert_equal(d.instance.transport_config.protocol, transport_proto_type)
end

# For backward compat
def test_respect_protocol_type_than_transport
d = create_driver([CONFIG, "<transport tcp> \n</transport>", "protocol_type udp"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
u = UDPSocket.new
u.connect('127.0.0.1', PORT)
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert(d.events.size > 0)
compare_test_result(d.events, tests)
end


data(
ipv4: ['127.0.0.1', CONFIG, ::Socket::AF_INET],
ipv6: ['::1', IPv6_CONFIG, ::Socket::AF_INET6],
Expand Down Expand Up @@ -119,7 +148,7 @@ def test_msg_size_udp_for_large_msg
end

def test_msg_size_with_tcp
d = create_driver([CONFIG, 'protocol_type tcp'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand All @@ -135,7 +164,7 @@ def test_msg_size_with_tcp
end

def test_msg_size_with_same_tcp_connection
d = create_driver([CONFIG, 'protocol_type tcp'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand Down Expand Up @@ -289,7 +318,7 @@ def compare_test_result(events, tests, options = {})

sub_test_case 'octet counting frame' do
def test_msg_size_with_tcp
d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>", 'frame_type octet_count'].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand All @@ -305,7 +334,7 @@ def test_msg_size_with_tcp
end

def test_msg_size_with_same_tcp_connection
d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>", 'frame_type octet_count'].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand Down

0 comments on commit 3ac6966

Please sign in to comment.