diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 7e27a09b68..eecee5ae7e 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -545,6 +545,10 @@ def on_readable_without_sock data = @sock.recv(@max_bytes, @flags) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data was ignored since it was too large." + return end @callback.call(data) rescue => e @@ -558,6 +562,10 @@ def on_readable_with_sock data, addr = @sock.recvfrom(@max_bytes) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data was ignored since it was too large." + return end @callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket)) rescue => e diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 0cf1aa5812..cb44b8363d 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -29,6 +29,7 @@ class Dummy < Fluent::Plugin::TestBase ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s @d = Dummy.new + @d.under_plugin_development = true @d.start @d.after_start end @@ -794,6 +795,50 @@ class Dummy < Fluent::Plugin::TestBase end end end + + sub_test_case 'over max_bytes' do + data("cut off on Non-Windows", { max_bytes: 32, records: ["a" * 40], expected: ["a" * 32] }, keep: true) unless Fluent.windows? + data("drop on Windows", { max_bytes: 32, records: ["a" * 40], expected: [] }, keep: true) if Fluent.windows? + test 'with sock' do |data| + max_bytes, records, expected = data.values + + actual_records = [] + @d.server_create_udp(:myserver, @port, max_bytes: max_bytes) do |data, sock| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + + test 'without sock' do |data| + max_bytes, records, expected = data.values + + actual_records = [] + @d.server_create_udp(:myserver, @port, max_bytes: max_bytes) do |data| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + end end module CertUtil @@ -1575,6 +1620,10 @@ def assert_certificate(cert, expected_extensions) def open_client(proto, addr, port) client = case proto + when :udp + c = UDPSocket.open + c.connect(addr, port) + c when :tcp TCPSocket.open(addr, port) when :tls