diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index e294a2e379..b9bf8b80af 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -61,34 +61,33 @@ def configure(conf) @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION + @need_lock = system_config.workers > 1 end def multi_workers_ready? - ### TODO: add hack to synchronize for multi workers true end def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) - path = generate_path(path_without_suffix) - FileUtils.mkdir_p File.dirname(path), mode: @dir_perm - - case @compress - when :text - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - chunk.write_to(f) - } - when :gzip - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - gz = Zlib::GzipWriter.new(f) - chunk.write_to(gz) - gz.close - } + return generate_path(path_without_suffix) do |path| + FileUtils.mkdir_p File.dirname(path), mode: @dir_perm + + case @compress + when :text + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + chunk.write_to(f) + } + when :gzip + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + gz = Zlib::GzipWriter.new(f) + chunk.write_to(gz) + gz.close + } + end end - - path end private @@ -117,15 +116,45 @@ def has_time_format?(str) def generate_path(path_without_suffix) if @append - "#{path_without_suffix}#{@suffix}" - else + path = "#{path_without_suffix}#{@suffix}" + lock_if_needed(path) do + yield path + end + return path + end + + begin i = 0 loop do path = "#{path_without_suffix}.#{i}#{@suffix}" - return path unless File.exist?(path) + break unless File.exist?(path) i += 1 end + lock_if_needed(path) do + # If multiple processes select the same path and another process + # entered this locking block first, the file should already exist + # and this process should retry to find new path. + raise FileAlreadyExist if File.exist?(path) + yield path + end + rescue FileAlreadyExist + retry end + return path + end + + def lock_if_needed(path) + unless @need_lock + yield + return + end + + acquire_worker_lock(path) do + yield + end + end + + class FileAlreadyExist < StandardError end end end