Skip to content

Commit

Permalink
Add require_ack_response option
Browse files Browse the repository at this point in the history
Signed-off-by: Kenji Okimoto <[email protected]>
  • Loading branch information
okkez authored and kostyaplis committed Oct 1, 2019
1 parent 1cb98f1 commit 59ff16b
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions lib/fluent/logger/fluent_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def initialize(tag_prefix = nil, *args)
end
@packer = @factory.packer

@require_ack_response = options[:require_ack_response]
@ack_response_timeout = options[:ack_response_timeout] || 190

@mon = Monitor.new
@pending = nil
@connect_error_history = []
Expand Down Expand Up @@ -149,7 +152,7 @@ def close
if @pending
begin
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
send_data(tag, record)
end
rescue => e
set_last_error(e)
Expand Down Expand Up @@ -237,7 +240,7 @@ def write(tag, time, map)

begin
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
send_data(tag, record)
end
@pending = nil
true
Expand All @@ -258,7 +261,7 @@ def write(tag, time, map)
}
end

def send_data(data)
def send_data(tag, record)
unless connect?
connect!
end
Expand All @@ -270,6 +273,12 @@ def send_data(data)
# block timeout error during IO#write
ws.first.write(data)
end
if @require_ack_response
option = {}
option['chunk'] = generate_chunk
@con.write [tag, record, option].to_msgpack
else
@con.write [tag, record].to_msgpack
end
#while true
# puts "sending #{data.length} bytes"
Expand All @@ -290,13 +299,22 @@ def send_data_nonblock(data)
written = @con.write_nonblock(data)
remaining = data.bytesize - written

while remaining > 0
len = @con.write_nonblock(data.byteslice(written, remaining))
remaining -= len
written += len
if @require_ack_response && @ack_response_timeout > 0
if IO.select([@con], nil, nil, @ack_response_timeout)
raw_data = @con.recv(1024)

if raw_data.empty?
raise "Closed connection"
else
response = MessagePack.unpack(raw_data)
if response['ack'] != option['chunk']
raise "ack in response and chunk id in sent data are different"
end
end
end
end

written
true
end

def connect!
Expand Down Expand Up @@ -342,6 +360,10 @@ def wait_writeable?(e)
true
end
end

def generate_chunk
Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp
end
end
end
end

0 comments on commit 59ff16b

Please sign in to comment.