From c2f8311409ca63a293588cb7eea5a0c672dbd436 Mon Sep 17 00:00:00 2001 From: Dino Date: Wed, 6 Oct 2021 20:38:57 +0100 Subject: [PATCH] Adds delay_on_retry to wait between each failed connection --- lib/elastic/transport/client.rb | 3 +++ lib/elastic/transport/transport/base.rb | 7 +++---- spec/elastic/transport/client_spec.rb | 23 +++++++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/elastic/transport/client.rb b/lib/elastic/transport/client.rb index 6b06022..a78316d 100644 --- a/lib/elastic/transport/client.rb +++ b/lib/elastic/transport/client.rb @@ -86,6 +86,8 @@ class Client # # @option arguments [Boolean,Number] :retry_on_failure Retry X times when request fails before raising and # exception (false by default) + # @option arguments [Number] :delay_on_retry Delay in milliseconds between each retry (0 by default) + # # @option arguments Array :retry_on_status Retry when specific status codes are returned # # @option arguments [Boolean] :reload_on_failure Reload connections after failure (false by default) @@ -126,6 +128,7 @@ def initialize(arguments = {}, &block) @arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil @arguments[:reload_connections] ||= false @arguments[:retry_on_failure] ||= false + @arguments[:delay_on_retry] ||= 0 @arguments[:reload_on_failure] ||= false @arguments[:randomize_hosts] ||= false @arguments[:transport_options] ||= {} diff --git a/lib/elastic/transport/transport/base.rb b/lib/elastic/transport/transport/base.rb index 6041d45..617c831 100644 --- a/lib/elastic/transport/transport/base.rb +++ b/lib/elastic/transport/transport/base.rb @@ -53,6 +53,7 @@ def initialize(arguments = {}, &block) @options = arguments[:options] || {} @options[:http] ||= {} @options[:retry_on_status] ||= [] + @options[:delay_on_retry] ||= 0 @block = block @compression = !!@options[:compression] @@ -273,6 +274,7 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts = start = Time.now tries = 0 reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) + delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry]) max_retries = if opts.key?(:retry_on_failure) opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] @@ -284,6 +286,7 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts = ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } begin + sleep(delay_on_retry / 1000.0) if tries > 0 tries += 1 connection = get_connection or raise Error.new('Cannot get new connection from pool.') @@ -297,7 +300,6 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts = # Raise an exception so we can catch it for `retry_on_status` __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) - rescue Elastic::Transport::Transport::ServerError => e if response && @retry_on_status.include?(response.status) log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" @@ -310,7 +312,6 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts = else raise e end - rescue *host_unreachable_exceptions => e log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" @@ -332,11 +333,9 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts = else raise e end - rescue Exception => e log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" raise e - end #/begin duration = Time.now - start diff --git a/spec/elastic/transport/client_spec.rb b/spec/elastic/transport/client_spec.rb index 6f4de11..5689540 100644 --- a/spec/elastic/transport/client_spec.rb +++ b/spec/elastic/transport/client_spec.rb @@ -1328,6 +1328,29 @@ end end + context 'when retry_on_failure is true and delay_on_retry is specified' do + context 'when a node is unreachable' do + let(:hosts) do + [ELASTICSEARCH_HOSTS.first, "foobar1", "foobar2"] + end + + let(:options) do + { retry_on_failure: true, delay_on_retry: 3000 } + end + + let(:responses) do + 5.times.collect do + client.perform_request('GET', '_nodes/_local') + end + end + + it 'retries on failure' do + allow_any_instance_of(Object).to receive(:sleep).with(3000 / 1000) + expect(responses.all? { true }).to be(true) + end + end + end + context 'when reload_on_failure is true' do let(:hosts) do [ELASTICSEARCH_HOSTS.first, 'foobar1', 'foobar2']