Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent from duplicating transmit reload forward #3267

Merged
merged 1 commit into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def initialize

@usock = nil
@keep_alive_watcher_interval = 5 # TODO
@suspend_flush = false
end

def configure(conf)
Expand Down Expand Up @@ -291,6 +292,15 @@ def prefer_delayed_commit
@require_ack_response
end

def overwrite_delayed_commit_timeout
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end
end

def start
super

Expand All @@ -303,13 +313,7 @@ def start
end

if @require_ack_response
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

overwrite_delayed_commit_timeout
thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

Expand Down Expand Up @@ -346,6 +350,26 @@ def stop
end
end

def before_shutdown
super
@suspend_flush = true
end

def after_shutdown
last_ack if @require_ack_response
super
end

def try_flush
return if @require_ack_response && @suspend_flush
super
end

def last_ack
overwrite_delayed_commit_timeout
ack_check(set_interval)
end

def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag
Expand All @@ -361,6 +385,7 @@ def try_write(chunk)
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -481,31 +506,39 @@ def on_purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

def set_interval
if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end
end

def ack_reader
select_interval = if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end
select_interval = set_interval

while thread_current_running?
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
log.warn("BUG: invalid status #{result} #{chunk_id}")
ack_check(select_interval)
end
end

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
def ack_check(select_interval)
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
log.warn("BUG: invalid status #{result} #{chunk_id}")

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
end
end
Expand Down
67 changes: 67 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,26 @@ def try_write(chunk)
assert_equal 2, d.instance.ack_response_timeout
end

test 'suspend_flush is disable before before_shutdown' do
@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
d.instance_start
assert_false d.instance.instance_variable_get(:@suspend_flush)
end

test 'suspend_flush should be enabled and try_flush returns nil after before_shutdown' do
@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
d.instance_start
d.instance.before_shutdown
assert_true d.instance.instance_variable_get(:@suspend_flush)
assert_nil d.instance.try_flush
end

test 'verify_connection_at_startup is disabled in default' do
@d = d = create_driver(CONFIG)
assert_false d.instance.verify_connection_at_startup
Expand Down Expand Up @@ -599,6 +619,53 @@ def try_write(chunk)
assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first
end

test 'a node supporting responses after stop' do
target_input_driver = create_target_input_driver

@d = d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 1s
<buffer tag>
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
end
[chunk_id, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.instance.stop
d.feed([[time, records[0]], [time,records[1]]])
d.instance.before_shutdown
d.instance.shutdown
d.instance.after_shutdown
end
end

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_equal 1, acked_chunk_ids.size
assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first
end

data('ack true' => true,
'ack false' => false)
test 'TLS transport and ack parameter combination' do |ack|
Expand Down