diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index a674446442..064eb5b494 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -178,7 +178,7 @@ def start # But it should be overwritten by ack_response_timeout to rollback chunks after timeout if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout log.info "delayed_commit_timeout is overwritten by ack_response_timeout" - @delayed_commit_timeout = @ack_response_timeout + @delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s end @rand_seed = Random.new.seed @@ -214,22 +214,23 @@ def write(chunk) select_a_healthy_node{|node| node.send_data(tag, chunk) } end - ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :node, :time, :timeout) do + ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do def expired?(now) time + timeout < now end end def try_write(chunk) + log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id) if chunk.empty? commit_write(chunk.unique_id) return end tag = chunk.metadata.tag sock, node = select_a_healthy_node{|n| n.send_data(tag, chunk) } - chunk_id = Base64.encode64(chunk.unique_id) + chunk_id_base64 = Base64.encode64(chunk.unique_id) current_time = Process.clock_gettime(PROCESS_CLOCK_ID) - info = ACKWaitingSockInfo.new(sock, chunk_id, node, current_time, @ack_response_timeout) + info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout) @sock_ack_waiting_mutex.synchronize do @sock_ack_waiting << info end @@ -341,7 +342,7 @@ def on_heartbeat(sockaddr, msg) end end - # return chunk id when succeeded for tests + # return chunk id to be committed def read_ack_from_sock(sock, unpacker) begin raw_data = sock.recv(@read_length) @@ -359,11 +360,14 @@ def read_ack_from_sock(sock, unpacker) else unpacker.feed(raw_data) res = unpacker.read - if res['ack'] != info.chunk_id + log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res + if res['ack'] != info.chunk_id_base64 # Some errors may have occured when ack and chunk id is different, so send the chunk again. - log.warn "ack in response and chunk id in sent data are different", chunk_id: info.chunk_id, ack: res['ack'] + log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack'] rollback_write(info.chunk_id) return nil + else + log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id) end return info.chunk_id end @@ -378,9 +382,9 @@ def read_ack_from_sock(sock, unpacker) def ack_reader select_interval = if @delayed_commit_timeout > 3 - 2 + 1 else - @delayed_commit_timeout / 2.0 + @delayed_commit_timeout / 3.0 end unpacker = Fluent::Engine.msgpack_unpacker @@ -388,30 +392,36 @@ def ack_reader while thread_current_running? now = Process.clock_gettime(PROCESS_CLOCK_ID) sockets = [] - @sock_ack_waiting_mutex.synchronize do - new_list = [] - @sock_ack_waiting.each do |info| - if info.expired?(now) - # There are 2 types of cases when no response has been received from socket: - # (1) the node does not support sending responses - # (2) the node does support sending response but responses have not arrived for some reasons. - log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port - info.node.disable! - info.sock.close rescue nil - rollback_write(info.chunk_id) - else - sockets << info.sock - new_list << info + begin + @sock_ack_waiting_mutex.synchronize do + new_list = [] + @sock_ack_waiting.each do |info| + if info.expired?(now) + # There are 2 types of cases when no response has been received from socket: + # (1) the node does not support sending responses + # (2) the node does support sending response but responses have not arrived for some reasons. + log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port + info.node.disable! + info.sock.close rescue nil + rollback_write(info.chunk_id) + else + sockets << info.sock + new_list << info + end end + @sock_ack_waiting = new_list end - @sock_ack_waiting = new_list - end - readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval) - next unless readable_sockets + readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval) + next unless readable_sockets - readable_sockets.each do |sock| - read_ack_from_sock(sock, unpacker) + readable_sockets.each do |sock| + chunk_id = read_ack_from_sock(sock, unpacker) + commit_write(chunk_id) + end + rescue => e + log.error "unexpected error while receiving ack", error: e + log.error_backtrace end end end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 9e02dc5de7..cad5fd566e 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -35,16 +35,25 @@ def teardown def create_driver(conf=CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { - attr_reader :responses, :exceptions + attr_reader :response_chunk_ids, :exceptions, :sent_chunk_ids def initialize super - @responses = [] + @sent_chunk_ids = [] + @response_chunk_ids = [] @exceptions = [] end + def try_write(chunk) + retval = super + @sent_chunk_ids << chunk.unique_id + retval + end + def read_ack_from_sock(sock, unpacker) - @responses << super + retval = super + @response_chunk_ids << retval + retval rescue => e @exceptions << e raise e @@ -302,7 +311,7 @@ def read_ack_from_sock(sock, unpacker) assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_empty d.instance.responses # not attempt to receive responses, so it's empty + assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty assert_empty d.instance.exceptions end @@ -329,7 +338,7 @@ def read_ack_from_sock(sock, unpacker) assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_empty d.instance.responses # not attempt to receive responses, so it's empty + assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty assert_empty d.instance.exceptions end @@ -354,7 +363,7 @@ def read_ack_from_sock(sock, unpacker) {"a" => 2} ] target_input_driver.run(expect_records: 2) do - d.end_if{ d.instance.responses.length > 0 } + d.end_if{ d.instance.response_chunk_ids.length > 0 } d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do d.feed([[time, records[0]], [time,records[1]]]) end @@ -364,7 +373,8 @@ def read_ack_from_sock(sock, unpacker) assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_equal 1, d.instance.responses.length + assert_equal 1, d.instance.response_chunk_ids.size + assert_equal d.instance.sent_chunk_ids.first, d.instance.response_chunk_ids.first assert_empty d.instance.exceptions end @@ -400,7 +410,7 @@ def read_ack_from_sock(sock, unpacker) end end - assert_equal 1, delayed_commit_timeout_value + assert_equal (1 + 2), delayed_commit_timeout_value events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] @@ -409,7 +419,6 @@ def read_ack_from_sock(sock, unpacker) assert{ d.instance.rollback_count > 0 } logs = d.instance.log.logs - assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } } assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end @@ -445,7 +454,7 @@ def read_ack_from_sock(sock, unpacker) end end - assert_equal 5, delayed_commit_timeout_value + assert_equal (5 + 2), delayed_commit_timeout_value events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] @@ -454,7 +463,6 @@ def read_ack_from_sock(sock, unpacker) assert{ d.instance.rollback_count > 0 } logs = d.instance.log.logs - assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } } assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end