Skip to content

Commit

Permalink
Merge pull request #1264 from fluent/wakeup-threads-in-shutdown-seque…
Browse files Browse the repository at this point in the history
…nce-to-quit-sleep

Fix to wakeup threads to quit loop in threads
  • Loading branch information
tagomoris authored Oct 7, 2016
2 parents 0eb12ba + 9cfa22e commit ca20c54
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
23 changes: 17 additions & 6 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def initialize
@secondary = nil
@retry = nil
@dequeued_chunks = nil
@output_enqueue_thread = nil
@output_flush_threads = nil

@simple_chunking = nil
Expand Down Expand Up @@ -367,6 +368,9 @@ def start

@buffer.start

@output_enqueue_thread = nil
@output_enqueue_thread_running = true

@output_flush_threads = []
@output_flush_threads_mutex = Mutex.new
@output_flush_threads_running = true
Expand Down Expand Up @@ -397,7 +401,7 @@ def start

unless @in_tests
if @flush_mode == :interval || @chunk_key_time
thread_create(:enqueue_thread, &method(:enqueue_thread_run))
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
end
Expand All @@ -424,6 +428,12 @@ def before_shutdown
force_flush
end
@buffer.before_shutdown
# Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data
@output_enqueue_thread_running = false
if @output_enqueue_thread && @output_enqueue_thread.alive?
@output_enqueue_thread.wakeup
@output_enqueue_thread.join
end
end

super
Expand Down Expand Up @@ -1115,7 +1125,7 @@ def enqueue_thread_run
log.debug "enqueue_thread actually running"

begin
while @output_flush_threads_running
while @output_enqueue_thread_running
now_int = Time.now.to_i
if @output_flush_interrupted
sleep interval
Expand All @@ -1140,16 +1150,17 @@ def enqueue_thread_run
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error_class: e.class, error: e
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e
log.error_backtrace
ensure
@output_enqueue_thread_waiting = false
@output_enqueue_thread_mutex.unlock
end
@output_enqueue_thread_waiting = false
@output_enqueue_thread_mutex.unlock
sleep interval
end
rescue => e
# normal errors are rescued by inner begin-rescue clause.
log.error "error on enqueue thread", plugin_id: plugin_id, error_class: e.class, error: e
log.error "error on enqueue thread", plugin_id: plugin_id, error: e
log.error_backtrace
raise
end
Expand Down
24 changes: 22 additions & 2 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def thread_create(title)
thread_exit = true
raise
ensure
unless thread_exit
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
Expand Down Expand Up @@ -110,11 +110,31 @@ def initialize

def stop
super
wakeup_threads = []
@_threads_mutex.synchronize do
@_threads.each_pair do |obj_id, thread|
@_threads.each_value do |thread|
thread[:_fluentd_plugin_helper_thread_running] = false
wakeup_threads << thread if thread.alive? && thread.status == "sleep"
end
end
wakeup_threads.each do |thread|
if thread.alive?
thread.wakeup
end
end
end

def after_shutdown
super
wakeup_threads = []
@_threads_mutex.synchronize do
@_threads.each_value do |thread|
wakeup_threads << thread if thread.alive? && thread.status == "sleep"
end
end
wakeup_threads.each do |thread|
thread.wakeup if thread.alive?
end
end

def close
Expand Down

0 comments on commit ca20c54

Please sign in to comment.