From dd6bf393e85ea55886f254ac7d5c742a64a8c8d1 Mon Sep 17 00:00:00 2001 From: yugo-horie Date: Thu, 25 Feb 2021 10:11:09 +0900 Subject: [PATCH] Prevent out_forward from duplicating transmission on reload * Fixes #3137 * Suspend try_flush in the before_shutdown phase * Confirm acks until the after_shutdown phase or the last try_write Signed-off-by: yugo-horie --- lib/fluent/plugin/out_forward.rb | 89 ++++++++++++++++++++++---------- test/plugin/test_out_forward.rb | 67 ++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 28 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 38f6b1b5d5..faf11e24aa 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -166,6 +166,7 @@ def initialize @usock = nil @keep_alive_watcher_interval = 5 # TODO + @suspend_flush = false end def configure(conf) @@ -291,6 +292,15 @@ def prefer_delayed_commit @require_ack_response end + def overwrite_delayed_commit_timeout + # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout + # But it should be overwritten by ack_response_timeout to rollback chunks after timeout + if @delayed_commit_timeout != @ack_response_timeout + log.info "delayed_commit_timeout is overwritten by ack_response_timeout" + @delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s + end + end + def start super @@ -303,13 +313,7 @@ def start end if @require_ack_response - # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout - # But it should be overwritten by ack_response_timeout to rollback chunks after timeout - if @delayed_commit_timeout != @ack_response_timeout - log.info "delayed_commit_timeout is overwritten by ack_response_timeout" - @delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s - end - + overwrite_delayed_commit_timeout thread_create(:out_forward_receiving_ack, &method(:ack_reader)) end @@ -346,6 +350,26
@@ def stop end end + def before_shutdown + super + @suspend_flush = true + end + + def after_shutdown + last_ack if @require_ack_response + super + end + + def try_flush + return if @require_ack_response && @suspend_flush + super + end + + def last_ack + overwrite_delayed_commit_timeout + ack_check(set_interval) + end + def write(chunk) return if chunk.empty? tag = chunk.metadata.tag @@ -361,6 +385,7 @@ def try_write(chunk) end tag = chunk.metadata.tag discovery_manager.select_service { |node| node.send_data(tag, chunk) } + last_ack if @require_ack_response && @suspend_flush end def create_transfer_socket(host, port, hostname, &block) @@ -481,31 +506,39 @@ def on_purge_obsolete_socks @connection_manager.purge_obsolete_socks end + def set_interval + if @delayed_commit_timeout > 3 + 1 + else + @delayed_commit_timeout / 3.0 + end + end + def ack_reader - select_interval = if @delayed_commit_timeout > 3 - 1 - else - @delayed_commit_timeout / 3.0 - end + select_interval = set_interval while thread_current_running? - @ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result| - @connection_manager.close(sock) - - case result - when AckHandler::Result::SUCCESS - commit_write(chunk_id) - when AckHandler::Result::FAILED - node.disable! - rollback_write(chunk_id, update_retry: false) - when AckHandler::Result::CHUNKID_UNMATCHED - rollback_write(chunk_id, update_retry: false) - else - log.warn("BUG: invalid status #{result} #{chunk_id}") + ack_check(select_interval) + end + end - if chunk_id - rollback_write(chunk_id, update_retry: false) - end + def ack_check(select_interval) + @ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result| + @connection_manager.close(sock) + + case result + when AckHandler::Result::SUCCESS + commit_write(chunk_id) + when AckHandler::Result::FAILED + node.disable! 
+ rollback_write(chunk_id, update_retry: false) + when AckHandler::Result::CHUNKID_UNMATCHED + rollback_write(chunk_id, update_retry: false) + else + log.warn("BUG: invalid status #{result} #{chunk_id}") + + if chunk_id + rollback_write(chunk_id, update_retry: false) end end end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index db831918f4..8e4c370fac 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -371,6 +371,26 @@ def try_write(chunk) assert_equal 2, d.instance.ack_response_timeout end + test 'suspend_flush is disable before before_shutdown' do + @d = d = create_driver(CONFIG + %[ + require_ack_response true + ack_response_timeout 2s + ]) + d.instance_start + assert_false d.instance.instance_variable_get(:@suspend_flush) + end + + test 'suspend_flush should be enabled and try_flush returns nil after before_shutdown' do + @d = d = create_driver(CONFIG + %[ + require_ack_response true + ack_response_timeout 2s + ]) + d.instance_start + d.instance.before_shutdown + assert_true d.instance.instance_variable_get(:@suspend_flush) + assert_nil d.instance.try_flush + end + test 'verify_connection_at_startup is disabled in default' do @d = d = create_driver(CONFIG) assert_false d.instance.verify_connection_at_startup @@ -599,6 +619,53 @@ def try_write(chunk) assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first end + test 'a node supporting responses after stop' do + target_input_driver = create_target_input_driver + + @d = d = create_driver(CONFIG + %[ + require_ack_response true + ack_response_timeout 1s + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + + ]) + + time = event_time("2011-01-02 13:14:15 UTC") + + acked_chunk_ids = [] + mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success| + if success + acked_chunk_ids << info.chunk_id + end + [chunk_id, success] + end + + 
records = [ + {"a" => 1}, + {"a" => 2} + ] + target_input_driver.run(expect_records: 2) do + d.end_if { acked_chunk_ids.size > 0 } + d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do + d.instance.stop + d.feed([[time, records[0]], [time,records[1]]]) + d.instance.before_shutdown + d.instance.shutdown + d.instance.after_shutdown + end + end + + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] + + assert_equal 1, acked_chunk_ids.size + assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first + end + data('ack true' => true, 'ack false' => false) test 'TLS transport and ack parameter combination' do |ack|