diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index 94a0b6185d9..fab1f42ab3e 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -282,6 +282,10 @@ def to_hash } end + def inspect + "<#{self.class.name}(#{name}): #{value.inspect}" + (@value_is_set ? '' : ' (DEFAULT)') + ">" + end + def ==(other) self.to_hash == other.to_hash end diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 84b56954112..43b7c9bc644 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -26,6 +26,8 @@ class RemoteConfigError < LogStash::Error; end pipeline.workers pipeline.batch.size pipeline.batch.delay + pipeline.ecs_compatibility + pipeline.ordered queue.type queue.max_bytes queue.checkpoint.writes diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 1fa0a0516e9..c9667d2438e 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -368,29 +368,57 @@ let(:mock_client) { double("http_client") } let(:settings) { super().merge({ "xpack.management.pipeline.id" => pipeline_id }) } let(:config) { "input { generator {} } filter { mutate {} } output { }" } + let(:username) { 'log.stash' } + let(:pipeline_settings) do + { + "pipeline.batch.delay" => "50", + "pipeline.workers" => "99", + "pipeline.ordered" => "false", + "pipeline.ecs_compatibility" => "v1", + + # invalid settings to be ignored... + "pipeline.output.workers" => "99", + "nonsensical.invalid.setting"=> "-9999", + } + end + let(:pipeline_metadata) do + { + "version" => 5, + "type" => "logstash_pipeline", + } + end let(:elasticsearch_response) { elasticsearch_8_response } - let(:elasticsearch_8_response) { - "{\"#{pipeline_id}\":{ - \"username\":\"log.stash\", - \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", - \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"}, - \"pipeline\":\"#{config}\", - \"pipeline_settings\":{\"pipeline.batch.delay\":\"50\", \"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}" } - - let(:elasticsearch_7_9_response) { - "{ \"docs\":[{ - \"_index\":\".logstash\", - \"_type\":\"pipelines\", - \"_id\":\"#{pipeline_id}\", - \"_version\":8, - \"found\":true, - \"_source\":{ - \"id\":\"apache\", - \"description\":\"Process apache logs\", - \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", - \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"}, - \"pipeline\":\"#{config}\", - \"pipeline_settings\":{\"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}]}" } + let(:elasticsearch_8_response) do + { + pipeline_id => { + username: username, + modified_timestamp: "2017-02-28T23:02:17.023Z", + pipeline_metadata: pipeline_metadata, + pipeline: config, + pipeline_settings: pipeline_settings, + } + }.to_json + end + + let(:elasticsearch_7_9_response) do + { + docs: [{ + _index: ".logstash", + _type: "pipelines", + _id: pipeline_id, + _version: 8, + found: true, + _source: { + id: pipeline_id, + description: "Process apache logs", + modified_timestamp: "2017-02-28T23:02:17.023Z", + pipeline_metadata: pipeline_metadata.merge(username: username), + pipeline: config, + pipeline_settings: pipeline_settings, + } + }] + }.to_json + end let(:es_path) { ".logstash/_mget" } let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } @@ -412,8 +440,10 @@ context "with one `pipeline_id` configured [#{es_version}]" do context "when successfully fetching a remote configuration" do + let(:logger_stub) { double("Logger").as_null_object } before :each do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow_any_instance_of(described_class).to receive(:logger).and_return(logger_stub) allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) end @@ -426,13 +456,29 @@ expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) end - it "ignores non-whitelisted and invalid settings" do + it "applies allowed settings and logs warning about ignored settings" do pipeline_config = subject.pipeline_configs - settings_hash = pipeline_config[0].settings.to_hash - - expect(settings_hash["pipeline.workers"]).to eq(99) - expect(settings_hash["pipeline.output.workers"]).not_to eq(99) - expect(settings_hash["nonsensical.invalid.setting"]).to be_falsey + pipeline_settings = pipeline_config[0].settings + + aggregate_failures do + # explicitly given settings + expect(pipeline_settings.get_setting("pipeline.workers")).to be_set.and(have_attributes(value: 99)) + expect(pipeline_settings.get_setting("pipeline.batch.delay")).to be_set.and(have_attributes(value: 50)) + expect(pipeline_settings.get_setting("pipeline.ordered")).to be_set.and(have_attributes(value: "false")) + expect(pipeline_settings.get_setting("pipeline.ecs_compatibility")).to be_set.and(have_attributes(value: "v1")) + + # valid non-provided settings + expect(pipeline_settings.get_setting("queue.type")).to_not be_set + + # invalid provided settings + %w( + pipeline.output.workers + nonsensical.invalid.setting + ).each do |invalid_setting| + expect(pipeline_settings.registered?(invalid_setting)).to be false + expect(logger_stub).to have_received(:warn).with(/Ignoring .+ '#{Regexp.quote(invalid_setting)}'/) + end + end end end