Skip to content

Commit

Permalink
implement thread based tailwatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Comtois committed Nov 18, 2019
1 parent d4b3614 commit 8ab733c
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def start
@threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
@threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.

@threads.each { |thr|
thr.join
Expand Down Expand Up @@ -289,6 +290,11 @@ def refresh_watchers

stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?

log.debug "Thread refresh_watchers"
@threads.each { |thr|
log.debug "Thread #{thr[0]} #{thr[1].status}"
}
end

def setup_watcher(path, pe)
Expand All @@ -312,26 +318,37 @@ def setup_watcher(path, pe)

def start_watchers(paths)
paths.each { |path|
@threads[path] = Thread.new(path) do |path|
pe = nil
if @pf
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
unless @threads[path].nil?
log.debug "Check Thread #{path} #{@threads[path].status}"
if @threads[path].status != "sleep" and @threads[path].status != "run"
log.debug "Stopping Thread #{path} #{@threads[path].status}"
@threads[path].exit
@threads.delete(path)
end
end
if @threads[path].nil?
log.debug "Add Thread #{path}"
@threads[path] = Thread.new(path) do |path|
pe = nil
if @pf
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
end
end
end
end

begin
tw = setup_watcher(path, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
begin
tw = setup_watcher(path, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
end
@tails[path] = tw
end
@tails[path] = tw
end
}
end
Expand Down Expand Up @@ -375,6 +392,7 @@ def update_watcher(path, pe)
end
end
rotated_tw = @tails[path]

@tails[path] = setup_watcher(path, pe)
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw
end
Expand Down Expand Up @@ -781,14 +799,15 @@ def handle_notify
number_bytes_read += bytes_to_read
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)

@watcher.log.debug("reading file: #{ @watcher.path}")
if @lines.size >= @watcher.read_lines_limit or limit_bytes_per_second_reached
# not to use too much memory in case the file is very large
read_more = true

if limit_bytes_per_second_reached
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Time.new - start_reading
@watcher.log.debug("time_spent_reading: #{time_spent_reading}")
@watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if (time_spent_reading < 1)
debug_time = 1 - time_spent_reading
@watcher.log.debug("sleep: #{debug_time}")
Expand Down

0 comments on commit 8ab733c

Please sign in to comment.