Skip to content

Commit

Permalink
Merge pull request #4081 from daipom/fix-out-secondary-file-race-cond…
Browse files Browse the repository at this point in the history
…itiion

Fix race condition of out_secondary_file
  • Loading branch information
ashie authored Mar 9, 2023
2 parents 248160f + 70bf593 commit 59399c1
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 22 deletions.
61 changes: 39 additions & 22 deletions lib/fluent/plugin/out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,31 +64,29 @@ def configure(conf)
end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
true
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
generate_path(path_without_suffix) do |path|
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
end

path
end

private
Expand Down Expand Up @@ -117,15 +115,34 @@ def has_time_format?(str)

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
path = "#{path_without_suffix}#{@suffix}"
synchronize_path(path) do
yield path
end
return path
end

begin
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
break unless File.exist?(path)
i += 1
end
synchronize_path(path) do
# If multiple processes or threads select the same path and another
# one entered this locking block first, the file should already
# exist and this one should retry to find new path.
raise FileAlreadyExist if File.exist?(path)
yield path
end
rescue FileAlreadyExist
retry
end
path
end

class FileAlreadyExist < StandardError
end
end
end
37 changes: 37 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def rollback_count
def initialize
super
@counter_mutex = Mutex.new
@flush_thread_mutex = Mutex.new
@buffering = false
@delayed_commit = false
@as_secondary = false
Expand Down Expand Up @@ -597,6 +598,42 @@ def terminate
super
end

def actual_flush_thread_count
return 0 unless @buffering
return @buffer_config.flush_thread_count unless @as_secondary
@primary_instance.buffer_config.flush_thread_count
end

# Ensures `path` (filename or filepath) processable
# only by the current thread in the current process.
# For multiple workers, the lock is shared if `path` is the same value.
# For multiple threads, the lock is shared by all threads in the same process.
def synchronize_path(path)
synchronize_path_in_workers(path) do
synchronize_in_threads do
yield
end
end
end

def synchronize_path_in_workers(path)
need_worker_lock = system_config.workers > 1
if need_worker_lock
acquire_worker_lock(path) { yield }
else
yield
end
end

def synchronize_in_threads
need_thread_lock = actual_flush_thread_count > 1
if need_thread_lock
@flush_thread_mutex.synchronize { yield }
else
yield
end
end

def support_in_v12_style?(feature)
# for plugins written in v0.12 styles
case feature
Expand Down
Loading

0 comments on commit 59399c1

Please sign in to comment.