Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
## 11.13.0
- Add support for `pipeline` to be decoded from `[@metadata][target_ingest_pipeline]` [#1113](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1113)
- Changed the `manage_template` default value to `false` when data streams is enabled [#1111](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1111)
- Added the `manage_template => false` as a valid data stream option

Expand Down
6 changes: 3 additions & 3 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -852,9 +852,9 @@ not also set this field. That will raise an error at startup
* Default value is `nil`

Set which ingest pipeline you wish to execute for an event. You can also use
event dependent configuration here like `pipeline =>
"%{[@metadata][pipeline]}"`. The pipeline parameter won't be set if the value
resolves to empty string ("").
event dependent configuration here like `pipeline => "%{[@metadata][pipeline]}"`.
If `pipeline` is not set, the plugin takes the value from `[@metadata][target_ingest_pipeline]`.
The pipeline parameter won't be set if the value resolves to empty string ("").

[id="plugins-{type}s-{plugin}-pool_max"]
===== `pool_max`
Expand Down
21 changes: 12 additions & 9 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -524,19 +524,22 @@ def common_event_params(event)
routing_field_name => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
value = event.sprintf(@pipeline)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = value unless value.empty?
end
target_pipeline = resolve_pipeline(event)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

params
end

def resolve_pipeline(event)
pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s
pipeline_template && event.sprintf(pipeline_template)
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

@@plugins.each do |plugin|
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.12.3'
s.version = '11.13.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
42 changes: 42 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,48 @@
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with both pipeline and target_ingest_pipeline" do
let(:options) { {"pipeline" => "%{pipeline}" } }
let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }

it "should interpolate the pipeline value and set it" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline")
end
end

context "with empty pipeline and target_ingest_pipeline" do
let(:options) { {"pipeline" => "%{pipeline}" } }
let(:event) { LogStash::Event.new({"pipeline" => "", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }

it "should interpolate the pipeline value but not set it because pipeline is empty" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with target_ingest_pipeline" do
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline"}}) }

it "should interpolate the target_ingest_pipeline value and set it" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "meta-ingest-pipeline")
end
end

context "with empty target_ingest_pipeline" do
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"host" => "elastic"}}) }

it "should not set pipeline" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with empty pipeline and empty target_ingest_pipeline" do
let(:event) { LogStash::Event.new }

it "should interpolate the pipeline value but not set it because it is empty" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end
end

describe "the manage_template option" do
Expand Down