From 82d94f3bff7e8417abded909739a56414acd724d Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 5 Oct 2016 21:16:18 +0900 Subject: [PATCH 1/3] Fix to wakeup threads to quit loop in threads It's done before returning from stop/after_shutdown, but after setting flag to break while-loop. This change suppresses log messages like 'thread doesn't exit correctly (killed or other reason)' in shutdown sequence. --- lib/fluent/plugin/output.rb | 11 ++++++----- lib/fluent/plugin_helper/thread.rb | 24 ++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index a33a344ae1..2c0c23f967 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1107,7 +1107,7 @@ def enqueue_thread_run log.debug "enqueue_thread actually running" begin - while @output_flush_threads_running + while @output_flush_threads_running && thread_current_running? now_int = Time.now.to_i if @output_flush_interrupted sleep interval @@ -1132,16 +1132,17 @@ def enqueue_thread_run end rescue => e raise if @under_plugin_development - log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error_class: e.class, error: e + log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e log.error_backtrace + ensure + @output_enqueue_thread_waiting = false + @output_enqueue_thread_mutex.unlock end - @output_enqueue_thread_waiting = false - @output_enqueue_thread_mutex.unlock sleep interval end rescue => e # normal errors are rescued by inner begin-rescue clause. - log.error "error on enqueue thread", plugin_id: plugin_id, error_class: e.class, error: e + log.error "error on enqueue thread", plugin_id: plugin_id, error: e log.error_backtrace raise end diff --git a/lib/fluent/plugin_helper/thread.rb b/lib/fluent/plugin_helper/thread.rb index 8202a79483..1003e16702 100644 --- a/lib/fluent/plugin_helper/thread.rb +++ b/lib/fluent/plugin_helper/thread.rb @@ -70,7 +70,7 @@ def thread_create(title) thread_exit = true raise ensure - unless thread_exit + if ::Thread.current.alive? && !thread_exit log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $! end @_threads_mutex.synchronize do @@ -110,11 +110,31 @@ def initialize def stop super + wakeup_threads = [] @_threads_mutex.synchronize do - @_threads.each_pair do |obj_id, thread| + @_threads.values.each do |thread| thread[:_fluentd_plugin_helper_thread_running] = false + wakeup_threads << thread if thread.alive? && thread.status == "sleep" end end + wakeup_threads.each do |thread| + if thread.alive? + thread.wakeup + end + end + end + + def after_shutdown + super + wakeup_threads = [] + @_threads_mutex.synchronize do + @_threads.values.each do |thread| + wakeup_threads << thread if thread.alive? && thread.status == "sleep" + end + end + wakeup_threads.each do |thread| + thread.wakeup if thread.alive? + end end def close From 1fd916434a8a7eb4157455a0986401cfc361b3b8 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 6 Oct 2016 19:18:36 +0900 Subject: [PATCH 2/3] ensure to stop enqueue thread before stopping flush threads --- lib/fluent/plugin/output.rb | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 2c0c23f967..95d3ba06db 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -181,6 +181,7 @@ def initialize @secondary = nil @retry = nil @dequeued_chunks = nil + @output_enqueue_thread = nil @output_flush_threads = nil @simple_chunking = nil @@ -359,6 +360,9 @@ def start @buffer.start + @output_enqueue_thread = nil + @output_enqueue_thread_running = true + @output_flush_threads = [] @output_flush_threads_mutex = Mutex.new @output_flush_threads_running = true @@ -389,7 +393,7 @@ def start unless @in_tests if @flush_mode == :interval || @chunk_key_time - thread_create(:enqueue_thread, &method(:enqueue_thread_run)) + @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run)) end end end @@ -416,6 +420,12 @@ def before_shutdown force_flush end @buffer.before_shutdown + # Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data + @output_enqueue_thread_running = false + if @output_enqueue_thread && @output_enqueue_thread.alive? + @output_enqueue_thread.wakeup + @output_enqueue_thread.join + end end super @@ -1107,7 +1117,7 @@ def enqueue_thread_run log.debug "enqueue_thread actually running" begin - while @output_flush_threads_running && thread_current_running? + while @output_enqueue_thread_running now_int = Time.now.to_i if @output_flush_interrupted sleep interval From 9cfa22e4b781b80212d0bf3b02000d321f6a6eb7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 7 Oct 2016 15:07:38 +0900 Subject: [PATCH 3/3] not to instantiate temp array --- lib/fluent/plugin_helper/thread.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin_helper/thread.rb b/lib/fluent/plugin_helper/thread.rb index 1003e16702..c02f645e2b 100644 --- a/lib/fluent/plugin_helper/thread.rb +++ b/lib/fluent/plugin_helper/thread.rb @@ -112,7 +112,7 @@ def stop super wakeup_threads = [] @_threads_mutex.synchronize do - @_threads.values.each do |thread| + @_threads.each_value do |thread| thread[:_fluentd_plugin_helper_thread_running] = false wakeup_threads << thread if thread.alive? && thread.status == "sleep" end @@ -128,7 +128,7 @@ def after_shutdown super wakeup_threads = [] @_threads_mutex.synchronize do - @_threads.values.each do |thread| + @_threads.each_value do |thread| wakeup_threads << thread if thread.alive? && thread.status == "sleep" end end