Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keepalive for out_forward plugin #2393

Merged
merged 10 commits into from
May 8, 2019
Merged
240 changes: 225 additions & 15 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ConnectionClosedError < Error; end
desc 'The transport protocol.'
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
# TODO: TLS session cache/tickets
# TODO: Connection keepalive

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
Expand Down Expand Up @@ -103,6 +102,10 @@ class ConnectionClosedError < Error; end
config_param :tls_client_private_key_path, :string, default: nil
desc 'The client private key passphrase for TLS.'
config_param :tls_client_private_key_passphrase, :string, default: nil, secret: true
desc "Enable keepalive connection."
config_param :keepalive, :bool, default: false
desc "Expired time of keepalive. Default value is nil, which means to keep connection as long as possible"
config_param :keepalive_timeout, :time, default: nil

config_section :security, required: false, multi: false do
desc 'The hostname'
Expand Down Expand Up @@ -151,6 +154,7 @@ def initialize
@usock = nil
@sock_ack_waiting = nil
@sock_ack_waiting_mutex = nil
@keep_alive_watcher_interval = 5 # TODO
end

def configure(conf)
Expand Down Expand Up @@ -201,9 +205,9 @@ def configure(conf)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
else
node = Node.new(self, server, failure: failure)
node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: @keepalive_timeout)
begin
node.validate_host_resolution!
rescue => e
Expand All @@ -227,6 +231,10 @@ def configure(conf)
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
end

if !@keepalive && @keepalive_timeout
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
end

raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end

Expand Down Expand Up @@ -279,13 +287,21 @@ def start
end
end
end

if @keepalive && @keepalive_timeout
timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
end
end

# Releases the sockets held by this output and then delegates to the parent
# class.
#
# Closes @usock when it is set, and closes every socket cached by the nodes
# when keepalive is enabled.
def close
  if @usock
    # close socket and ignore errors: this socket will not be used anyway.
    @usock.close rescue nil
  end

  # Sockets are cached whenever keepalive is enabled, even if no
  # keepalive_timeout is configured (in that case no purge timer is started
  # either), so they must be closed here in both cases to avoid leaking them.
  @nodes.each(&:clear) if @keepalive
  super
end

Expand Down Expand Up @@ -450,6 +466,10 @@ def on_heartbeat(sockaddr, msg)
end
end

# Timer callback: asks each node in turn to purge its obsolete sockets.
def on_purge_obsolete_socks
  @nodes.each do |node|
    node.purge_obsolete_socks
  end
end

# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
Expand Down Expand Up @@ -484,8 +504,13 @@ def read_ack_from_sock(sock, unpacker)
log.error "unexpected error while receiving ack message", error: e
log.error_backtrace
ensure
info.sock.close_write rescue nil
info.sock.close rescue nil
if @keepalive
info.node.socket_cache.dec_ref_by_value(info.sock)
else
info.sock.close_write rescue nil
info.sock.close rescue nil
end

@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting.delete(info)
end
Expand Down Expand Up @@ -513,6 +538,9 @@ def ack_reader
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
if @keepalive
info.node.socket_cache.revoke_by_value(info.sock)
end
info.sock.close rescue nil
rollback_write(info.chunk_id, update_retry: false)
else
Expand All @@ -538,7 +566,142 @@ def ack_reader
end

class Node
def initialize(sender, server, failure:)
class SocketCache
TimedSocket = Struct.new(:timeout, :sock, :ref)

# Builds an empty socket cache.
#
# @param timeout [Numeric, nil] lifetime added to Time.now to compute a
#   socket's expiry time; nil means sockets never expire
# @param log logger that receives debug and warn messages
def initialize(timeout, log)
  # Guards every access to the two socket tables below.
  @mutex = Mutex.new
  @active_socks = {}
  @inactive_socks = {}
  @timeout = timeout
  @log = log
end

# Moves the active socket stored under +key+ to the inactive table and
# resets its reference count to 0. Does nothing when +key+ has no active
# socket.
#
# @param key cache key; defaults to the current thread's object id
def revoke(key = Thread.current.object_id)
  @mutex.synchronize do
    next unless @active_socks[key]

    timed_sock = @active_socks.delete(key)
    @inactive_socks[key] = timed_sock
    timed_sock.ref = 0
  end
end

def clear
@mutex.synchronize do
@inactive_socks.values.each do |s|
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would adding `rescue nil` be better here? It is hard to recover from a broken socket at this point.

end
@inactive_socks.clear

@active_socks.values.each do |s|
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end
@active_socks.clear
end
end

def purge_obsolete_socks
@mutex.synchronize do
@inactive_socks.keys.each do |k|
# 0 means sockets stored in this class received all acks
if @inactive_socks[k].ref <= 0
s = @inactive_socks.delete(k)
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@log.debug("purged obsolete socket #{s.sock}")
end
end

@active_socks.keys.each do |k|
if expired?(k) && @active_socks[k].ref <= 0
@inactive_socks[k] = @active_socks.delete(k)
end
end
end
end

# Returns the socket cached under +key+, creating one with the given block
# when there is none or when the cached one has expired. Each call adds one
# reference to the returned socket.
#
# The block is expected to return an object that is unique within this cache.
#
# @param key cache key; defaults to the current thread's object id
# @yieldreturn a newly created socket
def fetch_or(key = Thread.current.object_id)
  @mutex.synchronize do
    if @active_socks[key]
      if expired?(key)
        # The expired socket is only set aside, not closed: another place may
        # still be using it (e.g. waiting to receive an ack).
        stale = @active_socks.delete(key)
        @inactive_socks[key] = stale
        @log.debug("connection #{stale} is expired. reconnecting...")
        @active_socks[key] = TimedSocket.new(timeout, yield, 0)
      end

      entry = @active_socks[key]
      entry.ref += 1
      entry.sock
    else
      fresh = TimedSocket.new(timeout, yield, 1)
      @active_socks[key] = fresh
      @log.debug("connect new socket #{fresh}")
      fresh.sock
    end
  end
end

# Releases one reference on the socket stored under +key+, looking in the
# active table first and the inactive table second. Logs a warning when the
# key is in neither table.
#
# @param key cache key; defaults to the current thread's object id
def dec_ref(key = Thread.current.object_id)
  @mutex.synchronize do
    entry = @active_socks[key] || @inactive_socks[key]
    if entry
      entry.ref -= 1
    else
      @log.warn("Not found key for dec_ref: #{key}")
    end
  end
end

# This method is expected to be called in class which doesn't call #inc_ref
def dec_ref_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@active_socks[key].ref -= 1
return
end

sock = @inactive_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@inactive_socks[key].ref -= 1
return
else
@log.warn("Not found key for dec_ref_by_value: #{key}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about #{key.name || key.object_id}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect `key` to be `Thread.current.object_id`, so it is already an object id (and I think `Thread.current.object_id.name` would raise NoMethodError).

end
end
end

# This method is expected to be called in class which doesn't call #fetch_or
def revoke_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses `detect`, so it assumes there are never multiple entries for `val`, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and I added a comment in 8bffce6.

if sock
key = sock.first
@inactive_socks[key] = @active_socks.delete(key)
@inactive_socks[key].ref = 0
else
@log.debug("Not found for revoke_by_value :#{val}")
end
end
end

private

# Computes the expiry time for a socket created now.
#
# @return [Time, nil] Time.now plus the configured timeout, or the falsy
#   @timeout value itself when no timeout is configured
def timeout
  return @timeout unless @timeout

  Time.now + @timeout
end

# Tells whether the active socket stored under +key+ is past its expiry time.
# A socket without an expiry time never expires.
#
# This method is thread unsafe: it reads @active_socks without taking the
# mutex, so call it only while the mutex is held. +key+ must have an active
# entry.
def expired?(key = Thread.current.object_id)
  deadline = @active_socks[key].timeout
  return false unless deadline

  deadline < Time.now
end
end

# @param keepalive [Bool]
# @param keepalive_timeout [Integer | nil]
def initialize(sender, server, failure:, keepalive: false, keepalive_timeout: nil)
@sender = sender
@log = sender.log
@compress = sender.compress
Expand Down Expand Up @@ -571,13 +734,19 @@ def initialize(sender, server, failure:)
@resolved_host = nil
@resolved_time = 0
@resolved_once = false

@keepalive = keepalive
if @keepalive
@socket_cache = SocketCache.new(keepalive_timeout, @log)
end
end

attr_accessor :usock

attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :sockaddr # used by on_heartbeat
attr_reader :failure, :available # for test
attr_reader :socket_cache # for ack

RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)

Expand All @@ -598,15 +767,12 @@ def standby?
end

def verify_connection
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
connect do |sock|
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
end
ensure
sock.close
end
end

Expand Down Expand Up @@ -672,24 +838,42 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
sock = connect

begin
send_data_actual(sock, tag, chunk)
rescue
sock.close rescue nil
if @keepalive
@socket_cache.revoke
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the following code needed?

else
  sock.close rescue nil
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I didn't mean to delete the code. d3cb120

raise
end

if @sender.require_ack_response
return sock # to read ACK from socket
end

sock.close_write rescue nil
sock.close rescue nil
if @keepalive
@socket_cache.dec_ref
else
sock.close_write rescue nil
sock.close rescue nil
end
heartbeat(false)
nil
end

# Clears this node's socket cache when keepalive is enabled; otherwise does
# nothing and returns the falsy @keepalive value.
def clear
  return @keepalive unless @keepalive

  @socket_cache.clear
end

# Asks this node's socket cache to purge its obsolete sockets.
#
# @raise [RuntimeError] when the node was created without keepalive, since
#   no socket cache exists in that case
def purge_obsolete_socks
  # Fixed the typo in the message ("Don not" -> "Do not").
  raise "Do not call this method without keepalive option" unless @keepalive

  @socket_cache.purge_obsolete_socks
end

# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
def send_heartbeat
begin
Expand All @@ -705,7 +889,7 @@ def send_heartbeat

case @sender.heartbeat_type
when :transport
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
connect(dest_addr) do |sock|
## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA

Expand Down Expand Up @@ -880,6 +1064,32 @@ def on_read(sock, ri, data)
raise "BUG: unknown session state: #{ri.state}"
end
end

private

# Obtains a transfer socket to this node.
#
# With keepalive the socket comes from the socket cache (keyed by the current
# thread); otherwise a new socket is created on every call.
#
# Without a block the socket is returned and the caller is responsible for
# releasing or closing it. With a block the socket is yielded and then
# released: closed when keepalive is off, or, when keepalive is on,
# dereferenced on success and revoked if the block raised.
#
# @param host [String, nil] address to connect to; defaults to resolved_host
# @yieldparam sock the connected socket
def connect(host = nil)
  sock = if @keepalive
           @socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
         else
           @log.debug('connect new socket')
           @sender.create_transfer_socket(host || resolved_host, port, @hostname)
         end

  if block_given?
    begin
      yield(sock)
    rescue
      # The cache is keyed by the current thread's object id, not by the
      # socket. Passing the socket as the key never matched an entry, so the
      # failed socket stayed active. Use the default key, which is the one
      # fetch_or stored the socket under on this thread.
      @socket_cache.revoke if @keepalive
      raise
    else
      # Same keying rule: release the reference taken by fetch_or above.
      @socket_cache.dec_ref if @keepalive
    ensure
      sock.close unless @keepalive
    end
  else
    sock
  end
end
end

# Override Node to disable heartbeat
Expand Down
Loading