Skip to content

Commit

Permalink
Merge pull request #965 from fluent/fix-lifecycle-control
Browse files Browse the repository at this point in the history
Fix lifecycle control
  • Loading branch information
tagomoris committed May 20, 2016
2 parents f41ab4f + 0133a4b commit 5512f83
Show file tree
Hide file tree
Showing 22 changed files with 123 additions and 77 deletions.
36 changes: 12 additions & 24 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def initialize
@system_config = SystemConfig.new
end

MAINLOOP_SLEEP_INTERVAL = 0.3

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

Expand Down Expand Up @@ -177,38 +179,24 @@ def run
@log_emit_thread = Thread.new(&method(:log_event_loop))
end

unless @engine_stopped
# for empty loop
@default_loop = Coolio::Loop.default
@default_loop.attach Coolio::TimerWatcher.new(1, true)
# TODO attach async watch for thread pool
@default_loop.run
end

if @engine_stopped and @default_loop
@default_loop.stop
@default_loop = nil
end
sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped

rescue => e
rescue Exception => e
$log.error "unexpected error", error: e
$log.error_backtrace
ensure
$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
raise
end

$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
end

def stop
@engine_stopped = true
if @default_loop
@default_loop.stop
@default_loop = nil
end
nil
end

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_debug_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def configure(conf)
end

def start
super

if @unix_path
require 'drb/unix'
uri = "drbunix:#{@unix_path}"
Expand All @@ -55,6 +57,8 @@ def start

def shutdown
@server.stop_service if @server

super
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def start
def shutdown
@running = false
@thread.join
super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def setup_parser(conf)
end

def start
super

if @run_interval
@finished = false
@thread = Thread.new(&method(:run_periodic))
Expand Down Expand Up @@ -134,6 +136,8 @@ def shutdown
end
@thread.join
end

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
Expand Down Expand Up @@ -89,6 +91,8 @@ def shutdown
@usock.close
@thread.join
@lsock.close

super
end

def listen(client)
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_gc_stat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer))
@loop.attach(@timer)
Expand All @@ -60,6 +62,8 @@ def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join

super
end

def run
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def start
detach_multi_process do
super
@km = KeepaliveManager.new(@keepalive_timeout)
#@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format, log,
@cors_allow_origins)
Expand All @@ -131,6 +130,8 @@ def shutdown
@loop.stop
@lsock.close
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def on_timer
end

def start
super

log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
BindAddress: @bind,
Expand Down Expand Up @@ -290,6 +292,8 @@ def shutdown
@thread_for_emit.join
@thread_for_emit = nil
end

super
end

MONITOR_INFO = {
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_object_space.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer))
@loop.attach(@timer)
Expand All @@ -62,6 +64,8 @@ def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def initialize
end

def start
super

@loop = Coolio::Loop.new
@lsock = listen
@loop.attach(@lsock)
Expand All @@ -46,6 +48,8 @@ def shutdown
@loop.stop
@lsock.close
@thread.join

super
end

#def listen
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def configure(conf)
end

def start
super

callback = if @use_default
method(:receive_data)
else
Expand All @@ -128,6 +130,8 @@ def shutdown
@loop.stop
@handler.close
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def configure_tag
end

def start
super

if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
Expand All @@ -139,6 +141,8 @@ def shutdown
@loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
@thread.join
@pf_file.close if @pf_file

super
end

def expand_paths
Expand Down
16 changes: 10 additions & 6 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ def configure(conf)
end

def start
@outputs.each {|o|
o.start
}
super

@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each {|o|
o.shutdown
}
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,22 @@ def start
end

def before_shutdown
super
log.debug "out_exec_filter#before_shutdown called"
@children.each {|c|
c.finished = true
}
sleep 0.5 # TODO wait time before killing child process
end

def shutdown
super
end

def shutdown
@children.reject! {|c|
c.shutdown
true
}

super
end

def format_stream(tag, es)
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def shutdown
end
@thread.join if @thread
@usock.close if @usock

super
end

def run
Expand Down
14 changes: 0 additions & 14 deletions lib/fluent/plugin/out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ module Fluent
class NullOutput < Output
Plugin.register_output('null', self)

def initialize
super
end

def configure(conf)
super
end

def start
end

def shutdown
end

def emit(tag, es, chain)
chain.next
end
Expand Down
16 changes: 10 additions & 6 deletions lib/fluent/plugin/out_roundrobin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,21 @@ def configure(conf)
end

def start
super

rebuild_weight_array

@outputs.each {|o|
o.start
}
@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each {|o|
o.shutdown
}
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
Expand Down
Loading

0 comments on commit 5512f83

Please sign in to comment.