diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 7e27a09b68..541c0ad607 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -545,6 +545,10 @@ def on_readable_without_sock data = @sock.recv(@max_bytes, @flags) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data is ignored since it is too large." + return end @callback.call(data) rescue => e @@ -558,6 +562,10 @@ def on_readable_with_sock data, addr = @sock.recvfrom(@max_bytes) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data is ignored since it is too large." + return end @callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket)) rescue => e diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 0cf1aa5812..644f5a33ab 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -29,6 +29,7 @@ class Dummy < Fluent::Plugin::TestBase ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s @d = Dummy.new + @d.under_plugin_development = true @d.start @d.after_start end @@ -796,6 +797,51 @@ class Dummy < Fluent::Plugin::TestBase end end + sub_test_case "UDPServer" do + data("Normal", { options: { max_bytes: 1024 }, records: ["Hello", "World!"], expected: ["Hello", "World!"] }, keep: true) + data("Over max_bytes on Non-Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: ["a" * 32] }, keep: true) unless Fluent.windows? + data("Over max_bytes on Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: [] }, keep: true) if Fluent.windows? + test "With sock" do |data| + options, records, expected = data.values + + actual_records = [] + @d.server_create(:myserver, @port, proto: :udp, **options) do |data, sock| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + + test "Without sock" do |data| + options, records, expected = data.values + + actual_records = [] + @d.server_create(:myserver, @port, proto: :udp, **options) do |data| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + end + module CertUtil extend Fluent::PluginHelper::CertOption end @@ -1575,6 +1621,10 @@ def assert_certificate(cert, expected_extensions) def open_client(proto, addr, port) client = case proto + when :udp + c = UDPSocket.open + c.connect(addr, port) + c when :tcp TCPSocket.open(addr, port) when :tls