Skip to content

Commit

Permalink
add #after_start method to detect finishing of calling #start methods
Browse files Browse the repository at this point in the history
This is to make a detection that #start method completely finished.
Without this method, flush/enqueue threads will run without consideration even if Output#start takes
long time to finish. Output#start may need long time to finish if it tries network connection or
service detection over internet.
  • Loading branch information
tagomoris committed Aug 29, 2016
1 parent 2f40a5b commit bd5841e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
15 changes: 12 additions & 3 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ class Base
include Configurable
include SystemConfig::Mixin

State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false)
@_state = State.new(false, false, false, false, false, false, false, false, false)
end

def has_router?
Expand All @@ -37,7 +37,7 @@ def has_router?

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false)
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
self
end
Expand All @@ -47,6 +47,11 @@ def start
self
end

def after_start
@_state.after_start = true
self
end

def stop
@_state.stop = true
self
Expand Down Expand Up @@ -85,6 +90,10 @@ def started?
@_state.start
end

def after_started?
@_state.after_start
end

def stopped?
@_state.stop
end
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ def start
@secondary.start if @secondary
end

def after_start
super
@secondary.after_start if @secondary
end

def stop
@secondary.stop if @secondary
@buffer.stop if @buffering && @buffer
Expand Down Expand Up @@ -922,6 +927,11 @@ def enqueue_thread_run
interval = @buffer_config.flush_thread_interval
end

while !self.after_started? && !self.stopped?
sleep interval
end
log.debug "enqueue_thread actually running"

begin
while @output_flush_threads_running
now_int = Time.now.to_i
Expand Down Expand Up @@ -969,6 +979,11 @@ def flush_thread_run(state)
clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval

while !self.after_started? && !self.stopped?
sleep flush_thread_interval
end
log.debug "flush_thread actually running"

begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def start
lifecycle(desc: true) do |i| # instance
i.start unless i.started?
end
lifecycle(desc: true) do |i|
i.after_start unless i.after_started?
end
end

def flush!
Expand Down

0 comments on commit bd5841e

Please sign in to comment.