Skip to content

Commit

Permalink
add tests for PluginHelper, and fixes many bugs found
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Mar 14, 2016
1 parent be98274 commit 89274c9
Show file tree
Hide file tree
Showing 14 changed files with 960 additions and 51 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ def flush
@out.flush
end

def reset
@out.reset if @out.respond_to?(:reset)
end

private

def dump_stacktrace(backtrace, level)
Expand Down
74 changes: 67 additions & 7 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,73 @@ class Base
include PluginLoggerMixin
include PluginHelper::Mixin

def initialize; end
def configure(conf); end
def start; end
def stop; end
def shutdown; end
def close; end
def terminate; end
State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate)

def initialize
super
@state = State.new(false, false, false, false, false, false)
end

def has_router?
false
end

def configure(conf)
super
@state.configure = true
self
end

def start
@log.reset
@state.start = true
self
end

def stop
@state.stop = true
self
end

def shutdown
@state.shutdown = true
self
end

def close
@state.close = true
self
end

def terminate
@state.terminate = true
@log.reset
self
end

def configured?
@state.configure
end

def started?
@state.start
end

def stopped?
@state.stop
end

def shutdown?
@state.shutdown
end

def closed?
@state.close
end

def terminated?
@state.terminate
end
end
end
end
77 changes: 50 additions & 27 deletions lib/fluent/plugin_helper/child_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,44 @@ module ChildProcess

# stop : [-]
# shutdown : send TERM to all child processes
# close : close all I/O objects for child processes
# terminate: send TERM again and again if processes stil exist, and KILL after timeout
# close : close all I/O objects for child processes, send TERM and then KILL after timeout
# terminate: [-]

def child_process_execute(title, command, arguments: nil, subprocess_name: nil, read: true, write: true, encoding: 'utf-8', interval: nil, immediate: false, &block)
attr_reader :_child_process_processes # for tests

def child_process_execute(
title, command,
arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
read: true, write: true, internal_encoding: 'utf-8', external_encoding: 'utf-8', &block
)
raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments
raise ArgumentError, "BUG: both of input and output are disabled" if !read && !write
raise ArgumentError, "BUG: block not specified which receive i/o object" unless block_given?
raise ArgumentError, "BUG: block must have an argument for io" unless block.arity == 1

if interval
if immediate
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
running = false
callback = ->(io) {
running = true
begin
block.call(io)
ensure
running = false
end
}

if immediate || !interval
child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &callback)
end

if interval
timer_execute(:child_process_execute, interval, repeat: true) do
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
if !parallel && running
log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
else
child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &callback)
end
end
else
child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
end
end

Expand Down Expand Up @@ -79,17 +99,20 @@ def shutdown
def close
super

# close_write -> kill processes -> close_read
# * if ext process continues to write data to it's stdout, io.close_read blocks
# so killing process MUST be done before closing io for read

@_child_process_processes.keys.each do |pid|
io = @_child_process_processes[pid].io
io.close rescue nil
io.close_write rescue nil
end
end

def terminate
alive_process_exist = true
timeout = Time.now + @_child_process_kill_timeout

while alive_process_exist
while true
break if Time.now > timeout

@_child_process_processes.keys.each do |pid|
process = @_child_process_processes[pid]
next unless process.alive
Expand All @@ -108,12 +131,9 @@ def terminate
@_child_process_processes.each_pair do |pid, process|
alive_process_found = true if process.alive
end
break unless alive_process_found

if alive_process_found
sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
else
alive_process_exist = false
end
sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
end

@_child_process_processes.keys.each do |pid|
Expand All @@ -125,14 +145,15 @@ def terminate
@_child_process_processes.delete(pid)
end

super
@_child_process_processes.keys.each do |pid|
io = @_child_process_processes[pid].io
io.close_read rescue nil
end
end

ProcessInfo = Struct.new(:thread, :io, :alive)

def child_process_execute_once(title, command, arguments, subprocess_name, read, write, encoding, &block)
ext_enc = encoding
int_enc = "utf-8"
def child_process_execute_once(title, command, arguments, subprocess_name, read, write, internal_encoding, external_encoding, &block)
mode = case
when read && write then "r+"
when read then "r"
Expand All @@ -141,22 +162,22 @@ def child_process_execute_once(title, command, arguments, subprocess_name, read,
raise "BUG: both read and write are false is banned before here"
end
cmd = command
if arguments # if arguments is nil, then command is executed by shell
if arguments || subprocess_name # if arguments is nil, then command is executed by shell
cmd = []
if subprocess_name
cmd.push [command, subprocess_name]
else
cmd.push command
end
cmd += arguments
cmd += (arguments || [])
end
log.debug "Executing command", title: title, command: command, arguments: arguments, read: read, write: write
io = IO.popen(cmd, mode, external_encoding: ext_enc, internal_encoding: int_enc)
io = IO.popen(cmd, mode, external_encoding: external_encoding, internal_encoding: internal_encoding)
pid = io.pid

m = Mutex.new
m.lock
thread = thread_create do
thread = thread_create :child_process_callback do
m.lock # run after plugin thread get pid, thread instance and i/o
with_error = false
m.unlock
Expand All @@ -173,6 +194,8 @@ def child_process_execute_once(title, command, arguments, subprocess_name, read,
rescue => e
log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error_class: e.class, error: e
end
io.close rescue nil
@_child_process_processes.delete(pid)
end
@_child_process_processes[pid] = ProcessInfo.new(thread, io, true)
m.unlock
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def configure(conf)
end
end

def emits?
def has_router?
true
end

Expand Down
13 changes: 12 additions & 1 deletion lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,29 @@
module Fluent
module PluginHelper
module EventLoop
# Currently this plugin helper is only for other helpers, not plugins.
# there's no way to create customized watchers to attach event loops.
include Fluent::PluginHelper::Thread

# stop : [-]
# shutdown : detach all event watchers on event loop
# close : stop event loop
# terminate: initialize internal state

EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.2
EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.5

attr_reader :_event_loop # for tests

def event_loop_attach(watcher)
@_event_loop_mutex.synchronize do
@_event_loop.attach(watcher)
end
end

def event_loop_wait_until_start
::Thread.pass until event_loop_running?
end

def event_loop_running?
@_event_loop_running
end
Expand Down Expand Up @@ -65,6 +73,9 @@ def shutdown
@_event_loop_mutex.synchronize do
@_event_loop.watchers.each {|w| w.detach if w.attached? }
end
while @_event_loop_running
::Thread.pass
end

super
end
Expand Down
40 changes: 29 additions & 11 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,56 +27,74 @@ module Thread
attr_reader :_threads # for test driver

def thread_current_running?
# checker for code in callback of thread_create
::Thread.current[:_fluentd_plugin_helper_thread_running] || false
end

def thread_wait_until_start
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && t[:_fluentd_plugin_helper_thread_started] } }
::Thread.pass
end
end

def thread_create(title, *args)
raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
raise ArgumentError, "BUG: callback not specified" unless block_given?
m = Mutex.new
m.lock
thread = ::Thread.new(*args) do |*t_args|
m.lock # run thread after that thread is successfully set into @_thread
::Thread.current[:_fluentd_plugin_helper_thread_title] = title
::Thread.current[:_fluentd_plugin_helper_thread_running] = true
m.lock # run thread after that thread is successfully set into @_threads
m.unlock
thread_exit = false
::Thread.current[:_fluentd_plugin_helper_thread_title] = title
::Thread.current[:_fluentd_plugin_helper_thread_started] = true
::Thread.current[:_fluentd_plugin_helper_thread_running] = true
begin
yield *t_args
thread_exit = true
rescue => e
log.warn "thread exited by unexpected error", plugin: self.class, title: title, error_class: e.class, error: e
thread_exit = true
raise
ensure
unless thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title
end
@_threads.delete(::Thread.current.object_id)
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
end
::Thread.current[:_fluentd_plugin_helper_thread_running] = false
end
end
thread.abort_on_exception = true
@_threads[thread.object_id] = thread
@_threads_mutex.synchronize do
@_threads[thread.object_id] = thread
end
m.unlock
thread
end

def initialize
super
@_threads_mutex = Mutex.new
@_threads = {}
@_thread_wait_seconds = THREAD_DEFAULT_WAIT_SECONDS
end

def stop
super
@_threads.each_pair do |obj_id, thread|
thread[:_fluentd_plugin_helper_thread_running] = false
@_threads_mutex.synchronize do
@_threads.each_pair do |obj_id, thread|
thread[:_fluentd_plugin_helper_thread_running] = false
end
end
end

def close
@_threads.keys.each do |obj_id|
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
thread = @_threads[obj_id]
if !thread || thread.join(@_thread_wait_seconds)
@_threads.delete(obj_id)
@_threads_mutex.synchronize{ @_threads.delete(obj_id) }
end
end

Expand All @@ -85,13 +103,13 @@ def close

def terminate
super
@_threads.keys.each do |obj_id|
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
thread = @_threads[obj_id]
if thread
thread.kill
thread.join
end
@_threads.delete(obj_id)
@_threads_mutex.synchronize{ @_threads.delete(obj_id) }
end
end
end
Expand Down
Loading

0 comments on commit 89274c9

Please sign in to comment.