Skip to content

Commit

Permalink
Merge pull request #2456 from ganmacs/fix-keepalive-and-shared-key-bug
Browse files Browse the repository at this point in the history
Fix keepalive and shared key bug
Signed-off-by: Masahiro Nakagawa <[email protected]>
  • Loading branch information
repeatedly committed Jun 14, 2019
1 parent 45c7b75 commit f2cf25f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 18 deletions.
41 changes: 24 additions & 17 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,7 @@ def standby?
end

def verify_connection
connect do |sock|
ri = RequestInfo.new(@sender.security ? :helo : :established)
connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
Expand Down Expand Up @@ -810,11 +809,6 @@ def establish_connection(sock, ri)
end

def send_data_actual(sock, tag, chunk)
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
end

unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
Expand All @@ -841,7 +835,10 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = connect
sock, ri = connect
if ri.state != :established
establish_connection(sock, ri)
end

begin
send_data_actual(sock, tag, chunk)
Expand Down Expand Up @@ -1073,26 +1070,36 @@ def on_read(sock, ri, data)
private

def connect(host = nil)
sock = if @keepalive
@socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
else
@log.debug('connect new socket')
@sender.create_transfer_socket(host || resolved_host, port, @hostname)
end
socket, request_info =
if @keepalive
ri = RequestInfo.new(:established)
sock = @socket_cache.fetch_or do
s = @sender.create_transfer_socket(host || resolved_host, port, @hostname)
ri = RequestInfo.new(@sender.security ? :helo : :established) # overwrite if new connection
s
end
[sock, ri]
else
@log.debug('connect new socket')
[@sender.create_transfer_socket(host || resolved_host, port, @hostname), RequestInfo.new(@sender.security ? :helo : :established)]
end

if block_given?
ret = nil
begin
yield(sock)
ret = yield(socket, request_info)
rescue
@socket_cache.revoke if @keepalive
raise
else
@socket_cache.dec_ref if @keepalive
ensure
sock.close unless @keepalive
socket.close unless @keepalive
end

ret
else
sock
[socket, request_info]
end
end
end
Expand Down
50 changes: 49 additions & 1 deletion test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,55 @@ def read_ack_from_sock(sock, unpacker)
assert_equal(['test', time, records[1]], events[1])
end

test 'authentication_with_user_auth' do
test 'keepalive + shared_key' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
shared_key fluentd-sharedkey
</security>
]
target_input_driver = create_target_input_driver(conf: input_conf)

output_conf = %[
send_timeout 51
keepalive true
<security>
self_hostname localhost
shared_key fluentd-sharedkey
</security>
<server>
name test
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
]
@d = d = create_driver(output_conf)

time = event_time('2011-01-02 13:14:15 UTC')
records = [{ 'a' => 1 }, { 'a' => 2 }]
records2 = [{ 'b' => 1}, { 'b' => 2}]
target_input_driver.run(expect_records: 4, timeout: 15) do
d.run(default_tag: 'test') do
records.each do |record|
d.feed(time, record)
end

d.flush # emit buffer to reuse same socket later
records2.each do |record|
d.feed(time, record)
end
end
end

events = target_input_driver.events
assert{ events != [] }
assert_equal(['test', time, records[0]], events[0])
assert_equal(['test', time, records[1]], events[1])
assert_equal(['test', time, records2[0]], events[2])
assert_equal(['test', time, records2[1]], events[3])
end

test 'authentication_with_user_auth' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
Expand Down

0 comments on commit f2cf25f

Please sign in to comment.