Skip to content

Commit

Permalink
Enable server plugins to specify socket-option SO_LINGER
Browse files Browse the repository at this point in the history
`in_forward` plugin has the config, but most of plugins using the `server`
helper doesn't have the config, such as `in_tcp` or `in_http`.

The helper sets `0` to this option by default.
It causes the server sends RST rather than FIN on closing.
So this value needs to be changed if needing FIN signal on closing.

Adding this option to `transport_config` enable all plugins using the helper
to change this value.

Signed-off-by: daipom <[email protected]>
  • Loading branch information
daipom committed Feb 25, 2022
1 parent 296f021 commit 81d35f6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 7 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ForwardInput < Input
config_param :backlog, :integer, default: nil
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
desc 'The timeout time used to set linger option.'
config_param :linger_timeout, :integer, default: 0
config_param :linger_timeout, :integer, default: nil, deprecated: "use transport directive"
# This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value.
config_param :blocking_timeout, :time, default: 0.5
desc 'Try to resolve hostname from IP addresses or not.'
Expand Down
25 changes: 21 additions & 4 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t
raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

if proto == :tcp || proto == :tls # default linger_timeout only for server
socket_options[:linger_timeout] ||= 0
if proto == :tcp || proto == :tls
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end

socket_option_validate!(proto, **socket_options)
Expand Down Expand Up @@ -132,8 +132,8 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket
raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

if proto == :tcp || proto == :tls # default linger_timeout only for server
socket_options[:linger_timeout] ||= 0
if proto == :tcp || proto == :tls
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end

unless socket
Expand Down Expand Up @@ -263,6 +263,23 @@ module ServerTransportParams
include Fluent::Configurable
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:tcp, :tls], default: :tcp

### Socket Params ###

# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src.
# Set positive value if needing to send FIN on closing.
# NOTE:
# Socket-options can be specified from each plugin as needed, so most of them is not defined here for now.
# This is because there is no positive reason to do so.
# `linger_timeout` option in particular needs to be defined here
# although it can be specified from each plugin as well.
# This is because this helper fixes the default value to `0` for its own reason
# and it has a critical effect on the behavior.
desc 'The timeout time used to set linger option.'
config_param :linger_timeout, :integer, default: 0

### TLS Params ###

config_param :version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: Fluent::TLS::DEFAULT_VERSION
config_param :min_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: nil
config_param :max_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: nil
Expand Down
2 changes: 0 additions & 2 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def create_driver(conf=base_config)
@d = d = create_driver
assert_equal @port, d.instance.port
assert_equal '127.0.0.1', d.instance.bind
assert_equal 0, d.instance.linger_timeout
assert_equal 0.5, d.instance.blocking_timeout
assert !d.instance.backlog
end
Expand All @@ -77,7 +76,6 @@ def create_driver(conf=base_config)
@d = d = create_driver(config_auth)
assert_equal @port, d.instance.port
assert_equal '127.0.0.1', d.instance.bind
assert_equal 0, d.instance.linger_timeout
assert !d.instance.backlog

assert d.instance.security
Expand Down
18 changes: 18 additions & 0 deletions test/plugin_helper/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ class Dummy < Fluent::Plugin::TestBase
end
assert d.plugin_id
assert d.log
assert_equal 0, d.transport_config.linger_timeout
end

test 'can change linger_timeout option' do
d = Dummy.new

transport_opts = {
'linger_timeout' => 1,
}
transport_conf = config_element('transport', 'tcp', transport_opts)
conf = config_element('source', 'tag.*', {}, [transport_conf])

assert_nothing_raised do
d.configure(conf)
end
assert d.plugin_id
assert d.log
assert_equal 1, d.transport_config.linger_timeout
end
end

Expand Down

0 comments on commit 81d35f6

Please sign in to comment.