diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index ad7fbf9186..959089f005 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2548,7 +2548,6 @@ def test_shutdown_timeout end sub_test_case "throttling logs at in_tail level" do - data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"], "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) def test_lines_collected_with_no_throttling(data) @@ -2558,7 +2557,7 @@ def test_lines_collected_with_no_throttling(data) rule = create_rule_directive({ "file" => "/test.*/", }, -1) - group = create_group_directive(pattern, '10s', rule) + group = create_group_directive(pattern, "1s", rule) path_element = create_path_element(file) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG @@ -2571,20 +2570,23 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) - d.run do + d.run(timeout: 3) do start_time = Fluent::Clock.now - assert_true(Fluent::Clock.now - start_time < 10) assert_equal(num_lines, d.record_count) assert_equal({ "message" => msg }, d.events[0][2]) prev_count = d.record_count - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - sleep(1) until Fluent::Clock.now - start_time > 12 - ## after waiting for 10 secs, limit will reset + sleep(0.1) while d.emit_count < 1 + assert_true(Fluent::Clock.now - start_time < 2) + ## after waiting for 1 (+ jitter) secs, limit will reset ## Plugin will start reading but it will encounter EOF Error ## since no logs are left to be read ## Hence, d.record_count = prev_count + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_mergin = 1.02 + jitter = tail_watcher_interval * safety_mergin + sleep(1.0 + jitter) assert_equal(0, d.record_count - prev_count) end end @@ -2592,16 +2594,16 @@ def test_lines_collected_with_no_throttling(data) test "lines collected with throttling" do file = "podname1_namespace12_container-123456.log" limit = 1000 - rate_period = '10s' + rate_period = 2 num_lines = 3000 - msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes + msg = "a" * 8190 # Total size = 8190 bytes + 2 (\n) bytes rule = create_rule_directive({ "namespace"=> "/namespace.+/", "podname"=> "/podname.+/", }, limit) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) file_path = "#{TMP_DIR}/#{file}" @@ -2612,25 +2614,37 @@ def test_lines_collected_with_no_throttling(data) end end - d.run do - start_time = Fluent::Clock.now + d.run(timeout: 15) do + sleep_interval = 0.1 + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_mergin = 1.02 + lower_jitter = sleep_interval * safety_mergin + upper_jitter = (tail_watcher_interval + sleep_interval) * safety_mergin + lower_interval = rate_period - lower_jitter + upper_interval = rate_period + upper_jitter + + emit_count = 0 prev_count = 0 - (num_lines/limit).times do - assert_true(Fluent::Clock.now - start_time < 10) - ## Check record_count after 10s to check lines reads + while emit_count < 3 do + start_time = Fluent::Clock.now + sleep(sleep_interval) while d.emit_count <= emit_count + elapsed_seconds = Fluent::Clock.now - start_time + if emit_count > 0 + assert_true(elapsed_seconds > lower_interval && elapsed_seconds < upper_interval, + "elapsed_seconds #{elapsed_seconds} is out of allowed range:\n" + + " lower: #{lower_interval} [sec]\n" + + " upper: #{upper_interval} [sec]") + end assert_equal(limit, d.record_count - prev_count) + emit_count = d.emit_count prev_count = d.record_count - ## sleep until rate_period seconds are over so that - ## Plugin can read lines again - sleep(1) until Fluent::Clock.now - start_time > 12 - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - start_time = Fluent::Clock.now end + ## When all the lines are read and rate_period seconds are over ## limit will reset and since there are no more logs to be read, ## number_lines_read will be 0 - + sleep upper_interval gw = d.instance.find_group_from_metadata(file_path) assert_equal(0, gw.current_paths[file_path].number_lines_read) end