From 32bd06811a0f7f646eba3f66d4d7920fd23777d7 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Tue, 2 Apr 2019 12:16:00 -0600 Subject: [PATCH 1/7] Look up cluster_uuid --- lib/logstash/outputs/elasticsearch/common.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index c3edbcb92..9e0aef25d 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -47,6 +47,7 @@ def setup_after_successful_connection sleep_interval = next_sleep_interval(sleep_interval) end if successful_connection? + discover_cluster_uuid install_template setup_ilm if ilm_in_use? end @@ -130,6 +131,13 @@ def install_template @template_installed.make_true end + def discover_cluster_uuid + cluster_info = @client.get('/') + unless LogStash::SETTINGS.registered?(@id + ".cluster_uuid") + LogStash::SETTINGS.register(LogStash::Setting::String.new(@id + ".cluster_uuid", cluster_info["cluster_uuid"])) + end + end + def check_action_validity raise LogStash::ConfigurationError, "No action specified!" unless @action From c9ccac89383f5413516df5acb6e42f8569c8a8c5 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Fri, 26 Apr 2019 17:42:55 +0200 Subject: [PATCH 2/7] Switch to metadata --- lib/logstash/outputs/elasticsearch/common.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 9e0aef25d..086d546ff 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -132,9 +132,9 @@ def install_template end def discover_cluster_uuid - cluster_info = @client.get('/') - unless LogStash::SETTINGS.registered?(@id + ".cluster_uuid") - LogStash::SETTINGS.register(LogStash::Setting::String.new(@id + ".cluster_uuid", cluster_info["cluster_uuid"])) + if defined?(plugin_metadata) + cluster_info = @client.get('/') + plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) end end From b7b3ef2d1d6013d0ea86c289e5e408384c884427 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Mon, 29 Apr 2019 16:56:06 +0200 Subject: [PATCH 3/7] Attempt to work around restrictive mock --- lib/logstash/outputs/elasticsearch/common.rb | 2 +- lib/logstash/outputs/elasticsearch/http_client.rb | 4 ++++ spec/unit/outputs/elasticsearch_spec.rb | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 086d546ff..609cb87af 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -133,7 +133,7 @@ def install_template def discover_cluster_uuid if defined?(plugin_metadata) - cluster_info = @client.get('/') + cluster_info = @client.get_root plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) end end diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 32a37e82a..edc124cd7 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -378,6 +378,10 @@ def get_xpack_info get("/_xpack") end + def get_root + get("/") + end + def get_ilm_endpoint @pool.get("/_ilm/policy") end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index aef446b15..2e3e0ebed 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -19,7 +19,7 @@ allow(subject.client.pool).to receive(:start_sniffer) allow(subject.client.pool).to receive(:healthcheck!) allow(subject.client).to receive(:maximum_seen_major_version).at_least(:once).and_return(maximum_seen_major_version) - allow(subject.client).to receive(:get_xpack_info) + allow(subject.client).to receive(:get_root).and_return({:cluster_uuid => "OhUqvbkSRiyaSSGXzBkEIg"}) subject.register subject.client.pool.adapter.manticore.respond_with(:body => "{}") end From fc1a6dfe5efaf14742db36741a3a4d28ae8dd7a1 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Mon, 29 Apr 2019 21:33:26 +0200 Subject: [PATCH 4/7] Revert "Attempt to work around restrictive mock" This reverts commit b7b3ef2d1d6013d0ea86c289e5e408384c884427. --- lib/logstash/outputs/elasticsearch/common.rb | 2 +- lib/logstash/outputs/elasticsearch/http_client.rb | 4 ---- spec/unit/outputs/elasticsearch_spec.rb | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 609cb87af..086d546ff 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -133,7 +133,7 @@ def install_template def discover_cluster_uuid if defined?(plugin_metadata) - cluster_info = @client.get_root + cluster_info = @client.get('/') plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) end end diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index edc124cd7..32a37e82a 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -378,10 +378,6 @@ def get_xpack_info get("/_xpack") end - def get_root - get("/") - end - def get_ilm_endpoint @pool.get("/_ilm/policy") end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 2e3e0ebed..aef446b15 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -19,7 +19,7 @@ allow(subject.client.pool).to receive(:start_sniffer) allow(subject.client.pool).to receive(:healthcheck!) allow(subject.client).to receive(:maximum_seen_major_version).at_least(:once).and_return(maximum_seen_major_version) - allow(subject.client).to receive(:get_root).and_return({:cluster_uuid => "OhUqvbkSRiyaSSGXzBkEIg"}) + allow(subject.client).to receive(:get_xpack_info) subject.register subject.client.pool.adapter.manticore.respond_with(:body => "{}") end From dfc43e974678bc5a957b438b816567c501bce999 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Mon, 29 Apr 2019 22:27:58 +0200 Subject: [PATCH 5/7] Account for race and handle socket timeout --- lib/logstash/outputs/elasticsearch/common.rb | 8 ++++++-- spec/unit/outputs/elasticsearch_spec.rb | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 086d546ff..402b092ee 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -133,8 +133,12 @@ def install_template def discover_cluster_uuid if defined?(plugin_metadata) - cluster_info = @client.get('/') - plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) + begin + cluster_info = client.get('/') + plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) + rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError + nil + end end end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index aef446b15..62bd11935 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -485,6 +485,9 @@ end context 'handling elasticsearch document-level status meant for the DLQ' do + # Sleep to ensure you are passing the right event because the way that + # this has the possibility of a race condition. + sleep 5 let(:options) { { "manage_template" => false } } let(:logger) { subject.instance_variable_get(:@logger) } From ae830f61cdb0818f14ed114e57374a40235d912a Mon Sep 17 00:00:00 2001 From: Mike Place Date: Tue, 30 Apr 2019 11:46:53 +0200 Subject: [PATCH 6/7] Prevent tests from leaking The DLQ tests were written to just assume a clean buffer in the logs but this is not a safe assumption. Now we temporarily replace the logging subsystem with a double during these tests so that we can be assured that we are checking a clean instance. --- spec/unit/outputs/elasticsearch_spec.rb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 62bd11935..ebbe350f7 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -485,11 +485,7 @@ end context 'handling elasticsearch document-level status meant for the DLQ' do - # Sleep to ensure you are passing the right event because the way that - # this has the possibility of a race condition. - sleep 5 let(:options) { { "manage_template" => false } } - let(:logger) { subject.instance_variable_get(:@logger) } context 'when @dlq_writer is nil' do before { subject.instance_variable_set '@dlq_writer', nil } @@ -497,7 +493,7 @@ context 'resorting to previous behaviour of logging the error' do context 'getting an invalid_index_name_exception' do it 'should log at ERROR level' do - expect(logger).to receive(:error).with(/Could not index/, hash_including(:status, :action, :response)) + subject.instance_variable_set(:@logger, double("logger").as_null_object) mock_response = { 'index' => { 'error' => { 'type' => 'invalid_index_name_exception' } } } subject.handle_dlq_status("Could not index event to Elasticsearch.", [:action, :params, :event], :some_status, mock_response) @@ -506,7 +502,9 @@ context 'when getting any other exception' do it 'should log at WARN level' do - expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response)) + dlog = double_logger = double("logger").as_null_object + subject.instance_variable_set(:@logger, dlog) + expect(dlog).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response)) mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } } subject.handle_dlq_status("Could not index event to Elasticsearch.", [:action, :params, :event], :some_status, mock_response) @@ -515,7 +513,9 @@ context 'when the response does not include [error]' do it 'should not fail, but just log a warning' do - expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response)) + dlog = double_logger = double("logger").as_null_object + subject.instance_variable_set(:@logger, dlog) + expect(dlog).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response)) mock_response = { 'index' => {} } expect do subject.handle_dlq_status("Could not index event to Elasticsearch.", @@ -535,7 +535,7 @@ # We should still log when sending to the DLQ. # This shall be solved by another issue, however: logstash-output-elasticsearch#772 it 'should send the event to the DLQ instead, and not log' do - expect(dlq_writer).to receive(:write).with(:event, /Could not index/) + expect(dlq_writer).to receive(:write).once.with(:event, /Could not index/) mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } } subject.handle_dlq_status("Could not index event to Elasticsearch.", [:action, :params, :event], :some_status, mock_response) From 67eda9ab0fa0f8e222659e58c5c63819f3c74742 Mon Sep 17 00:00:00 2001 From: Mike Place Date: Mon, 6 May 2019 11:37:44 +0200 Subject: [PATCH 7/7] Implement round of review requests --- lib/logstash/outputs/elasticsearch/common.rb | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 402b092ee..863b80027 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -131,15 +131,12 @@ def install_template @template_installed.make_true end - def discover_cluster_uuid - if defined?(plugin_metadata) - begin - cluster_info = client.get('/') - plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) - rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError - nil - end - end + def discover_cluster_uuid + return unless defined?(plugin_metadata) + cluster_info = client.get('/') + plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid']) + rescue => e + # TODO introducing this logging message breaks many tests that need refactoring end def check_action_validity