Skip to content

Commit 416693c

Browse files
author
Anthony Comtois
committed
implement thread based tailwatcher
Signed-off-by: Anthony Comtois <[email protected]>
1 parent 39370c4 commit 416693c

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

lib/fluent/plugin/in_tail.rb

+36-17
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def start
208208
@threads['in_tail_refresh_watchers'] = Thread.new(@refresh_interval) do |refresh_interval|
209209
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
210210
end
211+
@threads['in_tail_refresh_watchers'].priority = 10 # Default is zero; higher-priority threads will run before lower-priority threads.
211212

212213
@threads.each { |thr|
213214
thr.join
@@ -289,6 +290,11 @@ def refresh_watchers
289290

290291
stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
291292
start_watchers(added) unless added.empty?
293+
294+
log.debug "Thread refresh_watchers"
295+
@threads.each { |thr|
296+
log.debug "Thread #{thr[0]} #{thr[1].status}"
297+
}
292298
end
293299

294300
def setup_watcher(path, pe)
@@ -312,26 +318,37 @@ def setup_watcher(path, pe)
312318

313319
def start_watchers(paths)
314320
paths.each { |path|
315-
@threads[path] = Thread.new(path) do |path|
316-
pe = nil
317-
if @pf
318-
pe = @pf[path]
319-
if @read_from_head && pe.read_inode.zero?
320-
begin
321-
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
322-
rescue Errno::ENOENT
323-
$log.warn "#{path} not found. Continuing without tailing it."
321+
unless @threads[path].nil?
322+
log.debug "Check Thread #{path} #{@threads[path].status}"
323+
if @threads[path].status != "sleep" and @threads[path].status != "run"
324+
log.debug "Stopping Thread #{path} #{@threads[path].status}"
325+
@threads[path].exit
326+
@threads.delete(path)
327+
end
328+
end
329+
if @threads[path].nil?
330+
log.debug "Add Thread #{path}"
331+
@threads[path] = Thread.new(path) do |path|
332+
pe = nil
333+
if @pf
334+
pe = @pf[path]
335+
if @read_from_head && pe.read_inode.zero?
336+
begin
337+
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
338+
rescue Errno::ENOENT
339+
$log.warn "#{path} not found. Continuing without tailing it."
340+
end
324341
end
325342
end
326-
end
327343

328-
begin
329-
tw = setup_watcher(path, pe)
330-
rescue WatcherSetupError => e
331-
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
332-
next
344+
begin
345+
tw = setup_watcher(path, pe)
346+
rescue WatcherSetupError => e
347+
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
348+
next
349+
end
350+
@tails[path] = tw
333351
end
334-
@tails[path] = tw
335352
end
336353
}
337354
end
@@ -375,6 +392,7 @@ def update_watcher(path, pe)
375392
end
376393
end
377394
rotated_tw = @tails[path]
395+
378396
@tails[path] = setup_watcher(path, pe)
379397
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw
380398
end
@@ -781,14 +799,15 @@ def handle_notify
781799
number_bytes_read += bytes_to_read
782800
limit_bytes_per_second_reached = (number_bytes_read >= @watcher.read_bytes_limit_per_second and @watcher.read_bytes_limit_per_second > 0)
783801

802+
@watcher.log.debug("reading file: #{ @watcher.path}")
784803
if @lines.size >= @watcher.read_lines_limit or limit_bytes_per_second_reached
785804
# not to use too much memory in case the file is very large
786805
read_more = true
787806

788807
if limit_bytes_per_second_reached
789808
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
790809
time_spent_reading = Time.new - start_reading
791-
@watcher.log.debug("time_spent_reading: #{time_spent_reading}")
810+
@watcher.log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
792811
if (time_spent_reading < 1)
793812
debug_time = 1 - time_spent_reading
794813
@watcher.log.debug("sleep: #{debug_time}")

0 commit comments

Comments
 (0)