diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index fb5cc36bb7..4acfd3178d 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -770,8 +770,7 @@ def standby?
end
def verify_connection
- connect do |sock|
- ri = RequestInfo.new(@sender.security ? :helo : :established)
+ connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
@@ -810,11 +809,6 @@ def establish_connection(sock, ri)
end
def send_data_actual(sock, tag, chunk)
- ri = RequestInfo.new(@sender.security ? :helo : :established)
- if ri.state != :established
- establish_connection(sock, ri)
- end
-
unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
@@ -841,7 +835,10 @@ def send_data_actual(sock, tag, chunk)
end
def send_data(tag, chunk)
- sock = connect
+ sock, ri = connect
+ if ri.state != :established
+ establish_connection(sock, ri)
+ end
begin
send_data_actual(sock, tag, chunk)
@@ -1073,26 +1070,36 @@ def on_read(sock, ri, data)
private
def connect(host = nil)
- sock = if @keepalive
- @socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
- else
- @log.debug('connect new socket')
- @sender.create_transfer_socket(host || resolved_host, port, @hostname)
- end
+ socket, request_info =
+ if @keepalive
+ ri = RequestInfo.new(:established)
+ sock = @socket_cache.fetch_or do
+ s = @sender.create_transfer_socket(host || resolved_host, port, @hostname)
+ ri = RequestInfo.new(@sender.security ? :helo : :established) # overwrite if new connection
+ s
+ end
+ [sock, ri]
+ else
+ @log.debug('connect new socket')
+ [@sender.create_transfer_socket(host || resolved_host, port, @hostname), RequestInfo.new(@sender.security ? :helo : :established)]
+ end
if block_given?
+ ret = nil
begin
- yield(sock)
+ ret = yield(socket, request_info)
rescue
@socket_cache.revoke if @keepalive
raise
else
@socket_cache.dec_ref if @keepalive
ensure
- sock.close unless @keepalive
+ socket.close unless @keepalive
end
+
+ ret
else
- sock
+ [socket, request_info]
end
end
end
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index dc6bc0e210..bddc08dff0 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -649,7 +649,55 @@ def read_ack_from_sock(sock, unpacker)
assert_equal(['test', time, records[1]], events[1])
end
- test 'authentication_with_user_auth' do
+ test 'keepalive + shared_key' do
+ input_conf = TARGET_CONFIG + %[
+
+ self_hostname in.localhost
+ shared_key fluentd-sharedkey
+
+ ]
+ target_input_driver = create_target_input_driver(conf: input_conf)
+
+ output_conf = %[
+ send_timeout 51
+ keepalive true
+
+ self_hostname localhost
+ shared_key fluentd-sharedkey
+
+
+ name test
+ host #{TARGET_HOST}
+ port #{TARGET_PORT}
+
+ ]
+ @d = d = create_driver(output_conf)
+
+ time = event_time('2011-01-02 13:14:15 UTC')
+ records = [{ 'a' => 1 }, { 'a' => 2 }]
+ records2 = [{ 'b' => 1}, { 'b' => 2}]
+ target_input_driver.run(expect_records: 4, timeout: 15) do
+ d.run(default_tag: 'test') do
+ records.each do |record|
+ d.feed(time, record)
+ end
+
+ d.flush # emit buffer to reuse same socket later
+ records2.each do |record|
+ d.feed(time, record)
+ end
+ end
+ end
+
+ events = target_input_driver.events
+ assert{ events != [] }
+ assert_equal(['test', time, records[0]], events[0])
+ assert_equal(['test', time, records[1]], events[1])
+ assert_equal(['test', time, records2[0]], events[2])
+ assert_equal(['test', time, records2[1]], events[3])
+ end
+
+ test 'authentication_with_user_auth' do
input_conf = TARGET_CONFIG + %[
self_hostname in.localhost