diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index bd9bb92395..2a1435f339 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -213,8 +213,10 @@ def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_ socket_option_setter.call(sock) close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| - @_server_mutex.synchronize do - @_server_connections << conn + unless conn.closing + @_server_mutex.synchronize do + @_server_connections << conn + end end end server.listen(backlog) if backlog @@ -227,8 +229,10 @@ def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_o socket_option_setter.call(sock) close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| - @_server_mutex.synchronize do - @_server_connections << conn + unless conn.closing + @_server_mutex.synchronize do + @_server_connections << conn + end end end server.listen(backlog) if backlog @@ -538,6 +542,8 @@ def on_readable_with_sock end class TCPServer < Coolio::TCPSocket + attr_reader :closing + def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket) @@ -618,6 +624,8 @@ def close end class TLSServer < Coolio::Socket + attr_reader :closing + # It can't use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket. def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)