Skip to content

Commit

Permalink
Check the remote connection is available in the in_forward plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroaki Izu <[email protected]>
  • Loading branch information
akihiro17 committed Mar 28, 2019
1 parent 3566901 commit 4a838d6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 3 deletions.
7 changes: 7 additions & 0 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ForwardInput < Input
config_param :resolve_hostname, :bool, default: nil
desc 'Connections will be disconnected right after receiving first message if this value is true.'
config_param :deny_keepalive, :bool, default: false
desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.'
config_param :send_keepalive_packet, :bool, default: false

desc 'Log warning if received chunk size is larger than this value.'
config_param :chunk_size_warn_limit, :size, default: nil
Expand Down Expand Up @@ -141,6 +143,10 @@ def configure(conf)
})
end
end

if @send_keepalive_packet && @deny_keepalive
raise Fluent::ConfigError, "both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true"
end
end

def multi_workers_ready?
Expand All @@ -161,6 +167,7 @@ def start
shared: shared_socket,
resolve_name: @resolve_hostname,
linger_timeout: @linger_timeout,
send_keepalive_packet: @send_keepalive_packet,
backlog: @backlog,
&method(:handle_connection)
)
Expand Down
12 changes: 10 additions & 2 deletions lib/fluent/plugin_helper/socket_option.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module SocketOption
FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; }
FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; }

def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil)
def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil, send_keepalive_packet: nil)
unless resolve_name.nil?
if protocol != :tcp && protocol != :udp && protocol != :tls
raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls"
Expand All @@ -35,9 +35,14 @@ def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, re
raise ArgumentError, "BUG: linger_timeout is available for tcp/tls"
end
end
if send_keepalive_packet
if protocol != :tcp
raise ArgumentError, "BUG: send_keepalive_packet is available for tcp"
end
end
end

def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil)
def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil, send_keepalive_packet: nil)
unless resolve_name.nil?
sock.do_not_reverse_lookup = !resolve_name
end
Expand All @@ -59,6 +64,9 @@ def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout:
if receive_buffer_size
socket_option_set_one(sock, :SO_RCVBUF, receive_buffer_size.to_i)
end
if send_keepalive_packet
socket_option_set_one(sock, :SO_KEEPALIVE, true)
end
sock
end

Expand Down
21 changes: 21 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ def create_driver(conf=CONFIG)
assert_equal 1, d.instance.security.users.size
assert_equal 1, d.instance.security.clients.size
end

test 'send_keepalive_packet is disabled by default' do
@d = d = create_driver(CONFIG_AUTH)
assert_false d.instance.send_keepalive_packet
end

test 'send_keepalive_packet can be enabled' do
@d = d = create_driver(CONFIG_AUTH + %[
send_keepalive_packet true
])
assert_true d.instance.send_keepalive_packet
end

test 'both send_keepalive_packet and deny_keepalive cannot be enabled' do
assert_raise(Fluent::ConfigError.new("both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true")) do
create_driver(CONFIG_AUTH + %[
send_keepalive_packet true
deny_keepalive true
])
end
end
end

sub_test_case 'message' do
Expand Down
11 changes: 10 additions & 1 deletion test/plugin_helper/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ class Dummy < Fluent::Plugin::TestBase
end
end

data(
'server_create udp' => [:server_create, :udp],
)
test 'raise error if tcp/tls/unix options specified for udp' do |(m, proto)|
assert_raise(ArgumentError.new("BUG: send_keepalive_packet is available for tcp")) do
@d.__send__(m, :myserver, PORT, proto: proto, send_keepalive_packet: true){|x| x }
end
end

data(
'server_create tcp' => [:server_create, :tcp, {}],
'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
Expand Down Expand Up @@ -352,7 +361,7 @@ class Dummy < Fluent::Plugin::TestBase
sub_test_case '#server_create_tcp' do
test 'can accept all keyword arguments valid for tcp server' do
assert_nothing_raised do
@d.server_create_tcp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500) do |data, conn|
@d.server_create_tcp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500, send_keepalive_packet: true) do |data, conn|
# ...
end
end
Expand Down

0 comments on commit 4a838d6

Please sign in to comment.