Skip to content

Commit

Permalink
plugin_helper/server: Add receive_buffer_size parameter in transport …
Browse files Browse the repository at this point in the history
…section (#4649)

Signed-off-by: Shizuo Fujita <[email protected]>
  • Loading branch information
Watson1978 authored Nov 26, 2024
1 parent 713b738 commit ebc55b0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
7 changes: 6 additions & 1 deletion lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ class UdpInput < Input
desc "Remove newline from the end of incoming payload"
config_param :remove_newline, :bool, default: true
desc "The max size of socket receive buffer. SO_RCVBUF"
config_param :receive_buffer_size, :size, default: nil
config_param :receive_buffer_size, :size, default: nil, deprecated: "use receive_buffer_size in transport section instead."

config_param :blocking_timeout, :time, default: 0.5

# overwrite server plugin to change default to :udp and remove tcp/tls protocol from list
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:udp], default: :udp
end

def configure(conf)
compat_parameters_convert(conf, :parser)
parser_config = conf.elements('parse').first
Expand Down
7 changes: 7 additions & 0 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end

socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size

socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }

Expand Down Expand Up @@ -136,6 +138,8 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end

socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size

unless socket
socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
Expand Down Expand Up @@ -266,6 +270,9 @@ module ServerTransportParams

### Socket Params ###

desc "The max size of socket receive buffer. SO_RCVBUF"
config_param :receive_buffer_size, :size, default: nil

# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src.
# Set positive value if needing to send FIN on closing on non-Windows.
# (On Windows, Fluentd can send FIN with zero `linger_timeout` since Fluentd doesn't set 0 to SO_LINGER on Windows.
Expand Down
17 changes: 17 additions & 0 deletions test/plugin_helper/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ class Dummy < Fluent::Plugin::TestBase
assert d.log
assert_equal 1, d.transport_config.linger_timeout
end

test 'can change receive_buffer_size option' do
d = Dummy.new

transport_opts = {
'receive_buffer_size' => 1024,
}
transport_conf = config_element('transport', 'tcp', transport_opts)
conf = config_element('source', 'tag.*', {}, [transport_conf])

assert_nothing_raised do
d.configure(conf)
end
assert d.plugin_id
assert d.log
assert_equal 1024, d.transport_config.receive_buffer_size
end
end

# run tests for tcp, udp, tls and unix
Expand Down

0 comments on commit ebc55b0

Please sign in to comment.