Skip to content

Commit 934740b

Browse files
yaauiekaisecheng
andauthored
management: confirm server-side filtering 404s (#18265)
* management: fix behaviour with SystemIndex-backed ES vs 404's CHANGED BEHAVIOR (system indices): - when communicating with Elasticsearch >= 7.10, a fetch-all-pipelines query that results in a 404 error is treated as a meaningful error since Elasticsearch does NOT emit 404 when no pipelines have been configured, preventing a misconfigured proxy from causing pipelines to be shut down - when communicating with Elasticsearch >= 8.3, a server-side query for pipelines that results in a 404 error response no longer results in the immediate shut-down of existing pipelines unless a second client-side filtering of all available pipelines indicates that none should be running. MAINTAINED BEHAVIOUR (legacy hidden indices): - when commmunicating with Elasticsearch < 7.10, a pipeline-fetching query that results in a 404 error response is STILL treated as an empty result set, and will cause any running pipelines to be shut down. * management: deprecate legacy hidden index source (ES < 7.10) * Update x-pack/lib/config_management/elasticsearch_source.rb Co-authored-by: kaisecheng <[email protected]> --------- Co-authored-by: kaisecheng <[email protected]>
1 parent 4a1c7bf commit 934740b

File tree

2 files changed

+101
-33
lines changed

2 files changed

+101
-33
lines changed

x-pack/lib/config_management/elasticsearch_source.rb

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,7 @@ def pipeline_configs
6363
es_version = get_es_version
6464
fetcher = get_pipeline_fetcher(es_version)
6565

66-
begin
67-
fetcher.fetch_config(es_version, pipeline_ids, client)
68-
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
69-
# es-output 12.0.2 throws 404 as error, but we want to handle it as empty config
70-
return [] if e.response_code == 404
71-
raise e
72-
end
66+
fetcher.fetch_config(es_version, pipeline_ids, client)
7367

7468
fetcher.get_pipeline_ids.collect do |pid|
7569
get_pipeline(pid, fetcher)
@@ -210,22 +204,46 @@ class SystemIndicesFetcher
210204
SYSTEM_INDICES_API_PATH = "_logstash/pipeline"
211205

212206
def fetch_config(es_version, pipeline_ids, client)
213-
es_supports_pipeline_wildcard_search = es_supports_pipeline_wildcard_search?(es_version)
207+
# if we are talking with an Elasticsearch that supports wildcard search, and get
208+
# a successful response, use it. But wildcard search has a weird quirk that it 404's
209+
# when there are no matches, so we need to fall through to traditional client-side
210+
# search to rule out a proxy emitting a 404.
211+
if es_supports_pipeline_wildcard_search?(es_version)
212+
begin
213+
logger.trace? && logger.trace("querying for pipelines #{pipeline_ids.join(",")} using server-side wildcard search")
214+
response = get_response(client,"#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}")
215+
intercept_error(response, pipeline_ids)
216+
@pipelines = response
217+
return @pipelines
218+
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
219+
raise unless e.response_code == 404
220+
logger.warn("got 404 requesting pipelines from Elasticsearch using wildcard search; falling back to client-side filtering")
221+
end
222+
end
223+
224+
# client-side filtering
225+
logger.trace("querying for pipelines #{pipeline_ids.join(",")} using client-side wildcard search")
226+
response = get_response(client,"#{SYSTEM_INDICES_API_PATH}/")
227+
intercept_error(response, pipeline_ids)
228+
@pipelines = get_wildcard_pipelines(pipeline_ids, response)
229+
230+
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
231+
raise ElasticsearchSource::RemoteConfigError, "Cannot load configuration for pipeline_id: #{pipeline_ids}, server returned `#{e}`"
232+
end
233+
234+
def get_response(client, path)
214235
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
215236
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
216-
path = es_supports_pipeline_wildcard_search ?
217-
"#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}" :
218-
"#{SYSTEM_INDICES_API_PATH}/"
219237
client.get(path)
220238
}
221239

240+
response
241+
end
242+
243+
def intercept_error(response, pipeline_ids)
222244
if response["error"]
223245
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
224246
end
225-
226-
@pipelines = es_supports_pipeline_wildcard_search ?
227-
response :
228-
get_wildcard_pipelines(pipeline_ids, response)
229247
end
230248

231249
def es_supports_pipeline_wildcard_search?(es_version)
@@ -271,11 +289,18 @@ class LegacyHiddenIndicesFetcher
271289
PIPELINE_INDEX = ".logstash"
272290

273291
def fetch_config(es_version, pipeline_ids, client)
292+
deprecation_logger.deprecated("Fetching pipeline configs from Elasticsearch #{es_version}; Central Management will soon require Elasticsearch 8.x+")
274293
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
275294
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
276-
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
277-
client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
278-
}
295+
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) do
296+
begin
297+
client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
298+
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
299+
raise e unless e.response_code == 404
300+
@pipelines = {}
301+
return @pipelines
302+
end
303+
end
279304

280305
if response["error"]
281306
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"

x-pack/spec/config_management/elasticsearch_source_spec.rb

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
describe LogStash::ConfigManagement::ElasticsearchSource do
1515
let(:system_indices_api) { LogStash::ConfigManagement::SystemIndicesFetcher::SYSTEM_INDICES_API_PATH }
1616
let(:system_indices_url_regex) { Regexp.new("^#{system_indices_api}") }
17+
let(:system_indices_url_search_regex) { Regexp.new("^#{Regexp.escape(system_indices_api)}[?]id=([^=&]+)$") }
18+
let(:system_indices_url_all_regex) { Regexp.new("^#{Regexp.escape(system_indices_api)}/?$") }
1719
let(:elasticsearch_url) { ["https://localhost:9898"] }
1820
let(:elasticsearch_username) { "elastictest" }
1921
let(:elasticsearch_password) { "testchangeme" }
@@ -91,6 +93,7 @@
9193

9294
let(:es_version_response) { es_version_8_response }
9395
let(:es_version_8_response) { cluster_info("8.0.0-SNAPSHOT") }
96+
let(:es_version_8_3_response) { cluster_info("8.3.0") }
9497
let(:es_version_7_9_response) { cluster_info("7.9.1") }
9598

9699
let(:elasticsearch_7_9_err_response) {
@@ -230,7 +233,9 @@
230233
let(:pipeline_id) { "super_generator" }
231234
let(:elasticsearch_response) { {"#{pipeline_id}" => {"pipeline" => "#{config}"}} }
232235
let(:all_pipelines) { JSON.parse(::File.read(::File.join(::File.dirname(__FILE__), "fixtures", "pipelines.json"))) }
233-
let(:mock_logger) { double("fetcher's logger") }
236+
let(:mock_logger) { double("fetcher's logger").as_null_object }
237+
238+
let(:bad_response_code_error) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError }
234239

235240
before(:each) {
236241
allow(subject).to receive(:logger).and_return(mock_logger)
@@ -242,16 +247,36 @@
242247
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
243248
end
244249

245-
it "#fetch_config from ES v8.3" do
246-
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
247-
expect(subject.fetch_config(es_version_8_3, [pipeline_id], mock_client)).to eq(elasticsearch_response)
248-
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
249-
end
250+
{
251+
'v8.3' => { major: 8, minor: 3},
252+
'v9.0' => { major: 9, minor: 0}
253+
}.each do |desc, es_version|
254+
context "ES #{desc}" do
255+
it "#fetch_config works" do
256+
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
257+
expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq(elasticsearch_response)
258+
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
259+
end
250260

251-
it "#fetch_config from ES v9.0" do
252-
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
253-
expect(subject.fetch_config(es_version_9_0, [pipeline_id], mock_client)).to eq(elasticsearch_response)
254-
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
261+
it "#fetch_config from ES v8.3 with 404->200 is empty list" do
262+
wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}"
263+
expect(mock_client).to receive(:get).with(wildcard_search_path)
264+
.and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}'))
265+
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/").and_return({})
266+
expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq({})
267+
end
268+
269+
it "#fetch_config from ES v8.3 with 404->404 is error" do
270+
# 404's on Wildcard search need to be confirmed with a 404 from the get-all endpoint
271+
wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}"
272+
expect(mock_client).to receive(:get).with(wildcard_search_path)
273+
.and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}'))
274+
get_all_path = "#{described_class::SYSTEM_INDICES_API_PATH}/"
275+
expect(mock_client).to receive(:get).with(get_all_path)
276+
.and_raise(bad_response_code_error.new(404, get_all_path, nil, '{}'))
277+
expect { subject.fetch_config(es_version, [pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
278+
end
279+
end
255280
end
256281

257282
it "#fetch_config should raise error" do
@@ -359,7 +384,7 @@
359384
expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
360385
end
361386

362-
it "#fetch_config should raise error when response is empty" do
387+
it "#fetch_config should raise error when response is malformed" do
363388
expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(LogStash::Json.load("{}"))
364389
expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
365390
end
@@ -760,10 +785,28 @@
760785
expect { subject.pipeline_configs }.to raise_error /Something bad/
761786
end
762787

763-
it "returns empty pipeline when ES client raise BadResponseCodeError in [8]" do
764-
allow(mock_client).to receive(:get).with("/").and_return(es_version_response)
765-
expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404)
766-
expect(subject.pipeline_configs).to be_empty
788+
context "8.0" do
789+
it "returns empty pipeline when ES client raise BadResponseCodeError in [8]" do
790+
allow(mock_client).to receive(:get).with("/").and_return(es_version_response)
791+
expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404)
792+
expect { subject.pipeline_configs }.to raise_error /response code '404'/
793+
end
794+
end
795+
796+
context "8.3+" do
797+
it "returns empty pipeline when ES search 404's but client-side 200's and includes no matching pipelines" do
798+
expect(mock_client).to receive(:get).with("/").and_return(es_version_8_3_response)
799+
expect(mock_client).to receive(:get).with(system_indices_url_search_regex).and_raise(bad_response_404)
800+
expect(mock_client).to receive(:get).with(system_indices_url_all_regex).and_return({})
801+
expect(subject.pipeline_configs).to be_empty
802+
end
803+
804+
it "raises the 404 when ES search 404's and client-side query 404's too" do
805+
expect(mock_client).to receive(:get).with("/").and_return(es_version_8_3_response)
806+
expect(mock_client).to receive(:get).with(system_indices_url_search_regex).and_raise(bad_response_404)
807+
expect(mock_client).to receive(:get).with(system_indices_url_all_regex).and_raise(bad_response_404)
808+
expect { subject.pipeline_configs }.to raise_error /response code '404'/
809+
end
767810
end
768811

769812
it "returns empty pipeline when ES client raise BadResponseCodeError in [7.9]" do

0 commit comments

Comments
 (0)