Skip to content

Commit

Permalink
Merge pull request #2118 from vitclone/fix-in_tail-pos_file-race-cond…
Browse files Browse the repository at this point in the history
…ition

Prevent thread switching in the interval between seek and write operations to pos_file
  • Loading branch information
repeatedly authored Sep 18, 2018
2 parents 0337300 + cdc7109 commit 19b6cd6
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,9 @@ def reset_timer
class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff

def initialize(file, map, last_pos)
def initialize(file, file_mutex, map, last_pos)
@file = file
@file_mutex = file_mutex
@map = map
@last_pos = last_pos
end
Expand All @@ -880,39 +881,45 @@ def [](path)
return m
end

@file.pos = @last_pos
@file.write path
@file.write "\t"
seek = @file.pos
@file.write "0000000000000000\t0000000000000000\n"
@last_pos = @file.pos

@map[path] = FilePositionEntry.new(@file, seek, 0, 0)
@file_mutex.synchronize {
@file.pos = @last_pos
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
seek = @last_pos + path.bytesize + 1
@last_pos = @file.pos
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
}
end

def self.parse(file)
compact(file)

file_mutex = Mutex.new
map = {}
file.pos = 0
file.each_line {|line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
next unless m
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(file, seek, pos, ino)
map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino)
}
new(file, map, file.pos)
new(file, file_mutex, map, file.pos)
end

# Clean up unwatched file entries
def self.compact(file)
file.pos = 0
existent_entries = file.each_line.map { |line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
next unless m
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
Expand All @@ -935,23 +942,28 @@ class FilePositionEntry
LN_OFFSET = 33
SIZE = 34

def initialize(file, seek, pos, inode)
def initialize(file, file_mutex, seek, pos, inode)
@file = file
@file_mutex = file_mutex
@seek = seek
@pos = pos
@inode = inode
end

def update(ino, pos)
@file.pos = @seek
@file.write "%016x\t%016x" % [pos, ino]
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x\t%016x" % [pos, ino]
}
@pos = pos
@inode = ino
end

def update_pos(pos)
@file.pos = @seek
@file.write "%016x" % pos
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x" % pos
}
@pos = pos
end

Expand Down

0 comments on commit 19b6cd6

Please sign in to comment.