From 93dbdcd5f83412c6be3c494e09ab7e50a4579195 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 7 Apr 2023 10:00:54 +0900 Subject: [PATCH] Server helper: Suppress error of UDPServer over max_bytes on Windows This fix just suppresses the error message and output a info level log. On Windows, `UDPSocket::recv` and `UDPSocket::recvfrom` can raise `Errno::EMSGSIZE` error when the receiving data size is larger than the `maxlen`, the value specified as the method's argument. (On non-Windows, this error doesn't happen, and we can receive data that is cut off to the specified size.) We need to consider this case since this causes an unexpected error message as follows. unexpected error in processing UDP data error_class=Errno::EMSGSIZE error="A message sent on a datagram socket was larger than the internal message buffer or some other network limit, or the buffer used to receive a datagram into was smaller than the datagram itself. - recvfrom(2)" On Windows, we can't cut data off the same way as Non-Windows. So, the data is simply ignored on Windows. It is not a big problem since cutting data off usually causes a failure of parsing and the data is eventually ignored. We want information about what data is ignored, but it is difficult to know it. That is an issue for the future. Ref: `winsock` specification * https://learn.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-recvfrom * https://learn.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2 Ref: Ruby implementation * https://github.com/ruby/ruby/blob/e51014f9c05aa65cbf203442d37fef7c12390015/win32/win32.c#L3540-L3589 * https://github.com/ruby/ruby/blob/e51014f9c05aa65cbf203442d37fef7c12390015/win32/win32.c#L3730-L3782 Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin_helper/server.rb | 8 +++++ test/plugin_helper/test_server.rb | 50 ++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 7e27a09b68..541c0ad607 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -545,6 +545,10 @@ def on_readable_without_sock data = @sock.recv(@max_bytes, @flags) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data is ignored since it is too large." + return end @callback.call(data) rescue => e @@ -558,6 +562,10 @@ def on_readable_with_sock data, addr = @sock.recvfrom(@max_bytes) rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF return + rescue Errno::EMSGSIZE + # Windows ONLY: This happens when the data size is larger than `@max_bytes`. + @log.info "A received data is ignored since it is too large." + return end @callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket)) rescue => e diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 0cf1aa5812..644f5a33ab 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -29,6 +29,7 @@ class Dummy < Fluent::Plugin::TestBase ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s @d = Dummy.new + @d.under_plugin_development = true @d.start @d.after_start end @@ -796,6 +797,51 @@ class Dummy < Fluent::Plugin::TestBase end end + sub_test_case "UDPServer" do + data("Normal", { options: { max_bytes: 1024 }, records: ["Hello", "World!"], expected: ["Hello", "World!"] }, keep: true) + data("Over max_bytes on Non-Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: ["a" * 32] }, keep: true) unless Fluent.windows? + data("Over max_bytes on Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: [] }, keep: true) if Fluent.windows? + test "With sock" do |data| + options, records, expected = data.values + + actual_records = [] + @d.server_create(:myserver, @port, proto: :udp, **options) do |data, sock| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + + test "Without sock" do |data| + options, records, expected = data.values + + actual_records = [] + @d.server_create(:myserver, @port, proto: :udp, **options) do |data| + actual_records << data + end + + open_client(:udp, "127.0.0.1", @port) do |sock| + records.each do |record| + sock.send(record, 0) + end + end + + waiting(10) { sleep 0.1 until actual_records.size >= expected.size } + sleep 1 if expected.size == 0 # To confirm no record recieved. + + assert_equal expected, actual_records + end + end + module CertUtil extend Fluent::PluginHelper::CertOption end @@ -1575,6 +1621,10 @@ def assert_certificate(cert, expected_extensions) def open_client(proto, addr, port) client = case proto + when :udp + c = UDPSocket.open + c.connect(addr, port) + c when :tcp TCPSocket.open(addr, port) when :tls