Skip to content

Commit

Permalink
test_in_tail: Reduce test time of group watch tests
Browse files Browse the repository at this point in the history
Signed-off-by: Takuro Ashie <[email protected]>
  • Loading branch information
ashie committed May 16, 2022
1 parent 07ef562 commit efa7d2d
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2548,7 +2548,6 @@ def test_shutdown_timeout
end

sub_test_case "throttling logs at in_tail level" do

data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"],
"file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"])
def test_lines_collected_with_no_throttling(data)
Expand All @@ -2558,7 +2557,7 @@ def test_lines_collected_with_no_throttling(data)
rule = create_rule_directive({
"file" => "/test.*/",
}, -1)
group = create_group_directive(pattern, '10s', rule)
group = create_group_directive(pattern, "1s", rule)
path_element = create_path_element(file)

conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG
Expand All @@ -2571,37 +2570,40 @@ def test_lines_collected_with_no_throttling(data)


d = create_driver(conf, false)
d.run do
d.run(timeout: 3) do
start_time = Fluent::Clock.now

assert_true(Fluent::Clock.now - start_time < 10)
assert_equal(num_lines, d.record_count)
assert_equal({ "message" => msg }, d.events[0][2])

prev_count = d.record_count
## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver
sleep(1) until Fluent::Clock.now - start_time > 12
## after waiting for 10 secs, limit will reset
sleep(0.1) while d.emit_count < 1
assert_true(Fluent::Clock.now - start_time < 2)
## after waiting for 1 (+ jitter) secs, limit will reset
## Plugin will start reading but it will encounter EOF Error
## since no logs are left to be read
## Hence, d.record_count = prev_count
tail_watcher_interval = 1.0 # hard coded value in in_tail
safety_mergin = 1.02
jitter = tail_watcher_interval * safety_mergin
sleep(1.0 + jitter)
assert_equal(0, d.record_count - prev_count)
end
end

test "lines collected with throttling" do
file = "podname1_namespace12_container-123456.log"
limit = 1000
rate_period = '10s'
rate_period = 2
num_lines = 3000
msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes
msg = "a" * 8190 # Total size = 8190 bytes + 2 (\n) bytes

rule = create_rule_directive({
"namespace"=> "/namespace.+/",
"podname"=> "/podname.+/",
}, limit)
path_element = create_path_element(file)
conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD
conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD

d = create_driver(conf, false)
file_path = "#{TMP_DIR}/#{file}"
Expand All @@ -2612,25 +2614,37 @@ def test_lines_collected_with_no_throttling(data)
end
end

d.run do
start_time = Fluent::Clock.now
d.run(timeout: 15) do
sleep_interval = 0.1
tail_watcher_interval = 1.0 # hard coded value in in_tail
safety_mergin = 1.02
lower_jitter = sleep_interval * safety_mergin
upper_jitter = (tail_watcher_interval + sleep_interval) * safety_mergin
lower_interval = rate_period - lower_jitter
upper_interval = rate_period + upper_jitter

emit_count = 0
prev_count = 0

(num_lines/limit).times do
assert_true(Fluent::Clock.now - start_time < 10)
## Check record_count after 10s to check lines reads
while emit_count < 3 do
start_time = Fluent::Clock.now
sleep(sleep_interval) while d.emit_count <= emit_count
elapsed_seconds = Fluent::Clock.now - start_time
if emit_count > 0
assert_true(elapsed_seconds > lower_interval && elapsed_seconds < upper_interval,
"elapsed_seconds #{elapsed_seconds} is out of allowed range:\n" +
" lower: #{lower_interval} [sec]\n" +
" upper: #{upper_interval} [sec]")
end
assert_equal(limit, d.record_count - prev_count)
emit_count = d.emit_count
prev_count = d.record_count
## sleep until rate_period seconds are over so that
## Plugin can read lines again
sleep(1) until Fluent::Clock.now - start_time > 12
## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver
start_time = Fluent::Clock.now
end

## When all the lines are read and rate_period seconds are over
## limit will reset and since there are no more logs to be read,
## number_lines_read will be 0

sleep upper_interval
gw = d.instance.find_group_from_metadata(file_path)
assert_equal(0, gw.current_paths[file_path].number_lines_read)
end
Expand Down

0 comments on commit efa7d2d

Please sign in to comment.