Skip to content

Commit

Permalink
Merge pull request #2193 from okkez/fluent-cat-retry
Browse files Browse the repository at this point in the history
fluent-cat: Handle retry limit
  • Loading branch information
repeatedly authored Nov 27, 2018
2 parents 2747ce2 + 9a6baa2 commit b8e32fc
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
format = 'json'
message_key = 'message'
time_as_integer = false
retry_limit = 5

op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) {|i|
port = i
Expand Down Expand Up @@ -75,6 +76,10 @@
time_as_integer = true
}

op.on('--retry-limit N', "Specify the number of retry limit (default: #{retry_limit})", Integer) {|n|
retry_limit = n
}

singleton_class.module_eval do
define_method(:usage) do |msg|
puts op.to_s
Expand Down Expand Up @@ -107,6 +112,8 @@
class Writer
include MonitorMixin

RetryLimitError = Class.new(StandardError)

class TimerThread
def initialize(writer)
@writer = writer
Expand All @@ -130,7 +137,7 @@ def run
end
end

def initialize(tag, connector, time_as_integer: false)
def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
@tag = tag
@connector = connector
@socket = false
Expand All @@ -142,7 +149,7 @@ def initialize(tag, connector, time_as_integer: false)
@pending = []
@pending_limit = 1024 # TODO
@retry_wait = 1
@retry_limit = 5 # TODO
@retry_limit = retry_limit
@time_as_integer = time_as_integer

super()
Expand Down Expand Up @@ -236,21 +243,24 @@ def get_socket
end

def try_connect
now = Time.now.to_i

unless @error_history.empty?
# wait before re-connecting
wait = @retry_wait * (2 ** (@error_history.size-1))
if now <= @socket_time + wait
return false
begin
now = Time.now.to_i

unless @error_history.empty?
# wait before re-connecting
wait = 1 #@retry_wait * (2 ** (@error_history.size-1))
if now <= @socket_time + wait
sleep(wait)
try_connect
end
end
end

begin
@socket = @connector.call
@error_history.clear
return true

rescue RetryLimitError => ex
raise ex
rescue
$stderr.puts "connect failed: #{$!}"
@error_history << $!
Expand All @@ -263,9 +273,10 @@ def try_connect
}
@pending.clear
@error_history.clear
raise RetryLimitError, "exceed retry limit"
else
retry
end

return false
end
end

Expand All @@ -285,7 +296,7 @@ def abort_message(time, record)
}
end

w = Writer.new(tag, connector, time_as_integer: time_as_integer)
w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit)
w.start

case format
Expand Down

0 comments on commit b8e32fc

Please sign in to comment.