Skip to content

Commit

Permalink
add tests for PluginHelper, and fixes many bugs found
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Mar 15, 2016
1 parent be98274 commit 6271788
Show file tree
Hide file tree
Showing 14 changed files with 970 additions and 56 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ def flush
@out.flush
end

def reset
@out.reset if @out.respond_to?(:reset)
end

private

def dump_stacktrace(backtrace, level)
Expand Down
74 changes: 67 additions & 7 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,73 @@ class Base
include PluginLoggerMixin
include PluginHelper::Mixin

def initialize; end
def configure(conf); end
def start; end
def stop; end
def shutdown; end
def close; end
def terminate; end
State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate)

def initialize
super
@state = State.new(false, false, false, false, false, false)
end

def has_router?
false
end

def configure(conf)
super
@state.configure = true
self
end

def start
@log.reset
@state.start = true
self
end

def stop
@state.stop = true
self
end

def shutdown
@state.shutdown = true
self
end

def close
@state.close = true
self
end

def terminate
@state.terminate = true
@log.reset
self
end

def configured?
@state.configure
end

def started?
@state.start
end

def stopped?
@state.stop
end

def shutdown?
@state.shutdown
end

def closed?
@state.close
end

def terminated?
@state.terminate
end
end
end
end
77 changes: 50 additions & 27 deletions lib/fluent/plugin_helper/child_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,44 @@ module ChildProcess

# stop : [-]
# shutdown : send TERM to all child processes
# close : close all I/O objects for child processes
# terminate: send TERM again and again if processes stil exist, and KILL after timeout
# close : close all I/O objects for child processes, send TERM and then KILL after timeout
# terminate: [-]

def child_process_execute(title, command, arguments: nil, subprocess_name: nil, read: true, write: true, encoding: 'utf-8', interval: nil, immediate: false, &block)
attr_reader :_child_process_processes # for tests

def child_process_execute(
title, command,
arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
read: true, write: true, internal_encoding: 'utf-8', external_encoding: 'utf-8', &block
)
raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments
raise ArgumentError, "BUG: both of input and output are disabled" if !read && !write
raise ArgumentError, "BUG: block not specified which receive i/o object" unless block_given?
raise ArgumentError, "BUG: block must have an argument for io" unless block.arity == 1

if interval
if immediate
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
running = false
callback = ->(io) {
running = true
begin
block.call(io)
ensure
running = false
end
}

if immediate || !interval
child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &callback)
end

if interval
timer_execute(:child_process_execute, interval, repeat: true) do
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
if !parallel && running
log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
else
child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &callback)
end
end
else
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
end
end

Expand Down Expand Up @@ -79,17 +99,20 @@ def shutdown
def close
super

# close_write -> kill processes -> close_read
# * if ext process continues to write data to it's stdout, io.close_read blocks
# so killing process MUST be done before closing io for read

@_child_process_processes.keys.each do |pid|
io = @_child_process_processes[pid].io
io.close rescue nil
io.close_write rescue nil
end
end

def terminate
alive_process_exist = true
timeout = Time.now + @_child_process_kill_timeout

while alive_process_exist
while true
break if Time.now > timeout

@_child_process_processes.keys.each do |pid|
process = @_child_process_processes[pid]
next unless process.alive
Expand All @@ -108,12 +131,9 @@ def terminate
@_child_process_processes.each_pair do |pid, process|
alive_process_found = true if process.alive
end
break unless alive_process_found

if alive_process_found
sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
else
alive_process_exist = false
end
sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
end

@_child_process_processes.keys.each do |pid|
Expand All @@ -125,14 +145,15 @@ def terminate
@_child_process_processes.delete(pid)
end

super
@_child_process_processes.keys.each do |pid|
io = @_child_process_processes[pid].io
io.close_read rescue nil
end
end

ProcessInfo = Struct.new(:thread, :io, :alive)

def child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
ext_enc = encoding
int_enc = "utf-8"
def child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &block)
mode = case
when read && write then "r+"
when read then "r"
Expand All @@ -141,22 +162,22 @@ def child_process_execute_once(title, command, arguments, subprocess_name, read,
raise "BUG: both read and write are false is banned before here"
end
cmd = command
if arguments # if arguments is nil, then command is executed by shell
if arguments || subprocess_name # if arguments is nil, then command is executed by shell
cmd = []
if subprocess_name
cmd.push [command, subprocess_name]
else
cmd.push command
end
cmd += arguments
cmd += (arguments || [])
end
log.debug "Executing command", title: title, command: command, arguments: arguments, read: read, write: write
io = IO.popen(cmd, mode, external_encoding: ext_enc, internal_encoding: int_enc)
io = IO.popen(cmd, mode, external_encoding: external_encoding, internal_encoding: internal_encoding)
pid = io.pid

m = Mutex.new
m.lock
thread = thread_create do
thread = thread_create :child_process_callback do
m.lock # run after plugin thread get pid, thread instance and i/o
with_error = false
m.unlock
Expand All @@ -173,6 +194,8 @@ def child_process_execute_once(title, command, arguments, subprocess_name, read,
rescue => e
log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error_class: e.class, error: e
end
io.close rescue nil
@_child_process_processes.delete(pid)
end
@_child_process_processes[pid] = ProcessInfo.new(thread, io, true)
m.unlock
Expand Down
7 changes: 3 additions & 4 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ def configure(conf)
end
end

def emits?
def has_router?
true
end

def shutdown
def close
super

@router = nil # TODO: is it correct shutdown phase to disable router?
@router = nil
end
end
end
Expand Down
15 changes: 13 additions & 2 deletions lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,29 @@
module Fluent
module PluginHelper
module EventLoop
# Currently this plugin helper is only for other helpers, not plugins.
# there's no way to create customized watchers to attach event loops.
include Fluent::PluginHelper::Thread

# stop : [-]
# shutdown : detach all event watchers on event loop
# close : stop event loop
# terminate: initialize internal state

EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.2
EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.5

attr_reader :_event_loop # for tests

def event_loop_attach(watcher)
@_event_loop_mutex.synchronize do
@_event_loop.attach(watcher)
end
end

def event_loop_wait_until_start
::Thread.pass until event_loop_running?
end

def event_loop_running?
@_event_loop_running
end
Expand Down Expand Up @@ -65,6 +73,9 @@ def shutdown
@_event_loop_mutex.synchronize do
@_event_loop.watchers.each {|w| w.detach if w.attached? }
end
while @_event_loop_running
::Thread.pass
end

super
end
Expand All @@ -85,7 +96,7 @@ def terminate
@_event_loop = nil
@_event_loop_running = false
@_event_loop_mutex = nil
@_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT
@_event_loop_run_timeout = nil

super
end
Expand Down
Loading

0 comments on commit 6271788

Please sign in to comment.