Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,28 @@ def start_workers

filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

pipeline_workers.times do |t|
# First launch WorkerLoop initialization in separate threads which concurrently
# compiles and initializes the worker pipelines

worker_loops = pipeline_workers.times
.map { Thread.new { init_worker_loop } }
.map(&:value)

fail("Some worker(s) were not correctly initialized") if worker_loops.any?{|v| v.nil?}

# Once all WorkerLoop have been initialized run them in separate threads

worker_loops.each_with_index do |worker_loop, t|
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
worker_loop.run
end
@worker_threads << thread
end

# inputs should be started last, after all workers
# Finally inputs should be started last, after all workers have been initialized and started

begin
start_inputs
rescue => e
Expand Down Expand Up @@ -466,6 +476,26 @@ def inspect

private

# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
def init_worker_loop
begin
org.logstash.execution.WorkerLoop.new(
lir_execution,
filter_queue_client,
@events_filtered,
@events_consumed,
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue)
rescue => e
@logger.error(
"Worker loop initialization error",
default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n")))
nil
end
end

def maybe_setup_out_plugins
if @outputs_registered.make_true
register_plugins(outputs)
Expand Down