Fix race condition of out_secondary_file #4081

Merged (10 commits) on Mar 9, 2023

Changes from 1 commit
lib/fluent/plugin/out_secondary_file.rb (77 additions, 22 deletions)
@@ -61,34 +61,40 @@ def configure(conf)
 
       @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
       @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
+      @need_worker_lock = system_config.workers > 1
+      @need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1
     end
 
     def multi_workers_ready?
-      ### TODO: add hack to synchronize for multi workers
       true
     end
 
     def start
       super
+      extend WriteLocker
+      @write_mutex = Mutex.new
     end
 
     def write(chunk)
       path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
-      path = generate_path(path_without_suffix)
-      FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
-
-      case @compress
-      when :text
-        File.open(path, "ab", @file_perm) {|f|
-          f.flock(File::LOCK_EX)
-          chunk.write_to(f)
-        }
-      when :gzip
-        File.open(path, "ab", @file_perm) {|f|
-          f.flock(File::LOCK_EX)
-          gz = Zlib::GzipWriter.new(f)
-          chunk.write_to(gz)
-          gz.close
-        }
-      end
-
-      path
+      return generate_path(path_without_suffix) do |path|
+        FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
+
+        case @compress
+        when :text
+          File.open(path, "ab", @file_perm) {|f|
+            f.flock(File::LOCK_EX)
+            chunk.write_to(f)
+          }
+        when :gzip
+          File.open(path, "ab", @file_perm) {|f|
+            f.flock(File::LOCK_EX)
+            gz = Zlib::GzipWriter.new(f)
+            chunk.write_to(gz)
+            gz.close
+          }
+        end
+      end
     end
 
     private
@@ -117,14 +123,63 @@ def has_time_format?(str)
 
     def generate_path(path_without_suffix)
       if @append
-        "#{path_without_suffix}#{@suffix}"
-      else
+        path = "#{path_without_suffix}#{@suffix}"
+        lock_if_need(path) do
+          yield path
+        end
+        return path
+      end
+
+      begin
         i = 0
         loop do
           path = "#{path_without_suffix}.#{i}#{@suffix}"
-          return path unless File.exist?(path)
+          break unless File.exist?(path)
           i += 1
         end
+        lock_if_need(path) do
+          # If multiple processes or threads select the same path and another
+          # one entered this locking block first, the file should already
+          # exist and this one should retry to find new path.
+          raise FileAlreadyExist if File.exist?(path)
+          yield path
+        end
+      rescue FileAlreadyExist
+        retry
       end
+      return path
     end
+
+    class FileAlreadyExist < StandardError
+    end
+
+    module WriteLocker
+      def lock_if_need(path)
+        get_worker_lock_if_need(path) do
+          get_thread_lock_if_need do
+            yield
+          end
+        end
+      end
+
+      def get_worker_lock_if_need(path)
+        unless @need_worker_lock
+          yield
+          return
+        end
+        acquire_worker_lock(path) do
+          yield
+        end
+      end
+
+      def get_thread_lock_if_need
+        unless @need_thread_lock
+          yield
+          return
+        end
+        @write_mutex.synchronize do
+          yield
+        end
+      end
+    end
   end
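
The shape of the fix, seen in isolation: pick the first free index, re-check the candidate path under a lock before writing, and rescan on collision. Below is a minimal, self-contained sketch of that claim-and-retry pattern, not the plugin's actual API: claim_unique_path and WRITE_MUTEX are illustrative names, and a plain Mutex stands in for lock_if_need, which in the plugin additionally takes an inter-process worker lock via acquire_worker_lock when workers > 1.

require "tmpdir"

class FileAlreadyExist < StandardError; end

WRITE_MUTEX = Mutex.new

def claim_unique_path(base, suffix)
  # Scan for the first index whose file does not exist yet.
  i = 0
  i += 1 while File.exist?("#{base}.#{i}#{suffix}")
  path = "#{base}.#{i}#{suffix}"
  WRITE_MUTEX.synchronize do
    # Another writer may have claimed the same index between the scan
    # above and this critical section; if so, rescan from the start.
    raise FileAlreadyExist if File.exist?(path)
    yield path
  end
  path
rescue FileAlreadyExist
  retry
end

Dir.mktmpdir do |dir|
  threads = Array.new(8) do
    Thread.new do
      claim_unique_path("#{dir}/out_file_test", ".log") do |path|
        File.open(path, "ab") { |f| f.write("chunk\n") }
      end
    end
  end
  paths = threads.map(&:value)
  # Every thread ends up with a distinct file.
  abort "collision" unless paths.uniq.size == paths.size
  puts paths.sort
end

Without the re-check inside the critical section, two threads that scan at the same time can both claim index 0 and append to the same file, which is exactly the race this PR fixes. The append case needs no retry because the path is fixed; there, lock_if_need alone serializes the writes.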
test/command/test_cat.rb (1 addition, 0 deletions)

@@ -83,6 +83,7 @@ def test_cat_json
   sub_test_case "msgpack" do
     def test_cat_secondary_file
       d = create_secondary_driver
+      d.instance_start
       path = d.instance.write(@chunk)
       d = create_driver
       d.run(expect_records: 1) do
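
A note on the repeated d.instance_start calls in the test changes below: write now depends on state that is only set up in #start (the WriteLocker mixin and @write_mutex), so any test that calls d.instance.write directly must start the plugin instance first; otherwise lock_if_need is undefined and the call raises NoMethodError.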
test/plugin/test_out_secondary_file.rb (15 additions, 0 deletions)
@@ -150,6 +150,7 @@ def create_chunk(primary, metadata, es)
 
     test 'should output compressed file when compress option is gzip' do
       d = create_driver(CONFIG, @primary)
+      d.instance_start
       path = d.instance.write(@chunk)
 
       assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
@@ -161,6 +162,7 @@ def create_chunk(primary, metadata, es)
         directory #{TMP_DIR}/
         basename out_file_test
       ], @primary)
+      d.instance_start
 
       msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
 
@@ -175,6 +177,7 @@ def create_chunk(primary, metadata, es)
 
     test 'path should be incremental when append option is false' do
       d = create_driver(CONFIG, @primary)
+      d.instance_start
       packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
 
       5.times do |i|
@@ -186,6 +189,7 @@ def create_chunk(primary, metadata, es)
 
     test 'path should be unchanged when append option is true' do
       d = create_driver(CONFIG + %[append true], @primary)
+      d.instance_start
       packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
 
       [*1..5].each do |i|
@@ -240,6 +244,7 @@ def create_chunk(primary, metadata, es)
 
     test 'normal path when compress option is gzip' do
       d = create_driver
+      d.instance_start
       path = d.instance.write(@c)
       assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
     end
@@ -249,6 +254,7 @@ def create_chunk(primary, metadata, es)
         directory #{TMP_DIR}
         basename out_file_test
       ]
+      d.instance_start
       path = d.instance.write(@c)
       assert_equal "#{TMP_DIR}/out_file_test.0", path
     end
@@ -258,6 +264,7 @@ def create_chunk(primary, metadata, es)
         directory #{TMP_DIR}
         append true
       ]
+      d.instance_start
       path = d.instance.write(@c)
       assert_equal "#{TMP_DIR}/dump.bin", path
     end
@@ -267,6 +274,7 @@ def create_chunk(primary, metadata, es)
         directory #{TMP_DIR}
         basename out_file_chunk_id_${chunk_id}
       ]
+      d.instance_start
       path = d.instance.write(@c)
       if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/
         unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate)
@@ -321,6 +329,7 @@ def create_chunk(primary, metadata, es)
         basename cool_${tag}
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(tag: 'test.dummy')
       c = create_chunk(primary, m, @es)
@@ -337,6 +346,7 @@ def create_chunk(primary, metadata, es)
         basename cool_${tag[0]}_${tag[1]}
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(tag: 'test.dummy')
       c = create_chunk(primary, m, @es)
@@ -355,6 +365,7 @@ def create_chunk(primary, metadata, es)
         basename cool_%Y%m%d%H
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
       c = create_chunk(primary, m, @es)
@@ -373,6 +384,7 @@ def create_chunk(primary, metadata, es)
         basename cool_%Y%m%d%H
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
       c = create_chunk(primary, m, @es)
@@ -389,6 +401,7 @@ def create_chunk(primary, metadata, es)
         basename cool_${test1}
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" })
       c = create_chunk(primary, m, @es)
@@ -421,6 +434,7 @@ def create_chunk(primary, metadata, es)
         basename cool_%Y%m%d%H_${tag}_${test1}
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(
         timekey: event_time("2011-01-02 13:14:15 UTC"),
@@ -443,6 +457,7 @@ def create_chunk(primary, metadata, es)
         directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1}
         compress gzip
       ], primary)
+      d.instance_start
 
       m = primary.buffer.new_metadata(
         timekey: event_time("2011-01-02 13:14:15 UTC"),