Skip to content

Commit 244a9f4

Browse files
authored
delete pipeline in registry (#12414)
deletes the pipeline in the pipelines_registry if it is terminated and is removed in the source Fixed: #12414
1 parent 6bb2bd6 commit 244a9f4

File tree

6 files changed

+155
-3
lines changed

6 files changed

+155
-3
lines changed

logstash-core/lib/logstash/pipeline_action.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
require "logstash/pipeline_action/create"
2020
require "logstash/pipeline_action/stop"
2121
require "logstash/pipeline_action/reload"
22+
require "logstash/pipeline_action/delete"
2223

2324
module LogStash module PipelineAction
2425
ORDERING = {
2526
LogStash::PipelineAction::Create => 100,
2627
LogStash::PipelineAction::Reload => 200,
27-
LogStash::PipelineAction::Stop => 300
28+
LogStash::PipelineAction::Stop => 300,
29+
LogStash::PipelineAction::Delete => 400
2830
}
2931
end end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
require "logstash/pipeline_action/base"
19+
20+
module LogStash module PipelineAction
21+
class Delete < Base
22+
attr_reader :pipeline_id
23+
24+
def initialize(pipeline_id)
25+
@pipeline_id = pipeline_id
26+
end
27+
28+
def execute(agent, pipelines_registry)
29+
success = pipelines_registry.delete_pipeline(@pipeline_id)
30+
31+
LogStash::ConvergeResult::ActionResult.create(self, success)
32+
end
33+
34+
def to_s
35+
"PipelineAction::Delete<#{pipeline_id}>"
36+
end
37+
end
38+
end end

logstash-core/lib/logstash/pipelines_registry.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ def put(pipeline_id, state)
7676
def remove(pipeline_id)
7777
@lock.synchronize do
7878
@states.delete(pipeline_id)
79-
@locks.delete(pipeline_id)
8079
end
8180
end
8281

@@ -209,6 +208,32 @@ def reload_pipeline(pipeline_id, &reload_block)
209208
lock.unlock
210209
end
211210

211+
# Delete the pipeline that is terminated
212+
# @param pipeline_id [String, Symbol] the pipeline id
213+
# @return [Boolean] pipeline delete success
214+
def delete_pipeline(pipeline_id)
215+
lock = @states.get_lock(pipeline_id)
216+
lock.lock
217+
218+
state = @states.get(pipeline_id)
219+
220+
if state.nil?
221+
logger.error("Attempted to delete a pipeline that does not exists", :pipeline_id => pipeline_id)
222+
return false
223+
end
224+
225+
if state.terminated?
226+
@states.remove(pipeline_id)
227+
logger.info("Removed pipeline from registry successfully", :pipeline_id => pipeline_id)
228+
return true
229+
else
230+
logger.info("Attempted to delete a pipeline that is not terminated", :pipeline_id => pipeline_id)
231+
return false
232+
end
233+
ensure
234+
lock.unlock
235+
end
236+
212237
# @param pipeline_id [String, Symbol] the pipeline id
213238
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
214239
def get_pipeline(pipeline_id)

logstash-core/lib/logstash/state_resolver.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,19 @@ def resolve(pipelines_registry, pipeline_configs)
4141
end
4242
end
4343

44-
configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym }
44+
configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) }
4545

4646
# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
4747
# stop it.
4848
pipelines_registry.running_pipelines.keys
4949
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
5050
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }
5151

52+
# If one of the terminated pipeline is not in the pipeline_configs, delete it in registry.
53+
pipelines_registry.non_running_pipelines.keys
54+
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
55+
.each { |pipeline_id| actions << LogStash::PipelineAction::Delete.new(pipeline_id)}
56+
5257
actions.sort # See logstash/pipeline_action.rb
5358
end
5459
end

logstash-core/spec/logstash/pipelines_registry_spec.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,48 @@
228228
end
229229
end
230230

231+
context "deleting a pipeline" do
232+
context "when pipeline is in registry" do
233+
before :each do
234+
subject.create_pipeline(pipeline_id, pipeline) { true }
235+
end
236+
237+
it "should not delete pipeline if pipeline is not terminated" do
238+
expect(pipeline).to receive(:finished_execution?).and_return(false)
239+
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
240+
expect(logger).to receive(:info)
241+
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
242+
expect(subject.get_pipeline(pipeline_id)).not_to be_nil
243+
end
244+
245+
it "should delete pipeline if pipeline is terminated" do
246+
expect(pipeline).to receive(:finished_execution?).and_return(true)
247+
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
248+
expect(logger).to receive(:info)
249+
expect(subject.delete_pipeline(pipeline_id)).to be_truthy
250+
expect(subject.get_pipeline(pipeline_id)).to be_nil
251+
end
252+
253+
it "should recreate pipeline if pipeline is delete and create again" do
254+
expect(pipeline).to receive(:finished_execution?).and_return(true)
255+
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
256+
expect(logger).to receive(:info)
257+
expect(subject.delete_pipeline(pipeline_id)).to be_truthy
258+
expect(subject.get_pipeline(pipeline_id)).to be_nil
259+
subject.create_pipeline(pipeline_id, pipeline) { true }
260+
expect(subject.get_pipeline(pipeline_id)).not_to be_nil
261+
end
262+
end
263+
264+
context "when pipeline is not in registry" do
265+
it "should log error" do
266+
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
267+
expect(logger).to receive(:error)
268+
expect(subject.delete_pipeline(pipeline_id)).to be_falsey
269+
end
270+
end
271+
end
272+
231273
context "pipelines collections" do
232274
context "with a non terminated pipelines" do
233275
before :each do

logstash-core/spec/logstash/state_resolver_spec.rb

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,5 +172,45 @@
172172
end
173173
end
174174
end
175+
176+
context "when a pipeline stops" do
177+
let(:main_pipeline) { mock_pipeline(:main) }
178+
let(:main_pipeline_config) { main_pipeline.pipeline_config }
179+
let(:pipelines) do
180+
r = LogStash::PipelinesRegistry.new
181+
r.create_pipeline(:main, main_pipeline) { true }
182+
r
183+
end
184+
185+
before do
186+
expect(main_pipeline).to receive(:finished_execution?).at_least(:once).and_return(true)
187+
end
188+
189+
context "when pipeline config contains a new one and the existing" do
190+
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }
191+
192+
it "creates the new one and keep the other one stop" do
193+
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world])
194+
expect(pipelines.non_running_pipelines.size).to eq(1)
195+
end
196+
end
197+
198+
context "when pipeline config contains an updated pipeline" do
199+
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }
200+
201+
it "should reload the stopped pipeline" do
202+
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main])
203+
end
204+
end
205+
206+
context "when pipeline config contains no pipeline" do
207+
let(:pipeline_configs) { [] }
208+
209+
it "should delete the stopped one" do
210+
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main])
211+
end
212+
end
213+
end
214+
175215
end
176216
end

0 commit comments

Comments
 (0)