diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 08f027b8de..85d9b000e7 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -61,20 +61,12 @@ def configure(conf) @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION - @need_worker_lock = system_config.workers > 1 - @need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1 end def multi_workers_ready? true end - def start - super - extend WriteLocker - @write_mutex = Mutex.new - end - def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) generate_path(path_without_suffix) do |path| @@ -152,35 +144,5 @@ def generate_path(path_without_suffix) class FileAlreadyExist < StandardError end - - module WriteLocker - def lock_if_need(path) - get_worker_lock_if_need(path) do - get_thread_lock_if_need do - yield - end - end - end - - def get_worker_lock_if_need(path) - unless @need_worker_lock - yield - return - end - acquire_worker_lock(path) do - yield - end - end - - def get_thread_lock_if_need - unless @need_thread_lock - yield - return - end - @write_mutex.synchronize do - yield - end - end - end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 5dd5255652..8e821388f5 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -198,6 +198,7 @@ def rollback_count def initialize super @counter_mutex = Mutex.new + @flush_thread_mutex = Mutex.new @buffering = false @delayed_commit = false @as_secondary = false @@ -597,6 +598,47 @@ def terminate super end + def actual_flush_thread_count + return 0 unless @buffering + return @buffer_config.flush_thread_count unless @as_secondary + @primary_instance.buffer_config.flush_thread_count + end + + # Run the passed block in the appropriate lock condition for multiple threads and workers. + # The lock between workers is made for every `worker_lock_name`. + # (For multiple workers, the lock is shared if `worker_lock_name` is the same value). + # For multiple threads, `worker_lock_name` is not used, and the lock is shared by all + # threads in the same process. + def lock_if_need(worker_lock_name) + get_worker_lock_if_need(worker_lock_name) do + get_flush_thread_lock_if_need do + yield + end + end + end + + def get_worker_lock_if_need(name) + need_worker_lock = system_config.workers > 1 + unless need_worker_lock + yield + return + end + acquire_worker_lock(name) do + yield + end + end + + def get_flush_thread_lock_if_need + need_thread_lock = actual_flush_thread_count > 1 + unless need_thread_lock + yield + return + end + @flush_thread_mutex.synchronize do + yield + end + end + def support_in_v12_style?(feature) # for plugins written in v0.12 styles case feature