Skip to content

Commit

Permalink
Server helper: Suppress error of UDPServer over max_bytes on Windows
Browse files Browse the repository at this point in the history
This fix just suppresses the error message and output a info level log.

On Windows, `UDPSocket::recv` and `UDPSocket::recvfrom` can
raise `Errno::EMSGSIZE` error when the receiving data size is
larger than the `maxlen`, the value specified as the method's
argument.
(On non-Windows, this error doesn't happen, and we can receive data
that is cut off to the specified size.)

We need to consider this case since this causes an unexpected
error message as follows.

     unexpected error in processing UDP data
     error_class=Errno::EMSGSIZE
     error="A message sent on a datagram socket was larger than
     the internal message buffer or some other network limit,
     or the buffer used to receive a datagram into was smaller
     than the datagram itself. - recvfrom(2)"

On Windows, we can't cut data off the same way as Non-Windows.
So, the data is simply ignored on Windows.
It is not a big problem since cutting data off usually causes a
failure of parsing and the data is eventually ignored.

We want information about what data is ignored, but it is
difficult to know it. That is an issue for the future.

Ref: `winsock` specification

* https://learn.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-recvfrom
* https://learn.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2

Ref: Ruby implementation

* https://github.com/ruby/ruby/blob/e51014f9c05aa65cbf203442d37fef7c12390015/win32/win32.c#L3540-L3589
* https://github.com/ruby/ruby/blob/e51014f9c05aa65cbf203442d37fef7c12390015/win32/win32.c#L3730-L3782

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Apr 7, 2023
1 parent a6def0c commit 93dbdcd
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ def on_readable_without_sock
data = @sock.recv(@max_bytes, @flags)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF
return
rescue Errno::EMSGSIZE
# Windows ONLY: This happens when the data size is larger than `@max_bytes`.
@log.info "A received data is ignored since it is too large."
return
end
@callback.call(data)
rescue => e
Expand All @@ -558,6 +562,10 @@ def on_readable_with_sock
data, addr = @sock.recvfrom(@max_bytes)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF
return
rescue Errno::EMSGSIZE
# Windows ONLY: This happens when the data size is larger than `@max_bytes`.
@log.info "A received data is ignored since it is too large."
return
end
@callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket))
rescue => e
Expand Down
50 changes: 50 additions & 0 deletions test/plugin_helper/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Dummy < Fluent::Plugin::TestBase
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s

@d = Dummy.new
@d.under_plugin_development = true
@d.start
@d.after_start
end
Expand Down Expand Up @@ -796,6 +797,51 @@ class Dummy < Fluent::Plugin::TestBase
end
end

sub_test_case "UDPServer" do
data("Normal", { options: { max_bytes: 1024 }, records: ["Hello", "World!"], expected: ["Hello", "World!"] }, keep: true)
data("Over max_bytes on Non-Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: ["a" * 32] }, keep: true) unless Fluent.windows?
data("Over max_bytes on Windows", { options: { max_bytes: 32 }, records: ["a" * 40], expected: [] }, keep: true) if Fluent.windows?
test "With sock" do |data|
options, records, expected = data.values

actual_records = []
@d.server_create(:myserver, @port, proto: :udp, **options) do |data, sock|
actual_records << data
end

open_client(:udp, "127.0.0.1", @port) do |sock|
records.each do |record|
sock.send(record, 0)
end
end

waiting(10) { sleep 0.1 until actual_records.size >= expected.size }
sleep 1 if expected.size == 0 # To confirm no record recieved.

assert_equal expected, actual_records
end

test "Without sock" do |data|
options, records, expected = data.values

actual_records = []
@d.server_create(:myserver, @port, proto: :udp, **options) do |data|
actual_records << data
end

open_client(:udp, "127.0.0.1", @port) do |sock|
records.each do |record|
sock.send(record, 0)
end
end

waiting(10) { sleep 0.1 until actual_records.size >= expected.size }
sleep 1 if expected.size == 0 # To confirm no record recieved.

assert_equal expected, actual_records
end
end

module CertUtil
extend Fluent::PluginHelper::CertOption
end
Expand Down Expand Up @@ -1575,6 +1621,10 @@ def assert_certificate(cert, expected_extensions)

def open_client(proto, addr, port)
client = case proto
when :udp
c = UDPSocket.open
c.connect(addr, port)
c
when :tcp
TCPSocket.open(addr, port)
when :tls
Expand Down

0 comments on commit 93dbdcd

Please sign in to comment.