Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add follow_inodes parameter on in_tail #3182

Merged
merged 16 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 97 additions & 36 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def initialize
config_param :ignore_repeated_permission_error, :bool, default: false
desc 'Format path with the specified timezone'
config_param :path_timezone, :string, default: nil
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
config_param :follow_inodes, :bool, default: false

config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
config_argument :usage, :string, default: 'in_tail_parser'
Expand Down Expand Up @@ -155,6 +157,9 @@ def configure(conf)
end
@variable_store[@pos_file] = self.plugin_id
else
if @follow_inodes
raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
end
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
Expand Down Expand Up @@ -216,7 +221,7 @@ def start
FileUtils.mkdir_p(pos_file_dir, mode: @dir_perm) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.load(@pf_file, logger: log)
@pf = PositionFile.load(@pf_file, @follow_inodes, expand_paths, logger: log)

if @pos_file_compaction_interval
timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
Expand All @@ -240,7 +245,7 @@ def stop

def shutdown
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(@tails.keys, immediate: true, remove_watcher: false)
stop_watchers(existence_path, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file

super
Expand Down Expand Up @@ -303,7 +308,31 @@ def expand_paths
end
path.include?('*') ? Dir.glob(path) : path
}.flatten.uniq
paths.uniq - excluded
# filter out non existing files, so in case pattern is without '*' we don't do unnecessary work
hash = {}
(paths - excluded).select { |path|
FileTest.exist?(path)
}.each { |path|
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
if @follow_inodes
hash[target_info.ino] = target_info
else
hash[target_info.path] = target_info
end
}
hash
end

def existence_path
hash = {}
@tails.each_key {|path_ino|
if @follow_inodes
hash[path_ino.ino] = path_ino
else
hash[path_ino.path] = path_ino
end
}
hash
end

# in_tail with '*' path doesn't check rotation file equality at refresh phase.
Expand All @@ -312,21 +341,21 @@ def expand_paths
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers
target_paths = expand_paths
existence_paths = @tails.keys
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug { "tailing paths: target = #{target_paths.join(",")} | existing = #{existence_paths.join(",")}" }

unwatched = existence_paths - target_paths
added = target_paths - existence_paths
unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?
stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
end

def setup_watcher(path, pe)
def setup_watcher(path_with_inode, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, pe, log, @read_from_head, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(path_with_inode, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -357,11 +386,13 @@ def setup_watcher(path, pe)
raise e
end

def start_watchers(paths)
paths.each { |path|
def start_watchers(paths_with_inodes)
paths_with_inodes.each_value { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
pe = nil
if @pf
pe = @pf[path]
pe = @pf[path, ino]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
Expand All @@ -372,58 +403,76 @@ def start_watchers(paths)
end

begin
tw = setup_watcher(path, pe)
tw = setup_watcher(path_with_inode, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
end
@tails[path] = tw
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
@tails[target_info] = tw
}
end

def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
def stop_watchers(paths_with_inodes, immediate: false, unwatched: false, remove_watcher: true)
paths_with_inodes.each_value { |path_with_inode|
if remove_watcher
tw = @tails.delete(path_with_inode)
else
tw = @tails[path_with_inode]
end
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, false)
detach_watcher(tw, path_with_inode.ino, false)
else
detach_watcher_after_rotate_wait(tw)
detach_watcher_after_rotate_wait(tw, path_with_inode.ino)
end
end
}
end

def close_watcher_handles
@tails.keys.each do |path|
tw = @tails.delete(path)
@tails.keys.each do |path_with_inode|
tw = @tails.delete(path_with_inode)
if tw
tw.close
end
end
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path, pe)
def update_watcher(path_with_inode, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

if @pf
unless pe.read_inode == @pf[path].read_inode
unless pe.read_inode == @pf[path_with_inode.path, pe.read_inode].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end
rotated_tw = @tails[path]
@tails[path] = setup_watcher(path, pe)
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw

target_info = TargetInfo.new(path_with_inode.path, pe.read_inode)
rotated_tw = @tails[target_info]

new_target_info = TargetInfo.new(path_with_inode.path, path_with_inode.ino)

if @follow_inodes
new_position_entry = @pf[path_with_inode.path, path_with_inode.ino]

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end

# TailWatcher#close is called by another thread at shutdown phase.
# It causes 'can't modify string; temporarily locked' error in IOHandler
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
Expand All @@ -432,15 +481,15 @@ def detach_watcher(tw, close_io = true)
tw.close if close_io

if tw.unwatched && @pf
@pf.unwatch(tw.path)
@pf.unwatch(tw.path, ino)
end
end

def detach_watcher_after_rotate_wait(tw)
def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
detach_watcher(tw, ino)
end
end

Expand Down Expand Up @@ -608,10 +657,12 @@ def on_timer
end

class TailWatcher
def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path
def initialize(path_with_inode, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path_with_inode.path
@ino = path_with_inode.ino
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@follow_inodes = follow_inodes
@update_watcher = update_watcher
@log = log
@rotate_handler = RotateHandler.new(log, &method(:on_rotate))
Expand All @@ -621,7 +672,7 @@ def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_
@watchers = []
end

attr_reader :path
attr_reader :path, :ino
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
Expand Down Expand Up @@ -716,7 +767,17 @@ def on_rotate(stat)
end

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
if stat
target_info = TargetInfo.new(@path, stat.ino)
if @follow_inodes
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(target_info, @pe)
else
@update_watcher.call(target_info, swap_state(@pe))
end
end
else
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
Expand Down
54 changes: 43 additions & 11 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,49 @@ class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

def self.load(file, logger:)
pf = new(file, logger: logger)
def self.load(file, follow_inodes, existing_paths, logger:)
pf = new(file, follow_inodes, existing_paths, logger: logger)
pf.load
pf
end

def initialize(file, logger: nil)
def initialize(file, follow_inodes, existing_paths, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
@existing_paths = existing_paths
end

def [](path)
if m = @map[path]
def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
elsif !@follow_inodes && m = @map[path]
return m
end

@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
seek = @file.pos + path.bytesize + 1
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end

def unwatch(path)
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
def unwatch(path, inode)
if @follow_inodes
if (entry = @map.delete(inode))
entry.update_pos(UNWATCHED_POSITION)
end
else
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
end
end
end

Expand All @@ -69,7 +83,11 @@ def load
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
if @follow_inodes
map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
else
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
end
end

Expand Down Expand Up @@ -139,13 +157,25 @@ def fetch_compacted_entries
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
if @follow_inodes
entries[ino] = Entry.new(path, pos, ino, file_pos + path.size + 1)
else
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
end
file_pos += line.size
end
end

entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes
entries
end

def remove_deleted_files_entries(existent_entries, existing_paths)
filtered_entries = existent_entries.select {|file_entry|
existing_paths.key?(file_entry)
}
filtered_entries
end
end

Entry = Struct.new(:path, :pos, :ino, :seek) do
Expand Down Expand Up @@ -224,5 +254,7 @@ def read_inode
@inode
end
end

TargetInfo = Struct.new(:path, :ino)
end
end
Loading