Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to wakeup threads to quit loop in threads #1264

Merged
merged 3 commits into from
Oct 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def initialize
@secondary = nil
@retry = nil
@dequeued_chunks = nil
@output_enqueue_thread = nil
@output_flush_threads = nil

@simple_chunking = nil
Expand Down Expand Up @@ -359,6 +360,9 @@ def start

@buffer.start

@output_enqueue_thread = nil
@output_enqueue_thread_running = true

@output_flush_threads = []
@output_flush_threads_mutex = Mutex.new
@output_flush_threads_running = true
Expand Down Expand Up @@ -389,7 +393,7 @@ def start

unless @in_tests
if @flush_mode == :interval || @chunk_key_time
thread_create(:enqueue_thread, &method(:enqueue_thread_run))
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
end
Expand All @@ -416,6 +420,12 @@ def before_shutdown
force_flush
end
@buffer.before_shutdown
# Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data
@output_enqueue_thread_running = false
if @output_enqueue_thread && @output_enqueue_thread.alive?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check status?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do so. Living threads are running or sleeping.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see

@output_enqueue_thread.wakeup
@output_enqueue_thread.join
end
end

super
Expand Down Expand Up @@ -1107,7 +1117,7 @@ def enqueue_thread_run
log.debug "enqueue_thread actually running"

begin
while @output_flush_threads_running
while @output_enqueue_thread_running
now_int = Time.now.to_i
if @output_flush_interrupted
sleep interval
Expand All @@ -1132,16 +1142,17 @@ def enqueue_thread_run
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error_class: e.class, error: e
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e
log.error_backtrace
ensure
@output_enqueue_thread_waiting = false
@output_enqueue_thread_mutex.unlock
end
@output_enqueue_thread_waiting = false
@output_enqueue_thread_mutex.unlock
sleep interval
end
rescue => e
# normal errors are rescued by inner begin-rescue clause.
log.error "error on enqueue thread", plugin_id: plugin_id, error_class: e.class, error: e
log.error "error on enqueue thread", plugin_id: plugin_id, error: e
log.error_backtrace
raise
end
Expand Down
24 changes: 22 additions & 2 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def thread_create(title)
thread_exit = true
raise
ensure
unless thread_exit
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
Expand Down Expand Up @@ -110,11 +110,31 @@ def initialize

def stop
super
wakeup_threads = []
@_threads_mutex.synchronize do
@_threads.each_pair do |obj_id, thread|
@_threads.each_value do |thread|
thread[:_fluentd_plugin_helper_thread_running] = false
wakeup_threads << thread if thread.alive? && thread.status == "sleep"
end
end
wakeup_threads.each do |thread|
if thread.alive?
thread.wakeup
end
end
end

def after_shutdown
super
wakeup_threads = []
@_threads_mutex.synchronize do
@_threads.each_value do |thread|
wakeup_threads << thread if thread.alive? && thread.status == "sleep"
end
end
wakeup_threads.each do |thread|
thread.wakeup if thread.alive?
end
end

def close
Expand Down