diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 7101c79896..d2667cb8fb 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -36,7 +36,6 @@ class ConnectionClosedError < Error; end desc 'The transport protocol.' config_param :transport, :enum, list: [:tcp, :tls], default: :tcp # TODO: TLS session cache/tickets - # TODO: Connection keepalive desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 @@ -103,6 +102,10 @@ class ConnectionClosedError < Error; end config_param :tls_client_private_key_path, :string, default: nil desc 'The client private key passphrase for TLS.' config_param :tls_client_private_key_passphrase, :string, default: nil, secret: true + desc "Enable keepalive connection." + config_param :keepalive, :bool, default: false + desc "Expired time of keepalive. Default value is nil, which means to keep connection as long as possible" + config_param :keepalive_timeout, :time, default: nil config_section :security, required: false, multi: false do desc 'The hostname' @@ -151,6 +154,7 @@ def initialize @usock = nil @sock_ack_waiting = nil @sock_ack_waiting_mutex = nil + @keep_alive_watcher_interval = 5 # TODO end def configure(conf) @@ -201,9 +205,9 @@ def configure(conf) log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id if @heartbeat_type == :none - @nodes << NoneHeartbeatNode.new(self, server, failure: failure) + @nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout) else - node = Node.new(self, server, failure: failure) + node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout) begin node.validate_host_resolution! 
rescue => e @@ -227,6 +231,10 @@ def configure(conf) raise Fluent::ConfigError, "forward output plugin requires at least one is required" end + if !@keepalive && @keepalive_timeout + log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.') + end + raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1 end @@ -279,6 +287,10 @@ def start end end end + + if @keepalive && @keepalive_timeout + timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks)) + end end def close @@ -286,6 +298,10 @@ def close # close socket and ignore errors: this socket will not be used anyway. @usock.close rescue nil end + + if @keepalive && @keepalive_timeout + @nodes.each(&:clear) + end super end @@ -450,6 +466,10 @@ def on_heartbeat(sockaddr, msg) end end + def on_purge_obsolete_socks + @nodes.each(&:purge_obsolete_socks) + end + # return chunk id to be committed def read_ack_from_sock(sock, unpacker) begin @@ -484,8 +504,13 @@ def read_ack_from_sock(sock, unpacker) log.error "unexpected error while receiving ack message", error: e log.error_backtrace ensure - info.sock.close_write rescue nil - info.sock.close rescue nil + if @keepalive + info.node.socket_cache.dec_ref_by_value(info.sock) + else + info.sock.close_write rescue nil + info.sock.close rescue nil + end + @sock_ack_waiting_mutex.synchronize do @sock_ack_waiting.delete(info) end @@ -513,6 +538,9 @@ def ack_reader # (2) the node does support sending response but responses have not arrived for some reasons. log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! 
# Cache of keepalive sockets for a single forward node.
#
# Sockets are stored per key (by default the calling thread's object id) and
# carry a reference count of users that still need them (e.g. a pending ack).
# Every public method runs under the internal mutex.
#
# Entries live in one of three pools:
# * active    - handed out by #fetch_or for new writes
# * inactive  - revoked or expired, closed by #purge_obsolete_socks once idle
# * displaced - inactive entries pushed out of their key slot by a newer
#               inactive entry while still referenced; kept so that they can
#               still be found by #dec_ref_by_value and closed later
class SocketCache
  TimedSocket = Struct.new(:timeout, :sock, :ref)

  # @param timeout [Numeric, nil] seconds until a cached socket expires; nil disables expiry
  # @param log [#debug, #warn] logger used for diagnostics
  def initialize(timeout, log)
    @log = log
    @timeout = timeout
    @active_socks = {}
    @inactive_socks = {}
    @displaced_socks = []
    @mutex = Mutex.new
  end

  # Invalidates the active socket stored under +key+: it is moved to the
  # inactive pool with its reference count reset to 0. The socket itself is
  # not closed here. Does nothing when no active socket exists.
  def revoke(key = Thread.current.object_id)
    @mutex.synchronize do
      retire(key).ref = 0 if @active_socks[key]
    end
  end

  # Closes every cached socket (errors from close are ignored) and empties the cache.
  def clear
    @mutex.synchronize do
      entries = @inactive_socks.values + @active_socks.values + @displaced_socks
      entries.each { |entry| close_quietly(entry.sock) }

      @inactive_socks.clear
      @active_socks.clear
      @displaced_socks.clear
    end
  end

  # Closes inactive sockets nobody references any more, then moves expired,
  # unreferenced active sockets to the inactive pool (they are closed by the
  # next purge).
  def purge_obsolete_socks
    @mutex.synchronize do
      # ref <= 0 means every user of the socket is done with it (all acks received)
      @inactive_socks.delete_if { |_key, entry| purge_if_idle(entry) }
      @displaced_socks.delete_if { |entry| purge_if_idle(entry) }

      @active_socks.keys.each do |key|
        retire(key) if expired?(key) && @active_socks[key].ref <= 0
      end
    end
  end

  # Returns the active socket for +key+ and increments its reference count.
  # When there is no active socket, or the active one has expired, the block
  # is called to create a new socket.
  #
  # We expect that `yield` returns a unique object in this class
  def fetch_or(key = Thread.current.object_id)
    @mutex.synchronize do
      entry = @active_socks[key]

      if entry.nil?
        entry = @active_socks[key] = TimedSocket.new(timeout, yield, 1)
        @log.debug("connect new socket #{entry}")
      else
        if expired?(key)
          # Do not close the expired socket here: it may still be in use
          # elsewhere (e.g. waiting for an ack).
          stale = retire(key)
          @log.debug("connection #{stale} is expired. reconnecting...")
          entry = @active_socks[key] = TimedSocket.new(timeout, yield, 0)
        end

        entry.ref += 1
      end

      entry.sock
    end
  end

  # Decrements the reference count of the socket stored under +key+
  # (active pool first, then inactive). Logs a warning for an unknown key.
  def dec_ref(key = Thread.current.object_id)
    @mutex.synchronize do
      entry = @active_socks[key] || @inactive_socks[key]
      if entry
        entry.ref -= 1
      else
        @log.warn("Not found key for dec_ref: #{key}")
      end
    end
  end

  # Decrements the reference count of the entry whose socket == +val+.
  # Intended for callers that hold the socket but not its key.
  # Logs a warning naming +val+ when no entry holds that socket.
  def dec_ref_by_value(val)
    @mutex.synchronize do
      entry = find_by_sock(val)
      if entry
        entry.ref -= 1
      else
        @log.warn("Not found key for dec_ref_by_value: #{val}")
      end
    end
    nil
  end

  # Same as #revoke, but looks the active entry up by its socket.
  # This method is expected to be called in class which doesn't call #fetch_or
  def revoke_by_value(val)
    @mutex.synchronize do
      pair = @active_socks.find { |_key, entry| entry.sock == val }
      if pair
        retire(pair.first).ref = 0
      else
        @log.debug("Not found for revoke_by_value :#{val}")
      end
    end
  end

  private

  # Absolute expiry time for a socket created now, or nil when expiry is disabled.
  def timeout
    @timeout && Time.now + @timeout
  end

  # Must be called with the mutex held and only for keys present in the active pool.
  def expired?(key = Thread.current.object_id)
    deadline = @active_socks[key].timeout
    deadline ? deadline < Time.now : false
  end

  # Moves the active entry of +key+ to the inactive pool and returns it.
  # An inactive entry already stored under +key+ would otherwise be overwritten
  # and never closed: it is closed right away when idle, or parked in the
  # displaced pool while it is still referenced.
  def retire(key)
    displaced = @inactive_socks[key]
    if displaced
      if displaced.ref <= 0
        close_quietly(displaced.sock)
      else
        @displaced_socks << displaced
      end
    end

    @inactive_socks[key] = @active_socks.delete(key)
  end

  # Closes and reports the entry when nobody references it.
  # @return [Boolean] true when the entry was closed and should be dropped
  def purge_if_idle(entry)
    return false if entry.ref > 0

    close_quietly(entry.sock)
    @log.debug("purged obsolete socket #{entry.sock}")
    true
  end

  # Best-effort close: the socket is being discarded, so close errors are ignored.
  def close_quietly(sock)
    sock.close
  rescue StandardError
    nil
  end

  # First entry, in active -> inactive -> displaced order, whose socket == +val+.
  def find_by_sock(val)
    [@active_socks.values, @inactive_socks.values, @displaced_socks].each do |entries|
      hit = entries.find { |entry| entry.sock == val }
      return hit if hit
    end
    nil
  end
end
# Closes every socket cached for this node. Does nothing when keepalive is disabled.
def clear
  @keepalive && @socket_cache.clear
end

# Lets the socket cache close sockets that are no longer needed.
# @raise [RuntimeError] when keepalive is disabled (no cache exists then)
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @keepalive

  @socket_cache.purge_obsolete_socks
end

private

# Obtains a transfer socket to this node.
#
# With keepalive the socket comes from the socket cache (created on demand);
# otherwise a new socket is created for every call.
#
# Without a block the socket is returned and the caller is responsible for
# releasing it. With a block the socket is yielded and released afterwards:
# * keepalive: the cache reference is dropped on success, the cached socket is
#   revoked when the block raises
# * no keepalive: the socket is always closed
#
# @param host [String, nil] address to connect to; defaults to resolved_host
def connect(host = nil)
  sock = if @keepalive
           @socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
         else
           @log.debug('connect new socket')
           @sender.create_transfer_socket(host || resolved_host, port, @hostname)
         end

  return sock unless block_given?

  begin
    yield(sock)
  rescue
    # fetch_or above stored the socket under the cache's default key (the
    # current thread), so revoke/dec_ref must use that same default key.
    # Passing the socket itself as the key never matches an entry.
    @socket_cache.revoke if @keepalive
    raise
  else
    @socket_cache.dec_ref if @keepalive
  ensure
    sock.close unless @keepalive
  end
end
# Unit tests for Node::SocketCache. Plain values (e.g. 1) stand in for sockets
# where the test never closes them; rr mocks are used where close is expected.
sub_test_case 'SocketCache' do
  sub_test_case 'fetch_or' do
    test 'when given key does not exist' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      sock = mock!.open { 1 }.subject
      assert_equal(1, c.fetch_or { sock.open })
    end

    test 'when given key exists' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      assert_equal(1, c.fetch_or { 1 })

      sock = dont_allow(mock!).open
      assert_equal(1, c.fetch_or { sock.open })
    end

    test "when given key's value was expired" do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(0, Logger.new(nil))
      assert_equal(1, c.fetch_or { 1 })

      sock = mock!.open { 1 }.subject
      assert_equal(1, c.fetch_or { sock.open })
    end
  end

  test 'revoke' do
    c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
    c.fetch_or { 1 }
    c.revoke

    # a revoked socket must not be reused: the block has to be called again
    sock = mock!.open { 1 }.subject
    assert_equal(1, c.fetch_or { sock.open })
  end

  test 'revoke_by_value' do
    c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
    c.fetch_or { 1 }
    c.revoke_by_value(1)

    sock = mock!.open { 1 }.subject
    assert_equal(1, c.fetch_or { sock.open })
  end

  sub_test_case 'dec_ref' do
    test 'when value exists in active_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      c.fetch_or { 1 }
      c.dec_ref

      assert_equal(0, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
    end

    test 'when value exists in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      c.fetch_or { 1 }
      c.revoke
      c.dec_ref
      assert_equal(-1, c.instance_variable_get(:@inactive_socks)[Thread.current.object_id].ref)
    end
  end

  sub_test_case 'dec_ref_by_value' do
    test 'when value exists in active_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      c.fetch_or { 1 }
      c.dec_ref_by_value(1)

      assert_equal(0, c.instance_variable_get(:@active_socks)[Thread.current.object_id].ref)
    end

    test 'when value exists in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      c.fetch_or { 1 }
      c.revoke
      c.dec_ref_by_value(1)
      assert_equal(-1, c.instance_variable_get(:@inactive_socks)[Thread.current.object_id].ref)
    end
  end

  sub_test_case 'clear' do
    test 'when value is in active_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      m = mock!.close { 'closed' }.subject
      c.fetch_or { m }
      assert_true(!c.instance_variable_get(:@active_socks).empty?)

      c.clear
      assert_true(c.instance_variable_get(:@active_socks).empty?)
    end

    test 'when value is in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      m = mock!.close { 'closed' }.subject
      c.fetch_or { m }
      c.revoke
      assert_true(!c.instance_variable_get(:@inactive_socks).empty?)

      c.clear
      # the socket was moved to the inactive pool, so that is the pool to check
      assert_true(c.instance_variable_get(:@inactive_socks).empty?)
      assert_true(c.instance_variable_get(:@active_socks).empty?)
    end
  end

  sub_test_case 'purge_obsolete_socks' do
    test 'delete key in inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      m = mock!.close { 'closed' }.subject
      c.fetch_or { m }
      c.revoke
      assert_true(!c.instance_variable_get(:@inactive_socks).empty?)

      c.purge_obsolete_socks
      # the revoked socket must be gone from the inactive pool after the purge
      assert_true(c.instance_variable_get(:@inactive_socks).empty?)
      assert_true(c.instance_variable_get(:@active_socks).empty?)
    end

    test 'move key from active_socks to inactive_socks' do
      c = Fluent::Plugin::ForwardOutput::Node::SocketCache.new(10, Logger.new(nil))
      m = dont_allow(mock!).close
      stub(m).inspect # for log
      c.fetch_or { m }
      assert_true(!c.instance_variable_get(:@active_socks).empty?)
      assert_true(c.instance_variable_get(:@inactive_socks).empty?)

      # the socket is neither expired nor unreferenced, so the purge must leave it alone
      c.purge_obsolete_socks
      assert_true(!c.instance_variable_get(:@active_socks).empty?)
      assert_true(c.instance_variable_get(:@inactive_socks).empty?)
    end
  end
end