Skip to content

Commit

Permalink
Merge pull request #2170 from fluent/output-remove-thread-run
Browse files Browse the repository at this point in the history
Remove `Thread#run` to avoid unexpected interruption.
  • Loading branch information
repeatedly authored Nov 9, 2018
2 parents 8ed8aeb + 1c79a23 commit f287c60
Showing 1 changed file with 48 additions and 23 deletions.
71 changes: 48 additions & 23 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def multi_workers_ready?
end

# Internal states
FlushThreadState = Struct.new(:thread, :next_clock)
FlushThreadState = Struct.new(:thread, :next_clock, :mutex, :cond_var)
DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
def expired?
time + timeout < Time.now
Expand Down Expand Up @@ -443,7 +443,7 @@ def start

@buffer_config.flush_thread_count.times do |i|
thread_title = "flush_thread_#{i}".to_sym
thread_state = FlushThreadState.new(nil, nil)
thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
thread = thread_create(thread_title) do
flush_thread_run(thread_state)
end
Expand Down Expand Up @@ -509,9 +509,14 @@ def after_shutdown
@output_flush_threads_running = false
if @output_flush_threads && !@output_flush_threads.empty?
@output_flush_threads.each do |state|
state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself
end
@output_flush_threads.each do |state|
# wake the thread up so that it stops by itself
state.mutex.synchronize {
if state.thread && state.thread.status
state.next_clock = 0
state.cond_var.signal
end
}
Thread.pass
state.thread.join
end
end
Expand Down Expand Up @@ -1263,12 +1268,15 @@ def submit_flush_once
# Without locks: this is rough, but good enough for selecting the "next" writer
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_clock = 0
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
log.warn "thread is already dead"
end
state.mutex.synchronize {
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.next_clock = 0
state.cond_var.signal
else
log.warn "thread is already dead"
end
}
Thread.pass
end

def force_flush
Expand Down Expand Up @@ -1305,8 +1313,13 @@ def enqueue_thread_wait
# only for tests of output plugin
def flush_thread_wakeup
@output_flush_threads.each do |state|
state.next_clock = 0
state.thread.run
state.mutex.synchronize {
if state.thread && state.thread.status
state.next_clock = 0
state.cond_var.signal
end
}
Thread.pass
end
end

Expand Down Expand Up @@ -1385,6 +1398,7 @@ def flush_thread_run(state)
end
log.debug "flush_thread actually running"

state.mutex.lock
begin
# This thread doesn't use `thread_current_running?` because this thread should still run in the `before_shutdown` phase
while @output_flush_threads_running
Expand All @@ -1400,30 +1414,41 @@ def flush_thread_run(state)
elsif next_retry_time && next_retry_time > Time.now
interval = next_retry_time.to_f - Time.now.to_f
else
try_flush

# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Fluent::Clock.now + interval
state.mutex.unlock
begin
try_flush
# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Fluent::Clock.now + interval
ensure
state.mutex.lock
end
end

if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
unless @output_flush_interrupted
try_rollback_write
state.mutex.unlock
begin
try_rollback_write
ensure
state.mutex.lock
end
end
end

sleep interval if interval > 0
state.cond_var.wait(state.mutex, interval) if interval > 0
end
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", error: e
log.error_backtrace
raise
ensure
state.mutex.unlock
end
end
end
Expand Down

0 comments on commit f287c60

Please sign in to comment.