Skip to content

Commit

Permalink
Adds delay_on_retry to wait between each failed connection
Browse files Browse the repository at this point in the history
  • Loading branch information
DinoPullerUqido authored and picandocodigo committed Oct 6, 2021
1 parent f3bb61e commit c2f8311
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
3 changes: 3 additions & 0 deletions lib/elastic/transport/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class Client
#
# @option arguments [Boolean,Number] :retry_on_failure Retry X times when request fails before raising and
# exception (false by default)
# @option arguments [Number] :delay_on_retry Delay in milliseconds between each retry (0 by default)
#
# @option arguments Array<Number> :retry_on_status Retry when specific status codes are returned
#
# @option arguments [Boolean] :reload_on_failure Reload connections after failure (false by default)
Expand Down Expand Up @@ -126,6 +128,7 @@ def initialize(arguments = {}, &block)
@arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil
@arguments[:reload_connections] ||= false
@arguments[:retry_on_failure] ||= false
@arguments[:delay_on_retry] ||= 0
@arguments[:reload_on_failure] ||= false
@arguments[:randomize_hosts] ||= false
@arguments[:transport_options] ||= {}
Expand Down
7 changes: 3 additions & 4 deletions lib/elastic/transport/transport/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def initialize(arguments = {}, &block)
@options = arguments[:options] || {}
@options[:http] ||= {}
@options[:retry_on_status] ||= []
@options[:delay_on_retry] ||= 0

@block = block
@compression = !!@options[:compression]
Expand Down Expand Up @@ -273,6 +274,7 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =
start = Time.now
tries = 0
reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])

max_retries = if opts.key?(:retry_on_failure)
opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
Expand All @@ -284,6 +286,7 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =
ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

begin
sleep(delay_on_retry / 1000.0) if tries > 0
tries += 1
connection = get_connection or raise Error.new('Cannot get new connection from pool.')

Expand All @@ -297,7 +300,6 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =

# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

rescue Elastic::Transport::Transport::ServerError => e
if response && @retry_on_status.include?(response.status)
log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
Expand All @@ -310,7 +312,6 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =
else
raise e
end

rescue *host_unreachable_exceptions => e
log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

Expand All @@ -332,11 +333,9 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =
else
raise e
end

rescue Exception => e
log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e

end #/begin

duration = Time.now - start
Expand Down
23 changes: 23 additions & 0 deletions spec/elastic/transport/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,29 @@
end
end

context 'when retry_on_failure is true and delay_on_retry is specified' do
context 'when a node is unreachable' do
let(:hosts) do
[ELASTICSEARCH_HOSTS.first, "foobar1", "foobar2"]
end

let(:options) do
{ retry_on_failure: true, delay_on_retry: 3000 }
end

let(:responses) do
5.times.collect do
client.perform_request('GET', '_nodes/_local')
end
end

it 'retries on failure' do
allow_any_instance_of(Object).to receive(:sleep).with(3000 / 1000)
expect(responses.all? { true }).to be(true)
end
end
end

context 'when reload_on_failure is true' do
let(:hosts) do
[ELASTICSEARCH_HOSTS.first, 'foobar1', 'foobar2']
Expand Down

0 comments on commit c2f8311

Please sign in to comment.