Skip to content

Commit

Permalink
Merge pull request #1389 from fluent/fix-bug-forward-ack-reponse
Browse files Browse the repository at this point in the history
fix bug about chunk response handling
  • Loading branch information
tagomoris authored Dec 26, 2016
2 parents 948e26b + 0579cce commit 739e7db
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 40 deletions.
68 changes: 39 additions & 29 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def start
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

@rand_seed = Random.new.seed
Expand Down Expand Up @@ -214,22 +214,23 @@ def write(chunk)
select_a_healthy_node{|node| node.send_data(tag, chunk) }
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :node, :time, :timeout) do
ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
def expired?(now)
time + timeout < now
end
end

def try_write(chunk)
log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)
if chunk.empty?
commit_write(chunk.unique_id)
return
end
tag = chunk.metadata.tag
sock, node = select_a_healthy_node{|n| n.send_data(tag, chunk) }
chunk_id = Base64.encode64(chunk.unique_id)
chunk_id_base64 = Base64.encode64(chunk.unique_id)
current_time = Process.clock_gettime(PROCESS_CLOCK_ID)
info = ACKWaitingSockInfo.new(sock, chunk_id, node, current_time, @ack_response_timeout)
info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting << info
end
Expand Down Expand Up @@ -341,7 +342,7 @@ def on_heartbeat(sockaddr, msg)
end
end

# return chunk id when succeeded for tests
# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
raw_data = sock.recv(@read_length)
Expand All @@ -359,11 +360,14 @@ def read_ack_from_sock(sock, unpacker)
else
unpacker.feed(raw_data)
res = unpacker.read
if res['ack'] != info.chunk_id
log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
if res['ack'] != info.chunk_id_base64
# Some errors may have occured when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: info.chunk_id, ack: res['ack']
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
end
return info.chunk_id
end
Expand All @@ -378,40 +382,46 @@ def read_ack_from_sock(sock, unpacker)

def ack_reader
select_interval = if @delayed_commit_timeout > 3
2
1
else
@delayed_commit_timeout / 2.0
@delayed_commit_timeout / 3.0
end

unpacker = Fluent::Engine.msgpack_unpacker

while thread_current_running?
now = Process.clock_gettime(PROCESS_CLOCK_ID)
sockets = []
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
else
sockets << info.sock
new_list << info
begin
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
else
sockets << info.sock
new_list << info
end
end
@sock_ack_waiting = new_list
end
@sock_ack_waiting = new_list
end

readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets
readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets

readable_sockets.each do |sock|
read_ack_from_sock(sock, unpacker)
readable_sockets.each do |sock|
chunk_id = read_ack_from_sock(sock, unpacker)
commit_write(chunk_id)
end
rescue => e
log.error "unexpected error while receiving ack", error: e
log.error_backtrace
end
end
end
Expand Down
30 changes: 19 additions & 11 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,25 @@ def teardown

def create_driver(conf=CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) {
attr_reader :responses, :exceptions
attr_reader :response_chunk_ids, :exceptions, :sent_chunk_ids

def initialize
super
@responses = []
@sent_chunk_ids = []
@response_chunk_ids = []
@exceptions = []
end

def try_write(chunk)
retval = super
@sent_chunk_ids << chunk.unique_id
retval
end

def read_ack_from_sock(sock, unpacker)
@responses << super
retval = super
@response_chunk_ids << retval
retval
rescue => e
@exceptions << e
raise e
Expand Down Expand Up @@ -302,7 +311,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_empty d.instance.responses # not attempt to receive responses, so it's empty
assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty
assert_empty d.instance.exceptions
end

Expand All @@ -329,7 +338,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_empty d.instance.responses # not attempt to receive responses, so it's empty
assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty
assert_empty d.instance.exceptions
end

Expand All @@ -354,7 +363,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if{ d.instance.responses.length > 0 }
d.end_if{ d.instance.response_chunk_ids.length > 0 }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.feed([[time, records[0]], [time,records[1]]])
end
Expand All @@ -364,7 +373,8 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_equal 1, d.instance.responses.length
assert_equal 1, d.instance.response_chunk_ids.size
assert_equal d.instance.sent_chunk_ids.first, d.instance.response_chunk_ids.first
assert_empty d.instance.exceptions
end

Expand Down Expand Up @@ -400,7 +410,7 @@ def read_ack_from_sock(sock, unpacker)
end
end

assert_equal 1, delayed_commit_timeout_value
assert_equal (1 + 2), delayed_commit_timeout_value

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
Expand All @@ -409,7 +419,6 @@ def read_ack_from_sock(sock, unpacker)
assert{ d.instance.rollback_count > 0 }

logs = d.instance.log.logs
assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } }
assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } }
end

Expand Down Expand Up @@ -445,7 +454,7 @@ def read_ack_from_sock(sock, unpacker)
end
end

assert_equal 5, delayed_commit_timeout_value
assert_equal (5 + 2), delayed_commit_timeout_value

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
Expand All @@ -454,7 +463,6 @@ def read_ack_from_sock(sock, unpacker)
assert{ d.instance.rollback_count > 0 }

logs = d.instance.log.logs
assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } }
assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } }
end

Expand Down

0 comments on commit 739e7db

Please sign in to comment.