diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 0baf9d8a79..dbe628bea7 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -869,8 +869,9 @@ def reset_timer class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff - def initialize(file, map, last_pos) + def initialize(file, file_mutex, map, last_pos) @file = file + @file_mutex = file_mutex @map = map @last_pos = last_pos end @@ -880,31 +881,34 @@ def [](path) return m end - @file.pos = @last_pos - @file.write path - @file.write "\t" - seek = @file.pos - @file.write "0000000000000000\t0000000000000000\n" - @last_pos = @file.pos - - @map[path] = FilePositionEntry.new(@file, seek, 0, 0) + @file_mutex.synchronize { + @file.pos = @last_pos + @file.write "#{path}\t0000000000000000\t0000000000000000\n" + seek = @last_pos + path.bytesize + 1 + @last_pos = @file.pos + @map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0) + } end def self.parse(file) compact(file) + file_mutex = Mutex.new map = {} file.pos = 0 file.each_line {|line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m + unless m + $log.warn "Unparsable line in pos_file: #{line}" + next + end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) seek = file.pos - line.bytesize + path.bytesize + 1 - map[path] = FilePositionEntry.new(file, seek, pos, ino) + map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino) } - new(file, map, file.pos) + new(file, file_mutex, map, file.pos) end # Clean up unwatched file entries @@ -912,7 +916,10 @@ def self.compact(file) file.pos = 0 existent_entries = file.each_line.map { |line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) - next unless m + unless m + $log.warn "Unparsable line in pos_file: #{line}" + next + end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) @@ -935,23 +942,28 @@ class FilePositionEntry LN_OFFSET = 33 SIZE = 34 - def initialize(file, seek, pos, inode) + def initialize(file, file_mutex, seek, pos, inode) @file = file + @file_mutex = file_mutex @seek = seek @pos = pos @inode = inode end def update(ino, pos) - @file.pos = @seek - @file.write "%016x\t%016x" % [pos, ino] + @file_mutex.synchronize { + @file.pos = @seek + @file.write "%016x\t%016x" % [pos, ino] + } @pos = pos @inode = ino end def update_pos(pos) - @file.pos = @seek - @file.write "%016x" % pos + @file_mutex.synchronize { + @file.pos = @seek + @file.write "%016x" % pos + } @pos = pos end