
fix bug about chunk response handling #1389

Merged 1 commit on Dec 26, 2016
68 changes: 39 additions & 29 deletions lib/fluent/plugin/out_forward.rb
@@ -178,7 +178,7 @@ def start
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
Member: If @delayed_commit_timeout and @ack_response_timeout are the same, the @delayed_commit_timeout = @ack_response_timeout + 2 line is skipped. Is this okay?

Member Author: OK. These values are configured intentionally by users, and longer configured values don't cause problems.

Member: I see.

log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
Member: Off topic: adding a reason to this message would help users understand why delayed_commit_timeout is overwritten in out_forward.

@delayed_commit_timeout = @ack_response_timeout
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

@rand_seed = Random.new.seed
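The timeout override above can be sketched standalone (the variable values below are hypothetical, and this is not the plugin code itself). ack_reader polls sockets with IO.select at up to a 1-second interval, so the ack-timeout check can fire up to about a second late; padding delayed_commit_timeout by 2 seconds ensures the ack path expires, and rolls back the chunk, before the delayed commit times out:

```ruby
# Assumed config values for illustration only.
ack_response_timeout = 5
delayed_commit_timeout = 10

# Mirrors the overwrite logic in the hunk above.
if ack_response_timeout && delayed_commit_timeout != ack_response_timeout
  delayed_commit_timeout = ack_response_timeout + 2
end

puts delayed_commit_timeout  # => 7
```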
@@ -214,22 +214,23 @@ def write(chunk)
select_a_healthy_node{|node| node.send_data(tag, chunk) }
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :node, :time, :timeout) do
ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
Member: Just one idea: how about using def chunk_id_base64 instead of a struct member? It would avoid the cost of the extra member.

Member Author (@tagomoris, Dec 24, 2016): It's not a bad idea, but it looks better to me to add these methods to chunk:

  • unique_id_hex
  • unique_id_base64

These values are called many times (especially _hex, for logging) and should be cached so they are not calculated two or more times. What do you think?

Member: Hm... it also seems good.

Member Author: Another option: chunk.unique_id(:hex) (or :base64).

Member: I like the method approach, unique_id_hex, rather than unique_id(:hex).

Member Author: Fixing this point causes a number of problems (one is an inexplicable SEGV in Travis) and doesn't bring much benefit. I'll leave this as is.
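The accessors discussed in this thread might look like the following sketch. Note that unique_id_hex and unique_id_base64 were proposals, not the merged API, and FakeChunk is a stand-in class, not Fluentd's buffer chunk:

```ruby
require 'base64'

# Stand-in chunk class; in the proposal these methods would live on the
# real buffer chunk and memoize their result so repeated logging is cheap.
class FakeChunk
  attr_reader :unique_id

  def initialize(unique_id)
    @unique_id = unique_id
  end

  def unique_id_hex
    @unique_id_hex ||= @unique_id.unpack("H*").first
  end

  def unique_id_base64
    @unique_id_base64 ||= Base64.strict_encode64(@unique_id)
  end
end

chunk = FakeChunk.new("\x01\x02".b)
puts chunk.unique_id_hex     # => "0102"
puts chunk.unique_id_base64  # => "AQI="
```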

def expired?(now)
time + timeout < now
end
end
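The expiry check in this Struct can be exercised in isolation (the field values below are made up; only the Struct shape matches the merged code):

```ruby
# expired? compares the send time plus the allowed timeout against the
# current clock reading passed in by the caller.
ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
  def expired?(now)
    time + timeout < now
  end
end

info = ACKWaitingSockInfo.new(nil, "\x01".b, "AQ==", nil, 100.0, 5.0)
puts info.expired?(106.0)  # => true
puts info.expired?(104.0)  # => false
```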

def try_write(chunk)
log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)
if chunk.empty?
commit_write(chunk.unique_id)
return
end
tag = chunk.metadata.tag
sock, node = select_a_healthy_node{|n| n.send_data(tag, chunk) }
chunk_id = Base64.encode64(chunk.unique_id)
chunk_id_base64 = Base64.encode64(chunk.unique_id)
current_time = Process.clock_gettime(PROCESS_CLOCK_ID)
info = ACKWaitingSockInfo.new(sock, chunk_id, node, current_time, @ack_response_timeout)
info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting << info
end
@@ -341,7 +342,7 @@ def on_heartbeat(sockaddr, msg)
end
end

# return chunk id when succeeded for tests
# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
raw_data = sock.recv(@read_length)
@@ -359,11 +360,14 @@ def read_ack_from_sock(sock, unpacker)
else
unpacker.feed(raw_data)
res = unpacker.read
if res['ack'] != info.chunk_id
log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
if res['ack'] != info.chunk_id_base64
# Some errors may have occured when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: info.chunk_id, ack: res['ack']
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
end
return info.chunk_id
end
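The comparison changed in this hunk can be illustrated with a self-contained sketch. The forward protocol's ack field carries the Base64 form of the chunk id, so comparing it against the raw unique_id bytes (the pre-fix behavior) never matches; the names below are illustrative and res stands in for the decoded msgpack response:

```ruby
require 'base64'

chunk_unique_id = "\xDE\xAD\xBE\xEF".b
chunk_id_base64 = Base64.encode64(chunk_unique_id)

# A well-behaved destination echoes the Base64 chunk id back as the ack.
res = { 'ack' => chunk_id_base64 }

status = if res['ack'] != chunk_id_base64
           :rollback # ack and chunk id differ: resend the chunk
         else
           :commit   # correct ack: the chunk can be committed
         end

puts status  # => commit
```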
@@ -378,40 +382,46 @@ def ack_reader

def ack_reader
select_interval = if @delayed_commit_timeout > 3
2
1
else
@delayed_commit_timeout / 2.0
@delayed_commit_timeout / 3.0
end

unpacker = Fluent::Engine.msgpack_unpacker

while thread_current_running?
now = Process.clock_gettime(PROCESS_CLOCK_ID)
sockets = []
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
else
sockets << info.sock
new_list << info
begin
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
else
sockets << info.sock
new_list << info
end
end
@sock_ack_waiting = new_list
end
@sock_ack_waiting = new_list
end

readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets
readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets

readable_sockets.each do |sock|
read_ack_from_sock(sock, unpacker)
readable_sockets.each do |sock|
chunk_id = read_ack_from_sock(sock, unpacker)
commit_write(chunk_id)
end
rescue => e
log.error "unexpected error while receiving ack", error: e
log.error_backtrace
end
end
end
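The interval selection at the top of ack_reader can be factored into a small helper for illustration. select_interval is not a method in the plugin; only the branch values match the merged code:

```ruby
# Poll at 1s when the commit timeout leaves headroom; otherwise poll at a
# third of the timeout, so expiry is still checked several times before
# delayed_commit_timeout elapses.
def select_interval(delayed_commit_timeout)
  if delayed_commit_timeout > 3
    1
  else
    delayed_commit_timeout / 3.0
  end
end

puts select_interval(10)   # => 1
puts select_interval(1.5)  # => 0.5
```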
30 changes: 19 additions & 11 deletions test/plugin/test_out_forward.rb
@@ -35,16 +35,25 @@ def teardown

def create_driver(conf=CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) {
attr_reader :responses, :exceptions
attr_reader :response_chunk_ids, :exceptions, :sent_chunk_ids

def initialize
super
@responses = []
@sent_chunk_ids = []
@response_chunk_ids = []
@exceptions = []
end

def try_write(chunk)
retval = super
@sent_chunk_ids << chunk.unique_id
retval
end

def read_ack_from_sock(sock, unpacker)
@responses << super
retval = super
@response_chunk_ids << retval
retval
rescue => e
@exceptions << e
raise e
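The test driver above uses a record-and-delegate override: capture what a method returns for later assertions, then pass the value through unchanged. The pattern in isolation, with made-up class names:

```ruby
# Base class doing the "real" work.
class Reader
  def read_ack(data)
    data.reverse
  end
end

# Subclass records each return value and delegates to super.
class RecordingReader < Reader
  attr_reader :recorded

  def initialize
    @recorded = []
  end

  def read_ack(data)
    retval = super
    @recorded << retval
    retval
  end
end

r = RecordingReader.new
r.read_ack("abc")
puts r.recorded.inspect  # => ["cba"]
```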
@@ -302,7 +311,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_empty d.instance.responses # not attempt to receive responses, so it's empty
assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty
assert_empty d.instance.exceptions
end

@@ -329,7 +338,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_empty d.instance.responses # not attempt to receive responses, so it's empty
assert_empty d.instance.response_chunk_ids # not attempt to receive responses, so it's empty
assert_empty d.instance.exceptions
end

@@ -354,7 +363,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if{ d.instance.responses.length > 0 }
d.end_if{ d.instance.response_chunk_ids.length > 0 }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.feed([[time, records[0]], [time,records[1]]])
end
@@ -364,7 +373,8 @@ def read_ack_from_sock(sock, unpacker)
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]

assert_equal 1, d.instance.responses.length
assert_equal 1, d.instance.response_chunk_ids.size
assert_equal d.instance.sent_chunk_ids.first, d.instance.response_chunk_ids.first
assert_empty d.instance.exceptions
end

@@ -400,7 +410,7 @@ def read_ack_from_sock(sock, unpacker)
end
end

assert_equal 1, delayed_commit_timeout_value
assert_equal (1 + 2), delayed_commit_timeout_value

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
@@ -409,7 +419,6 @@ def read_ack_from_sock(sock, unpacker)
assert{ d.instance.rollback_count > 0 }

logs = d.instance.log.logs
assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } }
assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } }
end

@@ -445,7 +454,7 @@ def read_ack_from_sock(sock, unpacker)
end
end

assert_equal 5, delayed_commit_timeout_value
assert_equal (5 + 2), delayed_commit_timeout_value

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
@@ -454,7 +463,6 @@ def read_ack_from_sock(sock, unpacker)
assert{ d.instance.rollback_count > 0 }

logs = d.instance.log.logs
assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } }
assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } }
end
