From 6acf958c00e7a1e2d3030d7a6ccbd8908f5351d9 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 6 Mar 2023 20:42:06 +0900 Subject: [PATCH] Fix race condition of out_secondary_file on multiple workers Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_secondary_file.rb | 99 +++++++++++++++++++------ test/command/test_cat.rb | 1 + test/plugin/test_out_secondary_file.rb | 15 ++++ 3 files changed, 93 insertions(+), 22 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index e294a2e379..5859b28a95 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -61,34 +61,40 @@ def configure(conf) @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION + @need_worker_lock = system_config.workers > 1 + @need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1 end def multi_workers_ready? 
- ### TODO: add hack to synchronize for multi workers true end + def start + super + extend WriteLocker + @write_mutex = Mutex.new + end + def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) - path = generate_path(path_without_suffix) - FileUtils.mkdir_p File.dirname(path), mode: @dir_perm - - case @compress - when :text - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - chunk.write_to(f) - } - when :gzip - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - gz = Zlib::GzipWriter.new(f) - chunk.write_to(gz) - gz.close - } + return generate_path(path_without_suffix) do |path| + FileUtils.mkdir_p File.dirname(path), mode: @dir_perm + + case @compress + when :text + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + chunk.write_to(f) + } + when :gzip + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + gz = Zlib::GzipWriter.new(f) + chunk.write_to(gz) + gz.close + } + end end - - path end private @@ -117,14 +123,63 @@ def has_time_format?(str) def generate_path(path_without_suffix) if @append - "#{path_without_suffix}#{@suffix}" - else + path = "#{path_without_suffix}#{@suffix}" + lock_if_need(path) do + yield path + end + return path + end + + begin i = 0 loop do path = "#{path_without_suffix}.#{i}#{@suffix}" - return path unless File.exist?(path) + break unless File.exist?(path) i += 1 end + lock_if_need(path) do + # If multiple processes or threads select the same path and another + # one entered this locking block first, the file should already + # exist and this one should retry to find a new path. 
+ raise FileAlreadyExist if File.exist?(path) + yield path + end + rescue FileAlreadyExist + retry + end + return path + end + + class FileAlreadyExist < StandardError + end + + module WriteLocker + def lock_if_need(path) + get_worker_lock_if_need(path) do + get_thread_lock_if_need do + yield + end + end + end + + def get_worker_lock_if_need(path) + unless @need_worker_lock + yield + return + end + acquire_worker_lock(path) do + yield + end + end + + def get_thread_lock_if_need + unless @need_thread_lock + yield + return + end + @write_mutex.synchronize do + yield + end end end end diff --git a/test/command/test_cat.rb b/test/command/test_cat.rb index 51c1a20bbb..9e0c6e8d82 100644 --- a/test/command/test_cat.rb +++ b/test/command/test_cat.rb @@ -83,6 +83,7 @@ def test_cat_json sub_test_case "msgpack" do def test_cat_secondary_file d = create_secondary_driver + d.instance_start path = d.instance.write(@chunk) d = create_driver d.run(expect_records: 1) do diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 05091b2853..139e3b7b67 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -150,6 +150,7 @@ def create_chunk(primary, metadata, es) test 'should output compressed file when compress option is gzip' do d = create_driver(CONFIG, @primary) + d.instance_start path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path @@ -161,6 +162,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/ basename out_file_test ], @primary) + d.instance_start msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') @@ -175,6 +177,7 @@ def create_chunk(primary, metadata, es) test 'path should be incremental when append option is false' do d = create_driver(CONFIG, @primary) + d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 5.times do |i| @@ -186,6 +189,7 @@ def create_chunk(primary, metadata, es) test 'path should 
be unchanged when append option is true' do d = create_driver(CONFIG + %[append true], @primary) + d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| @@ -240,6 +244,7 @@ def create_chunk(primary, metadata, es) test 'normal path when compress option is gzip' do d = create_driver + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path end @@ -249,6 +254,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_test ] + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0", path end @@ -258,6 +264,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} append true ] + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/dump.bin", path end @@ -267,6 +274,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_chunk_id_${chunk_id} ] + d.instance_start path = d.instance.write(@c) if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/ unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) @@ -321,6 +329,7 @@ def create_chunk(primary, metadata, es) basename cool_${tag} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -337,6 +346,7 @@ def create_chunk(primary, metadata, es) basename cool_${tag[0]}_${tag[1]} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -355,6 +365,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) c = create_chunk(primary, m, @es) @@ -373,6 +384,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 
UTC")) c = create_chunk(primary, m, @es) @@ -389,6 +401,7 @@ def create_chunk(primary, metadata, es) basename cool_${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" }) c = create_chunk(primary, m, @es) @@ -421,6 +434,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H_${tag}_${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"), @@ -443,6 +457,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"),