From ebc55b00be6a38d71e42befad2e0f34db4f322a4 Mon Sep 17 00:00:00 2001 From: Watson Date: Tue, 26 Nov 2024 17:29:27 +0900 Subject: [PATCH] plugin_helper/server: Add receive_buffer_size parameter in transport section (#4649) Signed-off-by: Shizuo Fujita --- lib/fluent/plugin/in_udp.rb | 7 ++++++- lib/fluent/plugin_helper/server.rb | 7 +++++++ test/plugin_helper/test_server.rb | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..e16c412c07 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -43,10 +43,15 @@ class UdpInput < Input desc "Remove newline from the end of incoming payload" config_param :remove_newline, :bool, default: true desc "The max size of socket receive buffer. SO_RCVBUF" - config_param :receive_buffer_size, :size, default: nil + config_param :receive_buffer_size, :size, default: nil, deprecated: "use receive_buffer_size in transport section instead." config_param :blocking_timeout, :time, default: 0.5 + # overwrite server plugin to change default to :udp and remove tcp/tls protocol from list + config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do + config_argument :protocol, :enum, list: [:udp], default: :udp + end + def configure(conf) compat_parameters_convert(conf, :parser) parser_config = conf.elements('parse').first diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index eecee5ae7e..720ad46a15 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -84,6 +84,8 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0 end + socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size + socket_option_validate!(proto, **socket_options) socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } @@ -136,6 +138,8 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0 end + socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size + unless socket socket_option_validate!(proto, **socket_options) socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } @@ -266,6 +270,9 @@ module ServerTransportParams ### Socket Params ### + desc "The max size of socket receive buffer. SO_RCVBUF" + config_param :receive_buffer_size, :size, default: nil + # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src. # Set positive value if needing to send FIN on closing on non-Windows. # (On Windows, Fluentd can send FIN with zero `linger_timeout` since Fluentd doesn't set 0 to SO_LINGER on Windows. diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 63beb0bd70..512134bf62 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -89,6 +89,23 @@ class Dummy < Fluent::Plugin::TestBase assert d.log assert_equal 1, d.transport_config.linger_timeout end + + test 'can change receive_buffer_size option' do + d = Dummy.new + + transport_opts = { + 'receive_buffer_size' => 1024, + } + transport_conf = config_element('transport', 'tcp', transport_opts) + conf = config_element('source', 'tag.*', {}, [transport_conf]) + + assert_nothing_raised do + d.configure(conf) + end + assert d.plugin_id + assert d.log + assert_equal 1024, d.transport_config.receive_buffer_size + end end # run tests for tcp, udp, tls and unix