diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index f76bffb5773..dbc0a64419d 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -244,18 +244,28 @@ def start_workers filter_queue_client.set_batch_dimensions(batch_size, batch_delay) - pipeline_workers.times do |t| + # First launch WorkerLoop initialization in separate threads which concurrently + # compiles and initializes the worker pipelines + + worker_loops = pipeline_workers.times + .map { Thread.new { init_worker_loop } } + .map(&:value) + + fail("Some worker(s) were not correctly initialized") if worker_loops.any?{|v| v.nil?} + + # Once all WorkerLoop have been initialized run them in separate threads + + worker_loops.each_with_index do |worker_loop, t| thread = Thread.new do Util.set_thread_name("[#{pipeline_id}]>worker#{t}") ThreadContext.put("pipeline.id", pipeline_id) - org.logstash.execution.WorkerLoop.new( - lir_execution, filter_queue_client, @events_filtered, @events_consumed, - @flushRequested, @flushing, @shutdownRequested, @drain_queue).run + worker_loop.run end @worker_threads << thread end - # inputs should be started last, after all workers + # Finally inputs should be started last, after all workers have been initialized and started + begin start_inputs rescue => e @@ -466,6 +476,26 @@ def inspect private + # @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception + def init_worker_loop + begin + org.logstash.execution.WorkerLoop.new( + lir_execution, + filter_queue_client, + @events_filtered, + @events_consumed, + @flushRequested, + @flushing, + @shutdownRequested, + @drain_queue) + rescue => e + @logger.error( + "Worker loop initialization error", + default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n"))) + nil + end + end + def maybe_setup_out_plugins if @outputs_registered.make_true register_plugins(outputs)