Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Fix rotation related resource leak. fix #1941 #2105

Merged
merged 2 commits into from
Aug 20, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,16 @@ def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
end
tw
rescue => e
if tw
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close
end
raise e
Expand Down Expand Up @@ -343,7 +346,10 @@ def update_watcher(path, pe)
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
Expand All @@ -352,9 +358,14 @@ def detach_watcher(tw, close_io = true)
end

def detach_watcher_after_rotate_wait(tw)
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
# Call event_loop_attach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly.
checker = ->() { true }
detacher = ->(watcher) { event_loop_detach(watcher) }
timer = Fluent::PluginHelper::Timer::TimerWatcher.new(:closer, @rotate_wait, false, log, checker, detacher) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you extract timer_execute?
The above comment does not describe the reason to extract timer_execute.

I think we should create another PR for extracting timer_execute and use @event_loop_directly.

BTW, how about adding TODO: to the above comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I forgot to back to timer_execute implementation.
Fix soon.

detach_watcher(tw)
end
}
event_loop_attach(timer)
end

def flush_buffer(tw)
Expand Down Expand Up @@ -479,7 +490,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
@update_watcher = update_watcher

@stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil
@timer_trigger = nil
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil

@rotate_handler = RotateHandler.new(self, &method(:on_rotate))
@io_handler = nil
Expand Down Expand Up @@ -513,8 +524,7 @@ def attach
end

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
@stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached?
yield self
@io_handler.on_notify if @io_handler
end

Expand Down Expand Up @@ -613,6 +623,21 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(watcher, &callback)
@watcher = watcher
Expand All @@ -629,7 +654,6 @@ def on_change(prev, cur)
end
end


class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
Expand Down