From a4ec3f272295cc5df59dc6cf0b82d0fc136cad69 Mon Sep 17 00:00:00 2001 From: Anthony Comtois Date: Mon, 18 Nov 2019 20:41:01 +0000 Subject: [PATCH] implement thread based tailwatcher --- lib/fluent/plugin/in_tail.rb | 53 ++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 19739a08a5..2ba4b5292f 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -208,6 +208,7 @@ def start @threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval| timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers)) end + @threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads. @threads.each { |thr| thr.join @@ -289,6 +290,11 @@ def refresh_watchers stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty? start_watchers(added) unless added.empty? + + log.debug "Thread refresh_watchers" + @threads.each { |thr| + log.debug "Thread #{thr[0]} #{thr[1].status}" + } end def setup_watcher(path, pe) @@ -312,26 +318,37 @@ def setup_watcher(path, pe) def start_watchers(paths) paths.each { |path| - @threads[path] = Thread.new(path) do |path| - pe = nil - if @pf - pe = @pf[path] - if @read_from_head && pe.read_inode.zero? - begin - pe.update(Fluent::FileWrapper.stat(path).ino, 0) - rescue Errno::ENOENT - $log.warn "#{path} not found. Continuing without tailing it." + unless @threads[path].nil? + log.debug "Check Thread #{path} #{@threads[path].status}" + if @threads[path].status != "sleep" and @threads[path].status != "run" + log.debug "Stopping Thread #{path} #{@threads[path].status}" + @threads[path].exit + @threads.delete(path) + end + end + if @threads[path].nil? + log.debug "Add Thread #{path}" + @threads[path] = Thread.new(path) do |path| + pe = nil + if @pf + pe = @pf[path] + if @read_from_head && pe.read_inode.zero? + begin + pe.update(Fluent::FileWrapper.stat(path).ino, 0) + rescue Errno::ENOENT + $log.warn "#{path} not found. Continuing without tailing it." + end end end - end - begin - tw = setup_watcher(path, pe) - rescue WatcherSetupError => e - log.warn "Skip #{path} because unexpected setup error happens: #{e}" - next + begin + tw = setup_watcher(path, pe) + rescue WatcherSetupError => e + log.warn "Skip #{path} because unexpected setup error happens: #{e}" + next + end + @tails[path] = tw end - @tails[path] = tw end } end @@ -375,6 +392,7 @@ def update_watcher(path, pe) end end rotated_tw = @tails[path] + @tails[path] = setup_watcher(path, pe) detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw end @@ -781,6 +799,7 @@ def handle_notify number_bytes_read += bytes_to_read limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0) + @watcher.log.debug("reading file: #{ @watcher.path}") if @lines.size >= @watcher.read_lines_limit or limit_bytes_per_second_reached # not to use too much memory in case the file is very large read_more = true @@ -788,7 +807,7 @@ def handle_notify if limit_bytes_per_second_reached # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion time_spent_reading = Time.new - start_reading - @watcher.log.debug("time_spent_reading: #{time_spent_reading}") + @watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if (time_spent_reading < 1) debug_time = 1 - time_spent_reading @watcher.log.debug("sleep: #{debug_time}")