Fix race condition of out_secondary_file on multiple workers
Signed-off-by: Daijiro Fukuda <[email protected]>
daipom committed Mar 6, 2023
1 parent 8e9f46a commit 6acf958
Showing 3 changed files with 93 additions and 22 deletions.
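A note on the race being fixed: with append false, the old generate_path scanned for the first index whose file did not yet exist and returned that path without any coordination across workers or flush threads, so two concurrent flushes could pick the same numbered file. A minimal illustrative sketch of that racy selection (not part of the commit; the method name is hypothetical, the logic mirrors the old code in the diff below):

def racy_generate_path(path_without_suffix, suffix)
  i = 0
  loop do
    path = "#{path_without_suffix}.#{i}#{suffix}"
    # The existence check and the later file creation are not atomic across
    # processes, so two workers can both see "file.0" as free and both use it.
    return path unless File.exist?(path)
    i += 1
  end
end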
99 changes: 77 additions & 22 deletions lib/fluent/plugin/out_secondary_file.rb
@@ -61,34 +61,40 @@ def configure(conf)

@dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
@file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
@need_worker_lock = system_config.workers > 1
@need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1
end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
true
end

def start
super
extend WriteLocker
@write_mutex = Mutex.new
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
return generate_path(path_without_suffix) do |path|
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
end

path
end

private
@@ -117,14 +123,63 @@ def has_time_format?(str)

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
path = "#{path_without_suffix}#{@suffix}"
lock_if_need(path) do
yield path
end
return path
end

begin
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
break unless File.exist?(path)
i += 1
end
lock_if_need(path) do
# If multiple processes or threads select the same path and another
# one entered this locking block first, the file should already
# exist and this one should retry to find a new path.
raise FileAlreadyExist if File.exist?(path)
yield path
end
rescue FileAlreadyExist
retry
end
return path
end

class FileAlreadyExist < StandardError
end

module WriteLocker
def lock_if_need(path)
get_worker_lock_if_need(path) do
get_thread_lock_if_need do
yield
end
end
end

def get_worker_lock_if_need(path)
unless @need_worker_lock
yield
return
end
acquire_worker_lock(path) do
yield
end
end

def get_thread_lock_if_need
unless @need_thread_lock
yield
return
end
@write_mutex.synchronize do
yield
end
end
end
end
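In short, the fix restructures write to hand a block to generate_path, so selecting the path, creating its directory, and writing the chunk all happen while the path is still held under a lock. The lock is layered: an inter-worker lock when system_config.workers > 1 (via acquire_worker_lock, which is provided outside this file) and an in-process Mutex when the primary's flush_thread_count > 1; if another writer claimed the chosen path first, FileAlreadyExist forces a rescan. A standalone sketch of the same pattern, assuming for illustration that the worker lock is an flock on a per-path lock file (names like PathAllocator are hypothetical):

require "fileutils"

class PathAllocator
  class FileAlreadyExist < StandardError; end

  def initialize(need_worker_lock:, need_thread_lock:, lock_dir: "/tmp/locks")
    @need_worker_lock = need_worker_lock
    @need_thread_lock = need_thread_lock
    @lock_dir = lock_dir
    @mutex = Mutex.new
    FileUtils.mkdir_p(@lock_dir)
  end

  # Find the first free "#{base}.#{i}#{suffix}", then hand it to the caller's
  # block while holding the lock; retry if another writer claimed it first.
  def allocate(base, suffix)
    begin
      i = 0
      path = "#{base}.#{i}#{suffix}"
      while File.exist?(path)
        i += 1
        path = "#{base}.#{i}#{suffix}"
      end
      lock_if_need(path) do
        raise FileAlreadyExist if File.exist?(path)  # another writer won the race
        yield path                                   # e.g. create and write the file here
      end
    rescue FileAlreadyExist
      retry  # the claimed index now exists, so the rescan picks the next one
    end
    path
  end

  private

  def lock_if_need(path, &block)
    with_worker_lock(path) { with_thread_lock(&block) }
  end

  def with_worker_lock(path)
    return yield unless @need_worker_lock
    File.open(File.join(@lock_dir, "#{File.basename(path)}.lock"), File::RDWR | File::CREAT, 0o644) do |f|
      f.flock(File::LOCK_EX)  # serializes separate worker processes
      yield
    end
  end

  def with_thread_lock
    return yield unless @need_thread_lock
    @mutex.synchronize { yield }  # serializes flush threads within one worker
  end
end

Used roughly as:

data = "chunk payload\n"
allocator = PathAllocator.new(need_worker_lock: true, need_thread_lock: true)
allocator.allocate("/tmp/out_file_test", ".gz") { |path| File.write(path, data) }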
1 change: 1 addition & 0 deletions test/command/test_cat.rb
@@ -83,6 +83,7 @@ def test_cat_json
sub_test_case "msgpack" do
def test_cat_secondary_file
d = create_secondary_driver
d.instance_start
path = d.instance.write(@chunk)
d = create_driver
d.run(expect_records: 1) do
15 changes: 15 additions & 0 deletions test/plugin/test_out_secondary_file.rb
@@ -150,6 +150,7 @@ def create_chunk(primary, metadata, es)

test 'should output compressed file when compress option is gzip' do
d = create_driver(CONFIG, @primary)
d.instance_start
path = d.instance.write(@chunk)

assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
@@ -161,6 +162,7 @@ def create_chunk(primary, metadata, es)
directory #{TMP_DIR}/
basename out_file_test
], @primary)
d.instance_start

msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

@@ -175,6 +177,7 @@ def create_chunk(primary, metadata, es)

test 'path should be incremental when append option is false' do
d = create_driver(CONFIG, @primary)
d.instance_start
packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

5.times do |i|
@@ -186,6 +189,7 @@ def create_chunk(primary, metadata, es)

test 'path should be unchanged when append option is true' do
d = create_driver(CONFIG + %[append true], @primary)
d.instance_start
packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')

[*1..5].each do |i|
@@ -240,6 +244,7 @@ def create_chunk(primary, metadata, es)

test 'normal path when compress option is gzip' do
d = create_driver
d.instance_start
path = d.instance.write(@c)
assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
end
@@ -249,6 +254,7 @@ def create_chunk(primary, metadata, es)
directory #{TMP_DIR}
basename out_file_test
]
d.instance_start
path = d.instance.write(@c)
assert_equal "#{TMP_DIR}/out_file_test.0", path
end
@@ -258,6 +264,7 @@ def create_chunk(primary, metadata, es)
directory #{TMP_DIR}
append true
]
d.instance_start
path = d.instance.write(@c)
assert_equal "#{TMP_DIR}/dump.bin", path
end
@@ -267,6 +274,7 @@ def create_chunk(primary, metadata, es)
directory #{TMP_DIR}
basename out_file_chunk_id_${chunk_id}
]
d.instance_start
path = d.instance.write(@c)
if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/
unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate)
@@ -321,6 +329,7 @@ def create_chunk(primary, metadata, es)
basename cool_${tag}
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(tag: 'test.dummy')
c = create_chunk(primary, m, @es)
@@ -337,6 +346,7 @@ def create_chunk(primary, metadata, es)
basename cool_${tag[0]}_${tag[1]}
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(tag: 'test.dummy')
c = create_chunk(primary, m, @es)
@@ -355,6 +365,7 @@ def create_chunk(primary, metadata, es)
basename cool_%Y%m%d%H
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
c = create_chunk(primary, m, @es)
@@ -373,6 +384,7 @@ def create_chunk(primary, metadata, es)
basename cool_%Y%m%d%H
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
c = create_chunk(primary, m, @es)
@@ -389,6 +401,7 @@ def create_chunk(primary, metadata, es)
basename cool_${test1}
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" })
c = create_chunk(primary, m, @es)
@@ -421,6 +434,7 @@ def create_chunk(primary, metadata, es)
basename cool_%Y%m%d%H_${tag}_${test1}
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(
timekey: event_time("2011-01-02 13:14:15 UTC"),
@@ -443,6 +457,7 @@ def create_chunk(primary, metadata, es)
directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1}
compress gzip
], primary)
d.instance_start

m = primary.buffer.new_metadata(
timekey: event_time("2011-01-02 13:14:15 UTC"),
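The test changes all follow from the new start method: write now depends on state that start prepares (extending WriteLocker and creating @write_mutex), so every test that calls d.instance.write directly first calls d.instance_start. The resulting pattern, mirroring the updated tests above:

d = create_driver(CONFIG, @primary)
d.instance_start                       # extends WriteLocker and creates @write_mutex
path = d.instance.write(@chunk)
assert_equal "#{TMP_DIR}/out_file_test.0.gz", path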
