Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Fix rotation related resource leak. fix #1941 #2105

Merged
merged 2 commits into from
Aug 20, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,16 @@ def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
end
tw
rescue => e
if tw
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close
end
raise e
Expand Down Expand Up @@ -343,7 +346,10 @@ def update_watcher(path, pe)
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
Expand All @@ -352,6 +358,8 @@ def detach_watcher(tw, close_io = true)
end

def detach_watcher_after_rotate_wait(tw)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
end
Expand Down Expand Up @@ -479,7 +487,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
@update_watcher = update_watcher

@stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil
@timer_trigger = nil
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil

@rotate_handler = RotateHandler.new(self, &method(:on_rotate))
@io_handler = nil
Expand Down Expand Up @@ -513,8 +521,7 @@ def attach
end

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
@stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached?
yield self
@io_handler.on_notify if @io_handler
end

Expand Down Expand Up @@ -613,6 +620,21 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(watcher, &callback)
@watcher = watcher
Expand All @@ -629,7 +651,6 @@ def on_change(prev, cur)
end
end


class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
Expand Down