From 1c79a234ef7711bf1a77771dba458481aec3ec5d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 8 Nov 2018 09:53:14 +0900 Subject: [PATCH] Remove `Thread#run` to avoid unexpected interruption. Thread#run causes unexpected interruption and it affects the behaviour of some libraries, e.g. DataError in zlib. Use condition variable instead to handle thread execution. This change is based on jl2005's #1984 patch. Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/output.rb | 71 +++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0efcab423c..d3cf709da7 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -154,7 +154,7 @@ def multi_workers_ready? end # Internal states - FlushThreadState = Struct.new(:thread, :next_clock) + FlushThreadState = Struct.new(:thread, :next_clock, :mutex, :cond_var) DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do def expired? time + timeout < Time.now @@ -440,7 +440,7 @@ def start @buffer_config.flush_thread_count.times do |i| thread_title = "flush_thread_#{i}".to_sym - thread_state = FlushThreadState.new(nil, nil) + thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new) thread = thread_create(thread_title) do flush_thread_run(thread_state) end @@ -506,9 +506,14 @@ def after_shutdown @output_flush_threads_running = false if @output_flush_threads && !@output_flush_threads.empty? @output_flush_threads.each do |state| - state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself - end - @output_flush_threads.each do |state| + # to wakeup thread and make it to stop by itself + state.mutex.synchronize { + if state.thread && state.thread.status + state.next_clock = 0 + state.cond_var.signal + end + } + Thread.pass state.thread.join end end @@ -1254,12 +1259,15 @@ def submit_flush_once # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] - state.next_clock = 0 - if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception) - state.thread.run - else - log.warn "thread is already dead" - end + state.mutex.synchronize { + if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception) + state.next_clock = 0 + state.cond_var.signal + else + log.warn "thread is already dead" + end + } + Thread.pass end def force_flush @@ -1296,8 +1304,13 @@ def enqueue_thread_wait # only for tests of output plugin def flush_thread_wakeup @output_flush_threads.each do |state| - state.next_clock = 0 - state.thread.run + state.mutex.synchronize { + if state.thread && state.thread.status + state.next_clock = 0 + state.cond_var.signal + end + } + Thread.pass end end @@ -1376,6 +1389,7 @@ def flush_thread_run(state) end log.debug "flush_thread actually running" + state.mutex.lock begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running @@ -1391,23 +1405,32 @@ def flush_thread_run(state) elsif next_retry_time && next_retry_time > Time.now interval = next_retry_time.to_f - Time.now.to_f else - try_flush - - # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) - interval = next_flush_time.to_f - Time.now.to_f - # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected - # because @retry still exists (#commit_write is not called yet in #try_flush) - # @retry should be cleared if delayed commit is enabled? Or any other solution? - state.next_clock = Fluent::Clock.now + interval + state.mutex.unlock + begin + try_flush + # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) + interval = next_flush_time.to_f - Time.now.to_f + # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected + # because @retry still exists (#commit_write is not called yet in #try_flush) + # @retry should be cleared if delayed commit is enabled? Or any other solution? + state.next_clock = Fluent::Clock.now + interval + ensure + state.mutex.lock + end end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } unless @output_flush_interrupted - try_rollback_write + state.mutex.unlock + begin + try_rollback_write + ensure + state.mutex.lock + end end end - sleep interval if interval > 0 + state.cond_var.wait(state.mutex, interval) if interval > 0 end rescue => e # normal errors are rescued by output plugins in #try_flush @@ -1415,6 +1438,8 @@ def flush_thread_run(state) log.error "error on output thread", error: e log.error_backtrace raise + ensure + state.mutex.unlock end end end