Skip to content

Commit

Permalink
Fix race condition of out_secondary_file on multiple workers
Browse files Browse the repository at this point in the history
Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Mar 3, 2023
1 parent 8e9f46a commit fe65b4f
Showing 1 changed file with 51 additions and 22 deletions.
73 changes: 51 additions & 22 deletions lib/fluent/plugin/out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,33 @@ def configure(conf)

@dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
@file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
@need_lock = system_config.workers > 1
end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
true
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
return generate_path(path_without_suffix) do |path|
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
end

path
end

private
Expand Down Expand Up @@ -117,15 +116,45 @@ def has_time_format?(str)

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
path = "#{path_without_suffix}#{@suffix}"
lock_if_needed(path) do
yield path
end
return path
end

begin
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
break unless File.exist?(path)
i += 1
end
lock_if_needed(path) do
# If multiple processes select the same path and another process
# entered this locking block first, the file should already exist
# and this process should retry to find new path.
raise FileAlreadyExist if File.exist?(path)
yield path
end
rescue FileAlreadyExist
retry
end
return path
end

def lock_if_needed(path)
unless @need_lock
yield
return
end

acquire_worker_lock(path) do
yield
end
end

class FileAlreadyExist < StandardError
end
end
end

0 comments on commit fe65b4f

Please sign in to comment.