diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 401e2ca411..173b0179bd 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -47,6 +47,8 @@ class ForwardInput < Input config_param :resolve_hostname, :bool, default: nil desc 'Connections will be disconnected right after receiving first message if this value is true.' config_param :deny_keepalive, :bool, default: false + desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.' + config_param :send_keepalive_packet, :bool, default: false desc 'Log warning if received chunk size is larger than this value.' config_param :chunk_size_warn_limit, :size, default: nil @@ -141,6 +143,10 @@ def configure(conf) }) end end + + if @send_keepalive_packet && @deny_keepalive + raise Fluent::ConfigError, "both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true" + end end def multi_workers_ready? @@ -161,6 +167,7 @@ def start shared: shared_socket, resolve_name: @resolve_hostname, linger_timeout: @linger_timeout, + send_keepalive_packet: @send_keepalive_packet, backlog: @backlog, &method(:handle_connection) ) diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb index 7eb82b6a9c..6e357c1741 100644 --- a/lib/fluent/plugin_helper/socket_option.rb +++ b/lib/fluent/plugin_helper/socket_option.rb @@ -24,7 +24,7 @@ module SocketOption FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; } FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; } - def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil) + def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil, send_keepalive_packet: nil) unless resolve_name.nil? if protocol != :tcp && protocol != :udp && protocol != :tls raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls" @@ -35,9 +35,14 @@ def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, re raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" end end + if send_keepalive_packet + if protocol != :tcp + raise ArgumentError, "BUG: send_keepalive_packet is available for tcp" + end + end end - def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil) + def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, receive_buffer_size: nil, send_keepalive_packet: nil) unless resolve_name.nil? sock.do_not_reverse_lookup = !resolve_name end @@ -59,6 +64,9 @@ def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: if receive_buffer_size socket_option_set_one(sock, :SO_RCVBUF, receive_buffer_size.to_i) end + if send_keepalive_packet + socket_option_set_one(sock, :SO_KEEPALIVE, true) + end sock end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 6eda018fe8..37c8213d8a 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -81,6 +81,27 @@ def create_driver(conf=CONFIG) assert_equal 1, d.instance.security.users.size assert_equal 1, d.instance.security.clients.size end + + test 'send_keepalive_packet is disabled by default' do + @d = d = create_driver(CONFIG_AUTH) + assert_false d.instance.send_keepalive_packet + end + + test 'send_keepalive_packet can be enabled' do + @d = d = create_driver(CONFIG_AUTH + %[ + send_keepalive_packet true + ]) + assert_true d.instance.send_keepalive_packet + end + + test 'both send_keepalive_packet and deny_keepalive cannot be enabled' do + assert_raise(Fluent::ConfigError.new("both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true")) do + create_driver(CONFIG_AUTH + %[ + send_keepalive_packet true + deny_keepalive true + ]) + end + end end sub_test_case 'message' do diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 4847785e54..d628ccfeeb 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -236,6 +236,15 @@ class Dummy < Fluent::Plugin::TestBase end end + data( + 'server_create udp' => [:server_create, :udp], + ) + test 'raise error if tcp/tls/unix options specified for udp' do |(m, proto)| + assert_raise(ArgumentError.new("BUG: send_keepalive_packet is available for tcp")) do + @d.__send__(m, :myserver, PORT, proto: proto, send_keepalive_packet: true){|x| x } + end + end + data( 'server_create tcp' => [:server_create, :tcp, {}], 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], @@ -352,7 +361,7 @@ class Dummy < Fluent::Plugin::TestBase sub_test_case '#server_create_tcp' do test 'can accept all keyword arguments valid for tcp server' do assert_nothing_raised do - @d.server_create_tcp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500) do |data, conn| + @d.server_create_tcp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500, send_keepalive_packet: true) do |data, conn| # ... end end