Skip to content

Commit

Permalink
Merge pull request #3313 from fluent/fix-wrong-ack-test
Browse files Browse the repository at this point in the history
Fix an unstable test for ack response of out_forward
  • Loading branch information
ashie authored Apr 1, 2021
2 parents bca0ac4 + 22a3e14 commit 6a2852a
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -587,31 +587,36 @@ def try_write(chunk)
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
flush_at_shutdown true
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
nacked = false
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
else
nacked = true
end
[chunk_id, success]
[info, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
target_input_driver.run(expect_records: 2, timeout: 5) do
d.end_if { acked_chunk_ids.size > 0 || nacked }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.feed([[time, records[0]], [time,records[1]]])
end
end

assert(!nacked, d.instance.log.logs.join)

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]
Expand All @@ -630,26 +635,29 @@ def try_write(chunk)
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
flush_at_shutdown true
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
nacked = false
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
else
nacked = true
end
[chunk_id, success]
[info, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
target_input_driver.run(expect_records: 2, timeout: 5) do
d.end_if { acked_chunk_ids.size > 0 || nacked }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.instance.stop
d.feed([[time, records[0]], [time,records[1]]])
Expand All @@ -659,6 +667,8 @@ def try_write(chunk)
end
end

assert(!nacked, d.instance.log.logs.join)

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]
Expand Down

0 comments on commit 6a2852a

Please sign in to comment.