Skip to content

Commit

Permalink
Merge pull request #3182 from fluent/follow-inodes
Browse files Browse the repository at this point in the history
Add follow_inodes parameter on in_tail
  • Loading branch information
cosmo0920 authored Dec 9, 2020
2 parents 9110685 + 08ee2da commit 3c85807
Show file tree
Hide file tree
Showing 4 changed files with 558 additions and 101 deletions.
133 changes: 97 additions & 36 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def initialize
config_param :ignore_repeated_permission_error, :bool, default: false
desc 'Format path with the specified timezone'
config_param :path_timezone, :string, default: nil
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
config_param :follow_inodes, :bool, default: false

config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
config_argument :usage, :string, default: 'in_tail_parser'
Expand Down Expand Up @@ -155,6 +157,9 @@ def configure(conf)
end
@variable_store[@pos_file] = self.plugin_id
else
if @follow_inodes
raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
end
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
Expand Down Expand Up @@ -216,7 +221,7 @@ def start
FileUtils.mkdir_p(pos_file_dir, mode: @dir_perm) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.load(@pf_file, logger: log)
@pf = PositionFile.load(@pf_file, @follow_inodes, expand_paths, logger: log)

if @pos_file_compaction_interval
timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
Expand All @@ -240,7 +245,7 @@ def stop

def shutdown
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(@tails.keys, immediate: true, remove_watcher: false)
stop_watchers(existence_path, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file

super
Expand Down Expand Up @@ -303,7 +308,31 @@ def expand_paths
end
path.include?('*') ? Dir.glob(path) : path
}.flatten.uniq
paths.uniq - excluded
# filter out non existing files, so in case pattern is without '*' we don't do unnecessary work
hash = {}
(paths - excluded).select { |path|
FileTest.exist?(path)
}.each { |path|
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
if @follow_inodes
hash[target_info.ino] = target_info
else
hash[target_info.path] = target_info
end
}
hash
end

# Builds a lookup hash for the TargetInfo keys currently registered in
# @tails: keyed by inode when @follow_inodes is enabled, by path
# otherwise. Uses the same keying scheme as expand_paths so the two
# hashes can be compared in refresh_watchers.
def existence_path
  @tails.each_key.each_with_object({}) do |target_info, result|
    key = @follow_inodes ? target_info.ino : target_info.path
    result[key] = target_info
  end
end

# in_tail with '*' path doesn't check rotation file equality at refresh phase.
Expand All @@ -312,21 +341,21 @@ def expand_paths
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers
target_paths = expand_paths
existence_paths = @tails.keys
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug { "tailing paths: target = #{target_paths.join(",")} | existing = #{existence_paths.join(",")}" }

unwatched = existence_paths - target_paths
added = target_paths - existence_paths
unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?
stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
end

def setup_watcher(path, pe)
def setup_watcher(path_with_inode, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, pe, log, @read_from_head, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(path_with_inode, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -357,11 +386,13 @@ def setup_watcher(path, pe)
raise e
end

def start_watchers(paths)
paths.each { |path|
def start_watchers(paths_with_inodes)
paths_with_inodes.each_value { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
pe = nil
if @pf
pe = @pf[path]
pe = @pf[path, ino]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
Expand All @@ -372,58 +403,76 @@ def start_watchers(paths)
end

begin
tw = setup_watcher(path, pe)
tw = setup_watcher(path_with_inode, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
end
@tails[path] = tw
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
@tails[target_info] = tw
}
end

def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
def stop_watchers(paths_with_inodes, immediate: false, unwatched: false, remove_watcher: true)
paths_with_inodes.each_value { |path_with_inode|
if remove_watcher
tw = @tails.delete(path_with_inode)
else
tw = @tails[path_with_inode]
end
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, false)
detach_watcher(tw, path_with_inode.ino, false)
else
detach_watcher_after_rotate_wait(tw)
detach_watcher_after_rotate_wait(tw, path_with_inode.ino)
end
end
}
end

def close_watcher_handles
@tails.keys.each do |path|
tw = @tails.delete(path)
@tails.keys.each do |path_with_inode|
tw = @tails.delete(path_with_inode)
if tw
tw.close
end
end
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path, pe)
def update_watcher(path_with_inode, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

if @pf
unless pe.read_inode == @pf[path].read_inode
unless pe.read_inode == @pf[path_with_inode.path, pe.read_inode].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end
rotated_tw = @tails[path]
@tails[path] = setup_watcher(path, pe)
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw

target_info = TargetInfo.new(path_with_inode.path, pe.read_inode)
rotated_tw = @tails[target_info]

new_target_info = TargetInfo.new(path_with_inode.path, path_with_inode.ino)

if @follow_inodes
new_position_entry = @pf[path_with_inode.path, path_with_inode.ino]

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end

# TailWatcher#close is called by another thread at shutdown phase.
# It causes 'can't modify string; temporarily locked' error in IOHandler
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
Expand All @@ -432,15 +481,15 @@ def detach_watcher(tw, close_io = true)
tw.close if close_io

if tw.unwatched && @pf
@pf.unwatch(tw.path)
@pf.unwatch(tw.path, ino)
end
end

def detach_watcher_after_rotate_wait(tw)
def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
detach_watcher(tw, ino)
end
end

Expand Down Expand Up @@ -608,10 +657,12 @@ def on_timer
end

class TailWatcher
def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path
def initialize(path_with_inode, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path_with_inode.path
@ino = path_with_inode.ino
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@follow_inodes = follow_inodes
@update_watcher = update_watcher
@log = log
@rotate_handler = RotateHandler.new(log, &method(:on_rotate))
Expand All @@ -621,7 +672,7 @@ def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_
@watchers = []
end

attr_reader :path
attr_reader :path, :ino
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
Expand Down Expand Up @@ -716,7 +767,17 @@ def on_rotate(stat)
end

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
if stat
target_info = TargetInfo.new(@path, stat.ino)
if @follow_inodes
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(target_info, @pe)
else
@update_watcher.call(target_info, swap_state(@pe))
end
end
else
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
Expand Down
54 changes: 43 additions & 11 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,49 @@ class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

def self.load(file, logger:)
pf = new(file, logger: logger)
def self.load(file, follow_inodes, existing_paths, logger:)
pf = new(file, follow_inodes, existing_paths, logger: logger)
pf.load
pf
end

def initialize(file, logger: nil)
def initialize(file, follow_inodes, existing_paths, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
@existing_paths = existing_paths
end

def [](path)
if m = @map[path]
def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
elsif !@follow_inodes && m = @map[path]
return m
end

@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
seek = @file.pos + path.bytesize + 1
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end

def unwatch(path)
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
def unwatch(path, inode)
if @follow_inodes
if (entry = @map.delete(inode))
entry.update_pos(UNWATCHED_POSITION)
end
else
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
end
end
end

Expand All @@ -69,7 +83,11 @@ def load
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
if @follow_inodes
map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
else
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
end
end

Expand Down Expand Up @@ -139,13 +157,25 @@ def fetch_compacted_entries
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
if @follow_inodes
entries[ino] = Entry.new(path, pos, ino, file_pos + path.size + 1)
else
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
end
file_pos += line.size
end
end

entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes
entries
end

# Filters compacted position-file entries down to those whose key still
# appears in +existing_paths+ (a hash keyed the same way as
# +existent_entries+: by inode, since this is only called when
# @follow_inodes is on). Drops entries for files that no longer exist so
# the rewritten pos file does not accumulate stale records.
#
# FIX: Hash#select yields the whole [key, entry] pair to a
# one-parameter block, so the previous `existing_paths.key?(file_entry)`
# compared the pair (not the key) against the hash keys and was always
# false — every entry was discarded on compaction. Destructure the pair
# and test the key alone.
def remove_deleted_files_entries(existent_entries, existing_paths)
  existent_entries.select { |key, _entry| existing_paths.key?(key) }
end
end

Entry = Struct.new(:path, :pos, :ino, :seek) do
Expand Down Expand Up @@ -224,5 +254,7 @@ def read_inode
@inode
end
end

TargetInfo = Struct.new(:path, :ino)
end
end
Loading

0 comments on commit 3c85807

Please sign in to comment.