Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.13.0
- add technology preview support for allowing events to individually encode a default pipeline with `[@metadata][target_ingest_pipeline]` (as part of a technology preview, this feature may change without notice) [#1113](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1113)

## 11.12.4
- Changed the `manage_template` default value to `false` when data streams is enabled [#1111](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1111)
- Added the `manage_template => false` as a valid data stream option
Expand Down
7 changes: 3 additions & 4 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -849,12 +849,11 @@ not also set this field. That will raise an error at startup
===== `pipeline`

* Value type is <<string,string>>
* Default value is `nil`
* There is no default value.

Set which ingest pipeline you wish to execute for an event. You can also use
event dependent configuration here like `pipeline =>
"%{[@metadata][pipeline]}"`. The pipeline parameter won't be set if the value
resolves to empty string ("").
event dependent configuration here like `pipeline => "%{[@metadata][pipeline]}"`.
The pipeline parameter won't be set if the value resolves to empty string ("").

[id="plugins-{type}s-{plugin}-pool_max"]
===== `pool_max`
Expand Down
21 changes: 12 additions & 9 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -524,19 +524,22 @@ def common_event_params(event)
routing_field_name => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
value = event.sprintf(@pipeline)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = value unless value.empty?
end
target_pipeline = resolve_pipeline(event)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

params
end

def resolve_pipeline(event)
pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s
pipeline_template && event.sprintf(pipeline_template)
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

@@plugins.each do |plugin|
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.12.4'
s.version = '11.13.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
63 changes: 61 additions & 2 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@

let(:event) { LogStash::Event.new("pipeline" => "my-ingest-pipeline") }

it "should interpolate the pipeline value and set it" do
it "interpolate the pipeline value and set it" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline")
end
end
Expand All @@ -600,7 +600,66 @@

let(:event) { LogStash::Event.new("pipeline" => "") }

it "should interpolate the pipeline value but not set it because it is empty" do
it "interpolates the pipeline value but not set it because it is empty" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with both pipeline and target_ingest_pipeline" do
let(:options) { {"pipeline" => "%{pipeline}" } }
let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }

it "interpolates the plugin's pipeline value" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline")
end

context "when the plugin's `pipeline` is constant" do
let(:options) { super().merge("pipeline" => "my-constant-pipeline") }
it "uses plugin's pipeline value" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-constant-pipeline")
end
end

context "when the plugin's `pipeline` includes an unresolvable sprintf placeholder" do
let(:options) { super().merge("pipeline" => "reference-%{unset}-field") }
it "does not use the target_ingest_pipeline" do
# when sprintf doesn't resolve a placeholder, the behaviour of our `pipeline` is UNSPECIFIED.
# here we only validate that the presence of the magic field does not
# override an explicitly-configured pipeline.
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:pipeline => "my-ingest-pipeline")
end
end
end

context "with empty pipeline and target_ingest_pipeline" do
let(:options) { {"pipeline" => "%{pipeline}" } }
let(:event) { LogStash::Event.new({"pipeline" => "", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }

it "interpolates the pipeline value but not set it because pipeline is empty" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with target_ingest_pipeline" do
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline"}}) }

it "interpolates the target_ingest_pipeline value and set it" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "meta-ingest-pipeline")
end
end

context "with empty target_ingest_pipeline" do
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"host" => "elastic"}}) }

it "does not set pipeline" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end

context "with empty pipeline and empty target_ingest_pipeline" do
let(:event) { LogStash::Event.new }

it "does not set pipeline" do
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
end
end
Expand Down