Skip to content

Commit

Permalink
fix code to use child_process pluginn helper
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Nov 1, 2016
1 parent 79f78f9 commit 618069b
Showing 1 changed file with 84 additions and 141 deletions.
225 changes: 84 additions & 141 deletions lib/fluent/plugin/out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ExecFilterOutput < Output
desc 'The number of spawned process for command.'
config_param :num_children, :integer, default: 1

desc 'Respawn command when command exit.'
desc 'Respawn command when command exit. ["none", "inf" or positive integer for times to respawn (defaut: none)]'
# nil, 'none' or 0: no respawn, 'inf' or -1: infinite times, positive integer: try to respawn specified times only
config_param :child_respawn, :string, default: nil

Expand Down Expand Up @@ -173,45 +173,86 @@ def configure(conf)
@next_log_time = Time.now.to_i
end

ExecutedProcess = Struct.new(:mutex, :pid, :respawns, :readio, :writeio)

def start
super

receiver = case
when @parser.implement?(:parse_io) then method(:run_with_io)
when @parser.implement?(:parse_partial_data) then method(:run_with_partial_read)
else method(:run)
end

@children_mutex = Mutex.new
@children = []
@rr = 0
begin
@num_children.times do
c = ChildProcess.new(receiver, @respawns, log)
c.start(@command)
@children << c

exit_callback = ->(status){
c = @children.select{|child| child.pid == status.pid }.first
if c
log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig
c.mutex.synchronize do
(c.writeio && c.writeio.close) rescue nil
(c.readio && c.readio.close) rescue nil
c.pid = c.readio = c.writeio = nil
end
end
}
child_process_callback = ->(index, readio, writeio){
pid = child_process_id
c = @children[index]
writeio.sync = true
c.mutex.synchronize do
c.pid = pid
c.respawns = @respawns
c.readio = readio
c.writeio = writeio
end

run(readio)
}
execute_child_process = ->(index){
child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio|
child_process_callback.call(index, readio, writeio)
end
}

@children_mutex.synchronize do
@num_children.times do |i|
@children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil)
execute_child_process.call(i)
end
end

if @respawns != 0
thread_create do
while thread_current_running?
@children.each_with_index do |c, i|
if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 }
respawns = c.mutex.synchronize do
c.respawns -= 1 if c.respanws > 0
c.respawns
end
log.info "respawning child process", num: i, respawn_counter: respawns
execute_child_process.call(i)
end
end
sleep 0.2
end
end
rescue
shutdown
raise
end
end

def before_shutdown
log.debug "out_exec_filter#before_shutdown called"
@children.each {|c|
c.finished = true
}
sleep 0.5 # TODO wait time before killing child process
def stop
@children.each_with_index do |c, i|
c.mutex.synchronize do
if c.pid
c.writeio.close rescue nil
c.writeio = nil
end
end
end

super
end

def shutdown
@children.reject! {|c|
c.shutdown
true
}

def terminate
@children = []
super
end

Expand All @@ -235,131 +276,33 @@ def format(tag, time, record)
end

def write(chunk)
r = @rr = (@rr + 1) % @children.length
@children[r].write chunk
end

class ChildProcess
attr_accessor :finished

def initialize(parser_proc, respawns=0, log = $log)
@pid = nil
@thread = nil
@parser_proc = parser_proc
@respawns = respawns
@mutex = Mutex.new
@finished = nil
@log = log
end

def start(command)
@command = command
@mutex.synchronize do
@io = IO.popen(command, "r+")
@pid = @io.pid
@io.sync = true
@thread = Thread.new(&method(:run))
end
@finished = false
end

def kill_child(join_wait)
begin
signal = Fluent.windows? ? :KILL : :TERM
Process.kill(signal, @pid)
rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
# Errno::ESRCH 'No such process', ignore
# child process killed by signal chained from fluentd process
end
if @thread.join(join_wait)
# @thread successfully shutdown
return
end
begin
Process.kill(:KILL, @pid)
rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
# ignore if successfully killed by :TERM
end
@thread.join
end

def shutdown
@finished = true
@mutex.synchronize do
kill_child(60) # TODO wait time
end
end

def write(chunk)
begin
chunk.write_to(@io)
rescue Errno::EPIPE => e
# Broken pipe (child process unexpectedly exited)
@log.warn "exec_filter Broken pipe, child process maybe exited.", command: @command
if try_respawn
retry # retry chunk#write_to with child respawned
else
raise e # to retry #write with other ChildProcess instance (when num_children > 1)
end
end
end

def try_respawn
return false if @respawns == 0
@mutex.synchronize do
return false if @respawns == 0

kill_child(5) # TODO wait time

@io = IO.popen(@command, "r+")
@pid = @io.pid
@io.sync = true
@thread = ::Thread.new(&method(:run))

@respawns -= 1 if @respawns > 0
end
@log.warn "exec_filter child process successfully respawned.", command: @command, respawns: @respawns
true
end

def run
@parser_proc.call(@io)
rescue => e
@log.error "exec_filter thread unexpectedly failed with an error.", command: @command, error: e
@log.warn_backtrace e.backtrace
ensure
_pid, stat = Process.waitpid2(@pid)
unless @finished
@log.error "exec_filter process unexpectedly exited.", command: @command, ecode: stat.to_i
unless @respawns == 0
@log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})."
end
try_times = 0
while true
r = @rr = (@rr + 1) % @children.length
if @children[r].pid && writeio = @children[r].writeio
chunk.write_to(writeio)
break
end
try_times += 1
raise "no healthy child processes exist" if try_times >= @children.length
end
end

def run_with_io(io)
@parser.parse_io(io, &method(:on_record))
end

def run_with_partial_read(io)
until io.eof?
@parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
end
rescue EOFError
# ignore
end

def run(io)
if @parser.parser_type == :text_per_line
case
when @parser.implement?(:parse_io)
@parser.parse_io(io, &method(:on_record))
when @parser.implement?(:parse_partial_data)
until io.eof?
@parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
end
when @parser.parser_type == :text_per_line
io.each_line do |line|
@parser.parse(line.chomp, &method(:on_record))
end
else
@parser.parse(io.read, &method(:on_record))
end
rescue EOFError
# ignore
end

def on_record(time, record)
Expand Down

0 comments on commit 618069b

Please sign in to comment.