Skip to content

Commit

Permalink
Merge pull request #2393 from ganmacs/keepalive-for-out_forward
Browse files Browse the repository at this point in the history
Keepalive for out_forward plugin
  • Loading branch information
repeatedly authored May 8, 2019
2 parents add2af9 + d3cb120 commit 272bc85
Show file tree
Hide file tree
Showing 2 changed files with 436 additions and 15 deletions.
242 changes: 227 additions & 15 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ConnectionClosedError < Error; end
desc 'The transport protocol.'
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
# TODO: TLS session cache/tickets
# TODO: Connection keepalive

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
Expand Down Expand Up @@ -103,6 +102,10 @@ class ConnectionClosedError < Error; end
config_param :tls_client_private_key_path, :string, default: nil
desc 'The client private key passphrase for TLS.'
config_param :tls_client_private_key_passphrase, :string, default: nil, secret: true
desc "Enable keepalive connection."
config_param :keepalive, :bool, default: false
desc "Expired time of keepalive. Default value is nil, which means to keep connection as long as possible"
config_param :keepalive_timeout, :time, default: nil

config_section :security, required: false, multi: false do
desc 'The hostname'
Expand Down Expand Up @@ -151,6 +154,7 @@ def initialize
@usock = nil
@sock_ack_waiting = nil
@sock_ack_waiting_mutex = nil
@keep_alive_watcher_interval = 5 # TODO
end

def configure(conf)
Expand Down Expand Up @@ -201,9 +205,9 @@ def configure(conf)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
else
node = Node.new(self, server, failure: failure)
node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
begin
node.validate_host_resolution!
rescue => e
Expand All @@ -227,6 +231,10 @@ def configure(conf)
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
end

if !@keepalive && @keepalive_timeout
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
end

raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end

Expand Down Expand Up @@ -279,13 +287,21 @@ def start
end
end
end

if @keepalive && @keepalive_timeout
timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
end
end

# Releases the sockets held by this plugin, then delegates to the parent #close.
def close
  if @usock
    # close socket and ignore errors: this socket will not be used anyway.
    @usock.close rescue nil
  end

  # Keepalive sockets are cached whenever keepalive is enabled, with or without
  # keepalive_timeout, so they have to be closed in both configurations.
  @nodes.each(&:clear) if @keepalive
  super
end

Expand Down Expand Up @@ -453,6 +469,10 @@ def on_heartbeat(sockaddr, msg)
end
end

# Asks every node in @nodes to purge its obsolete keepalive sockets.
def on_purge_obsolete_socks
  @nodes.each { |node| node.purge_obsolete_socks }
end

# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
Expand Down Expand Up @@ -487,8 +507,13 @@ def read_ack_from_sock(sock, unpacker)
log.error "unexpected error while receiving ack message", error: e
log.error_backtrace
ensure
info.sock.close_write rescue nil
info.sock.close rescue nil
if @keepalive
info.node.socket_cache.dec_ref_by_value(info.sock)
else
info.sock.close_write rescue nil
info.sock.close rescue nil
end

@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting.delete(info)
end
Expand Down Expand Up @@ -516,6 +541,9 @@ def ack_reader
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
if @keepalive
info.node.socket_cache.revoke_by_value(info.sock)
end
info.sock.close rescue nil
rollback_write(info.chunk_id, update_retry: false)
else
Expand All @@ -541,7 +569,142 @@ def ack_reader
end

class Node
def initialize(sender, server, failure:)
# Reference-counted cache of keepalive sockets.
#
# Sockets are stored per key (by default the object id of the calling thread).
# An "active" socket is handed out by #fetch_or; an "inactive" socket is no
# longer handed out and is closed by #purge_obsolete_socks once its reference
# count drops to zero or below.
class SocketCache
  TimedSocket = Struct.new(:timeout, :sock, :ref)

  # @param timeout [Numeric, nil] seconds a socket stays active; nil means it never expires
  # @param log [#debug, #warn] logger
  def initialize(timeout, log)
    @log = log
    @timeout = timeout
    @active_socks = {}
    @inactive_socks = {}
    @mutex = Mutex.new
  end

  # Stops handing out the active socket stored under +key+ and drops all
  # references to it, so the next purge closes it.
  def revoke(key = Thread.current.object_id)
    @mutex.synchronize do
      deactivate(key).ref = 0 if @active_socks[key]
    end
  end

  # Closes every cached socket (inactive first, then active), ignoring close errors.
  def clear
    @mutex.synchronize do
      [@inactive_socks, @active_socks].each do |socks|
        socks.each_value { |s| s.sock.close rescue nil }
        socks.clear
      end
    end
  end

  # Closes inactive sockets nobody references any more, then deactivates
  # expired active sockets that are unreferenced (they are closed by the next call).
  def purge_obsolete_socks
    @mutex.synchronize do
      @inactive_socks.keys.each do |k|
        # 0 means sockets stored in this class received all acks
        next if @inactive_socks[k].ref > 0

        s = @inactive_socks.delete(k)
        s.sock.close rescue nil
        @log.debug("purged obsolete socket #{s.sock}")
      end

      @active_socks.keys.each do |k|
        deactivate(k) if expired?(k) && @active_socks[k].ref <= 0
      end
    end
  end

  # Returns the active socket for +key+ and takes one reference on it. The
  # block is called to open a socket when none is cached or the cached one expired.
  # We expect that `yield` returns a unique object in this class
  def fetch_or(key = Thread.current.object_id)
    @mutex.synchronize do
      unless @active_socks[key]
        @active_socks[key] = TimedSocket.new(timeout, yield, 1)
        @log.debug("connect new socket #{@active_socks[key]}")
        return @active_socks[key].sock
      end

      if expired?(key)
        # Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
        expired = deactivate(key)
        @log.debug("connection #{expired} is expired. reconnecting...")
        @active_socks[key] = TimedSocket.new(timeout, yield, 0)
      end

      @active_socks[key].ref += 1
      @active_socks[key].sock
    end
  end

  # Releases one reference on the socket stored under +key+ (active first, then inactive).
  def dec_ref(key = Thread.current.object_id)
    @mutex.synchronize do
      entry = @active_socks[key] || @inactive_socks[key]
      if entry
        entry.ref -= 1
      else
        @log.warn("Not found key for dec_ref: #{key}")
      end
    end
  end

  # Releases one reference on the entry holding the socket +val+.
  # This method is expected to be called in class which doesn't call #fetch_or
  def dec_ref_by_value(val)
    @mutex.synchronize do
      entry = entry_for(@active_socks, val) || entry_for(@inactive_socks, val)
      if entry
        entry.ref -= 1
      else
        # Report the socket that was looked up; no key is known in this branch.
        @log.warn("Not found key for dec_ref_by_value: #{val}")
      end
      nil
    end
  end

  # Same as #revoke, but looks the active entry up by its socket +val+.
  # This method is expected to be called in class which doesn't call #fetch_or
  def revoke_by_value(val)
    @mutex.synchronize do
      pair = @active_socks.find { |_, v| v.sock == val }
      if pair
        deactivate(pair.first).ref = 0
      else
        @log.debug("Not found for revoke_by_value :#{val}")
      end
    end
  end

  private

  # Expiry time for a socket created now, or nil when sockets never expire.
  def timeout
    @timeout && Time.now + @timeout
  end

  # This method is thread unsafe
  def expired?(key = Thread.current.object_id)
    @active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
  end

  # Finds the entry in +socks+ whose socket equals +val+. Caller must hold @mutex.
  def entry_for(socks, val)
    socks.each_value.find { |entry| entry.sock == val }
  end

  # Moves the active entry of +key+ to the inactive set and returns it.
  # Caller must hold @mutex.
  #
  # An inactive entry may already exist under the same key. Overwriting it
  # would lose the only handle on that socket, so it would never be closed.
  # An unreferenced one is closed right away; a still-referenced one is parked
  # under a surrogate key so #dec_ref_by_value and #purge_obsolete_socks still reach it.
  def deactivate(key)
    displaced = @inactive_socks[key]
    if displaced
      if displaced.ref <= 0
        displaced.sock.close rescue nil
        @log.debug("purged obsolete socket #{displaced.sock}")
      else
        @inactive_socks[[key, displaced.sock.object_id]] = displaced
      end
    end
    @inactive_socks[key] = @active_socks.delete(key)
  end
end

# @param keepalive [Bool]
# @param keepalive_timeout [Integer | nil]
def initialize(sender, server, failure:, keepalive: false, keepalive_timeout: nil)
@sender = sender
@log = sender.log
@compress = sender.compress
Expand Down Expand Up @@ -574,13 +737,19 @@ def initialize(sender, server, failure:)
@resolved_host = nil
@resolved_time = 0
@resolved_once = false

@keepalive = keepalive
if @keepalive
@socket_cache = SocketCache.new(keepalive_timeout, @log)
end
end

attr_accessor :usock

attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :sockaddr # used by on_heartbeat
attr_reader :failure, :available # for test
attr_reader :socket_cache # for ack

RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)

Expand All @@ -601,15 +770,12 @@ def standby?
end

# Opens a connection through #connect and, when the sender has a security
# section, performs the handshake to check that the node accepts it.
#
# @raise [RuntimeError] when the handshake does not reach the :established state
def verify_connection
  # #connect owns the socket lifecycle (close or keepalive bookkeeping), so
  # the socket must not be created or closed here.
  connect do |sock|
    ri = RequestInfo.new(@sender.security ? :helo : :established)
    if ri.state != :established
      establish_connection(sock, ri)
      if ri.state != :established
        raise "failed to establish connection with #{host}:#{port}"
      end
    end
  end
end

Expand Down Expand Up @@ -675,24 +841,44 @@ def send_data_actual(sock, tag, chunk)
end

# Sends +chunk+ tagged +tag+ to this node.
#
# @return [Object, nil] the socket when the sender requires an ack response
#   (the caller reads the ack from it), otherwise nil
# @raise re-raises whatever #send_data_actual raised, after releasing the socket
def send_data(tag, chunk)
  # #connect is the single place that opens (or reuses) the socket.
  sock = connect

  begin
    send_data_actual(sock, tag, chunk)
  rescue
    if @keepalive
      # The cached socket may be broken: stop reusing it.
      @socket_cache.revoke
    else
      sock.close rescue nil
    end
    raise
  end

  # to read ACK from socket
  return sock if @sender.require_ack_response

  if @keepalive
    # Keep the socket open for reuse; only release this call's reference.
    @socket_cache.dec_ref
  else
    sock.close_write rescue nil
    sock.close rescue nil
  end
  heartbeat(false)
  nil
end

# Clears this node's socket cache when keepalive is enabled.
# Without keepalive nothing is done and the falsy @keepalive value is returned.
def clear
  return @keepalive unless @keepalive

  @socket_cache.clear
end

# Purges obsolete sockets from this node's socket cache.
#
# @raise [RuntimeError] when called on a node without keepalive
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @keepalive

  @socket_cache.purge_obsolete_socks
end

# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
def send_heartbeat
begin
Expand All @@ -708,7 +894,7 @@ def send_heartbeat

case @sender.heartbeat_type
when :transport
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
connect(dest_addr) do |sock|
## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA

Expand Down Expand Up @@ -883,6 +1069,32 @@ def on_read(sock, ri, data)
raise "BUG: unknown session state: #{ri.state}"
end
end

private

# Obtains a transfer socket for this node: from the socket cache when
# keepalive is enabled, otherwise a freshly created one.
#
# Without a block the socket is returned and the caller releases it.
# With a block the socket is yielded and released afterwards: under keepalive
# the reference is given back (or the socket revoked if the block raised),
# otherwise the socket is closed.
#
# @param host [String, nil] address to connect to; defaults to resolved_host
def connect(host = nil)
  sock = if @keepalive
           @socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
         else
           @log.debug('connect new socket')
           @sender.create_transfer_socket(host || resolved_host, port, @hostname)
         end

  return sock unless block_given?

  begin
    yield(sock)
  rescue
    # The cache is keyed by thread id, not by socket, so the socket has to be
    # looked up by value; passing it as a key would never match.
    @socket_cache.revoke_by_value(sock) if @keepalive
    raise
  else
    @socket_cache.dec_ref_by_value(sock) if @keepalive
  ensure
    sock.close unless @keepalive
  end
end
end

# Override Node to disable heartbeat
Expand Down
Loading

0 comments on commit 272bc85

Please sign in to comment.