Skip to content

Commit

Permalink
Prevent from deplicating transmit reload forward
Browse files Browse the repository at this point in the history
* Fixes #3137
* Suspend try_flush in before shutdown phase
* Confirm ack until after shutdown phase or last try_write

Signed-off-by: yugo-horie <[email protected]>
  • Loading branch information
yugo-horie committed Mar 6, 2021
1 parent 8634a26 commit dd6bf39
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 28 deletions.
89 changes: 61 additions & 28 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def initialize

@usock = nil
@keep_alive_watcher_interval = 5 # TODO
@suspend_flush = false
end

def configure(conf)
Expand Down Expand Up @@ -291,6 +292,15 @@ def prefer_delayed_commit
@require_ack_response
end

def overwrite_delayed_commit_timeout
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end
end

def start
super

Expand All @@ -303,13 +313,7 @@ def start
end

if @require_ack_response
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

overwrite_delayed_commit_timeout
thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

Expand Down Expand Up @@ -346,6 +350,26 @@ def stop
end
end

def before_shutdown
super
@suspend_flush = true
end

def after_shutdown
last_ack if @require_ack_response
super
end

def try_flush
return if @require_ack_response && @suspend_flush
super
end

def last_ack
overwrite_delayed_commit_timeout
ack_check(set_interval)
end

def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag
Expand All @@ -361,6 +385,7 @@ def try_write(chunk)
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -481,31 +506,39 @@ def on_purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

def set_interval
if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end
end

def ack_reader
select_interval = if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end
select_interval = set_interval

while thread_current_running?
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
log.warn("BUG: invalid status #{result} #{chunk_id}")
ack_check(select_interval)
end
end

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
def ack_check(select_interval)
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
log.warn("BUG: invalid status #{result} #{chunk_id}")

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
end
end
Expand Down
67 changes: 67 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,26 @@ def try_write(chunk)
assert_equal 2, d.instance.ack_response_timeout
end

test 'suspend_flush is disable before before_shutdown' do
@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
d.instance_start
assert_false d.instance.instance_variable_get(:@suspend_flush)
end

test 'suspend_flush should be enabled and try_flush returns nil after before_shutdown' do
@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
d.instance_start
d.instance.before_shutdown
assert_true d.instance.instance_variable_get(:@suspend_flush)
assert_nil d.instance.try_flush
end

test 'verify_connection_at_startup is disabled in default' do
@d = d = create_driver(CONFIG)
assert_false d.instance.verify_connection_at_startup
Expand Down Expand Up @@ -599,6 +619,53 @@ def try_write(chunk)
assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first
end

test 'a node supporting responses after stop' do
target_input_driver = create_target_input_driver

@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 1s
<buffer tag>
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
end
[chunk_id, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.instance.stop
d.feed([[time, records[0]], [time,records[1]]])
d.instance.before_shutdown
d.instance.shutdown
d.instance.after_shutdown
end
end

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_equal 1, acked_chunk_ids.size
assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first
end

data('ack true' => true,
'ack false' => false)
test 'TLS transport and ack parameter combination' do |ack|
Expand Down

0 comments on commit dd6bf39

Please sign in to comment.