Skip to content

Commit

Permalink
Add size for each log file
Browse files Browse the repository at this point in the history
Remove debug log

Add size file

Update size parameter FilePositionEntry

adding logs

update pos file function in memory

add debug size log

update file size from pos_file read

Save size inside the file and keep it backward compatible

Revert "Add log throttling per file"

This reverts commit f5bb69b.

Remove debug log
  • Loading branch information
Anthony Comtois committed Nov 4, 2019
1 parent 0ac4f42 commit 9321337
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def start_watchers(paths)
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
pe.update(Fluent::FileWrapper.stat(path).ino, 0, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
end
Expand Down Expand Up @@ -372,7 +372,7 @@ def detach_watcher(tw, close_io = true)
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
@pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
@pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION, 0)
end
end

Expand Down Expand Up @@ -571,6 +571,7 @@ def on_rotate(stat)
fsize = stat.size
inode = stat.ino

@log.debug "io handler size file #{fsize}"
last_inode = @pe.read_inode
if inode == last_inode
# rotated file has the same inode number with the last file.
Expand All @@ -580,19 +581,19 @@ def on_rotate(stat)
# in either case of a and b, seek to the saved position
# c) file was once renamed, truncated and then backed
# in this case, consider it truncated
@pe.update(inode, 0) if fsize < @pe.read_pos
@pe.update(inode, 0, fsize) if fsize < @pe.read_pos
elsif last_inode != 0
# this is FilePositionEntry and fluentd once started.
# read data from the head of the rotated file.
# logs never duplicate because this file is a rotated new file.
@pe.update(inode, 0)
@pe.update(inode, 0, fize)
else
# this is MemoryPositionEntry or this is the first time fluentd started.
# seek to the end of the any files.
# logs may duplicate without this seek because it's not sure the file is
# existent file or rotated new file.
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
@pe.update(inode, pos, fsize)
end
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
else
Expand All @@ -604,10 +605,10 @@ def on_rotate(stat)
if stat
inode = stat.ino
if inode == @pe.read_inode # truncated
@pe.update_pos(0)
@pe.update_pos(0, stat.size)
@io_handler.close
elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
@pe.update(inode, 0)
@pe.update(inode, 0, stat.size)
else # file is rotated and new file found
watcher_needs_update = true
# Handle the old log file before renewing TailWatcher [fluentd#1055]
Expand All @@ -634,7 +635,7 @@ def on_rotate(stat)
def swap_state(pe)
# Use MemoryPositionEntry for rotated file temporary
mpe = MemoryPositionEntry.new
mpe.update(pe.read_inode, pe.read_pos)
mpe.update(pe.read_inode, pe.read_pos, pe.read_size)
@pe = mpe
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
Expand Down Expand Up @@ -766,7 +767,7 @@ def handle_notify

unless @lines.empty?
if @receive_lines.call(@lines)
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
@watcher.pe.update_pos(io.pos - @fifo.bytesize, io.size)
@lines.clear
else
read_more = false
Expand Down Expand Up @@ -912,10 +913,10 @@ def [](path)

@file_mutex.synchronize {
@file.pos = @last_pos
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
@file.write "#{path}\t0000000000000000\t0000000000000000\t0000000000000000\n"
seek = @last_pos + path.bytesize + 1
@last_pos = @file.pos
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0, 0)
}
end

Expand All @@ -926,16 +927,17 @@ def self.parse(file)
map = {}
file.pos = 0
file.each_line {|line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)[\t]?([0-9a-fA-F]+)?/.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
size = m[4].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino)
map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, size, ino)
}
new(file, file_mutex, map, file.pos)
end
Expand All @@ -944,16 +946,17 @@ def self.parse(file)
def self.compact(file)
file.pos = 0
existent_entries = file.each_line.map { |line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)[\t]?([0-9a-fA-F]+)?/.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
m[4].nil? ? size = 0 : size = m[4].to_i(16)
# 32bit inode converted to 64bit at this phase
pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino])
pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\t%016x\n" % [path, pos, ino, size])
}.compact

file.pos = 0
Expand All @@ -971,35 +974,42 @@ class FilePositionEntry
LN_OFFSET = 33
SIZE = 34

def initialize(file, file_mutex, seek, pos, inode)
def initialize(file, file_mutex, seek, pos, size, inode)
@file = file
@file_mutex = file_mutex
@seek = seek
@pos = pos
@size = size
@inode = inode
end

def update(ino, pos)
def update(ino, pos, size)
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x\t%016x" % [pos, ino]
@file.write "%016x\t%016x\t%016x" % [pos, ino, size]
}
@pos = pos
@inode = ino
@size = size
end

def update_pos(pos)
def update_pos(pos, size)
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x" % pos
@file.write "%016x\t%016x\t%016x" % [pos, @inode, size]
}
@pos = pos
@size = size
end

def read_inode
@inode
end

def read_size
@size
end

def read_pos
@pos
end
Expand All @@ -1009,21 +1019,28 @@ class MemoryPositionEntry
def initialize
@pos = 0
@inode = 0
@size = 0
end

def update(ino, pos)
def update(ino, pos, size)
@inode = ino
@pos = pos
@size = size
end

def update_pos(pos)
def update_pos(pos, size)
@pos = pos
@size = size
end

def read_pos
@pos
end

def read_size
@size
end

def read_inode
@inode
end
Expand Down

0 comments on commit 9321337

Please sign in to comment.