Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Mar 8, 2023
1 parent edcbd4b commit f23e7e6
Showing 1 changed file with 259 additions and 0 deletions.
259 changes: 259 additions & 0 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1072,4 +1072,263 @@ def invoke_slow_flush_log_threshold_test(i)
}
end
end

sub_test_case "actual_flush_thread_count" do
data(
"Not buffered",
{
output_type: :sync,
config: config_element(),
expected: 0,
}
)
data(
"Buffered with singile thread",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
expected: 1,
}
)
data(
"Buffered with multiple threads",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
expected: 8,
}
)
test "actual_flush_thread_count" do |data|
o = create_output(data[:output_type])
o.configure(data[:config])
assert_equal data[:expected], o.actual_flush_thread_count
end

data(
"Buffered with singile thread",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
expected: 1,
}
)
data(
"Buffered with multiple threads",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {"flush_thread_count" => 8}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
expected: 8,
}
)
test "actual_flush_thread_count for secondary" do |data|
primary = create_output(data[:output_type])
primary.configure(data[:config])
assert_equal data[:expected], primary.secondary.actual_flush_thread_count
end
end

sub_test_case "acquire_lock_if_need" do
def setup
Dir.mktmpdir do |lock_dir|
ENV['FLUENTD_LOCK_DIR'] = lock_dir
yield
end
end

def assert_worker_lock(lock_path, expect_locked)
# With LOCK_NB set, flock() returns:
# * `false` when the file is already locked.
# * `0` when the file is not locked.
File.open(lock_path, "w") do |f|
if expect_locked
assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB)
else
assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB)
end
end
end

def assert_thread_lock(output_plugin, expect_locked)
t = Thread.new do
output_plugin.acquire_lock_if_need("test") do
end
end
if expect_locked
assert_nil t.join(3)
else
assert_not_nil t.join(3)
end
end

data(
"Not buffered with single worker",
{
output_type: :sync,
config: config_element(),
workers: 1,
expect_worker_lock: false,
expect_thread_lock: false,
}
)
data(
"Not buffered with multiple workers",
{
output_type: :sync,
config: config_element(),
workers: 4,
expect_worker_lock: true,
expect_thread_lock: false,
}
)
data(
"Buffered with single thread and single worker",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
workers: 1,
expect_worker_lock: false,
expect_thread_lock: false,
}
)
data(
"Buffered with multiple threads and single worker",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
workers: 1,
expect_worker_lock: false,
expect_thread_lock: true,
}
)
data(
"Buffered with single thread and multiple workers",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
workers: 4,
expect_worker_lock: true,
expect_thread_lock: false,
}
)
data(
"Buffered with multiple threads and multiple workers",
{
output_type: :full,
config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
workers: 4,
expect_worker_lock: true,
expect_thread_lock: true,
}
)
test "acquire_lock_if_need" do |data|
o = create_output(data[:output_type])
o.configure(data[:config])
o.system_config_override(workers: data[:workers])

test_lock_name = "test_lock_name"
lock_path = o.get_lock_path(test_lock_name)

o.acquire_lock_if_need(test_lock_name) do
assert_worker_lock(lock_path, data[:expect_worker_lock])
assert_thread_lock(o, data[:expect_thread_lock])
end

assert_worker_lock(lock_path, false)
assert_thread_lock(o, false)
end

data(
"Buffered with single thread and single worker",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
workers: 1,
expect_worker_lock: false,
expect_thread_lock: false,
}
)
data(
"Buffered with multiple threads and single worker",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {"flush_thread_count" => 8}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
workers: 1,
expect_worker_lock: false,
expect_thread_lock: true,
}
)
data(
"Buffered with single thread and multiple workers",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
workers: 4,
expect_worker_lock: true,
expect_thread_lock: false,
}
)
data(
"Buffered with multiple threads and multiple workers",
{
output_type: :full,
config: config_element(
"ROOT", "", {},
[
config_element("buffer", "", {"flush_thread_count" => 8}),
config_element("secondary", "", {"@type" => "test", "name" => "test"}),
]
),
workers: 4,
expect_worker_lock: true,
expect_thread_lock: true,
}
)
test "acquire_lock_if_need for secondary" do |data|
primary = create_output(data[:output_type])
primary.configure(data[:config])
secondary = primary.secondary
secondary.system_config_override(workers: data[:workers])

test_lock_name = "test_lock_name"
lock_path = secondary.get_lock_path(test_lock_name)

secondary.acquire_lock_if_need(test_lock_name) do
assert_worker_lock(lock_path, data[:expect_worker_lock])
assert_thread_lock(secondary, data[:expect_thread_lock])
end

assert_worker_lock(lock_path, false)
assert_thread_lock(secondary, false)
end
end
end

0 comments on commit f23e7e6

Please sign in to comment.