diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index a04b19d469..34bf439bc0 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1072,4 +1072,263 @@ def invoke_slow_flush_log_threshold_test(i) } end end + + sub_test_case "actual_flush_thread_count" do + data( + "Not buffered", + { + output_type: :sync, + config: config_element(), + expected: 0, + } + ) + data( + "Buffered with singile thread", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + expected: 1, + } + ) + data( + "Buffered with multiple threads", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + expected: 8, + } + ) + test "actual_flush_thread_count" do |data| + o = create_output(data[:output_type]) + o.configure(data[:config]) + assert_equal data[:expected], o.actual_flush_thread_count + end + + data( + "Buffered with singile thread", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + expected: 1, + } + ) + data( + "Buffered with multiple threads", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + expected: 8, + } + ) + test "actual_flush_thread_count for secondary" do |data| + primary = create_output(data[:output_type]) + primary.configure(data[:config]) + assert_equal data[:expected], primary.secondary.actual_flush_thread_count + end + end + + sub_test_case "acquire_lock_if_need" do + def setup + Dir.mktmpdir do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + yield + end + end + + def assert_worker_lock(lock_path, expect_locked) + # With LOCK_NB set, flock() returns: + # * `false` when the file is already locked. + # * `0` when the file is not locked. + File.open(lock_path, "w") do |f| + if expect_locked + assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB) + else + assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + end + + def assert_thread_lock(output_plugin, expect_locked) + t = Thread.new do + output_plugin.acquire_lock_if_need("test") do + end + end + if expect_locked + assert_nil t.join(3) + else + assert_not_nil t.join(3) + end + end + + data( + "Not buffered with single worker", + { + output_type: :sync, + config: config_element(), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Not buffered with multiple workers", + { + output_type: :sync, + config: config_element(), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with single thread and single worker", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and single worker", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: true, + } + ) + data( + "Buffered with single thread and multiple workers", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and multiple workers", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: true, + } + ) + test "acquire_lock_if_need" do |data| + o = create_output(data[:output_type]) + o.configure(data[:config]) + o.system_config_override(workers: data[:workers]) + + test_lock_name = "test_lock_name" + lock_path = o.get_lock_path(test_lock_name) + + o.acquire_lock_if_need(test_lock_name) do + assert_worker_lock(lock_path, data[:expect_worker_lock]) + assert_thread_lock(o, data[:expect_thread_lock]) + end + + assert_worker_lock(lock_path, false) + assert_thread_lock(o, false) + end + + data( + "Buffered with single thread and single worker", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and single worker", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: true, + } + ) + data( + "Buffered with single thread and multiple workers", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and multiple workers", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: true, + } + ) + test "acquire_lock_if_need for secondary" do |data| + primary = create_output(data[:output_type]) + primary.configure(data[:config]) + secondary = primary.secondary + secondary.system_config_override(workers: data[:workers]) + + test_lock_name = "test_lock_name" + lock_path = secondary.get_lock_path(test_lock_name) + + secondary.acquire_lock_if_need(test_lock_name) do + assert_worker_lock(lock_path, data[:expect_worker_lock]) + assert_thread_lock(secondary, data[:expect_thread_lock]) + end + + assert_worker_lock(lock_path, false) + assert_thread_lock(secondary, false) + end + end end