diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb
index 070d6e480f..6c27c0ea6d 100644
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -211,7 +211,7 @@ def handle_connection(conn)
options = on_message(msg, chunk_size, conn)
if options && r = response(options)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
- conn.on_write_complete{ conn.close } if @deny_keepalive
+ conn.on(:write_complete) { |c| c.close } if @deny_keepalive
send_data.call(serializer, r)
else
if @deny_keepalive
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index fb9d642d1e..00bc2cc125 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -420,7 +420,7 @@ def on_heartbeat(sockaddr, msg)
# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
- raw_data = sock.recv(@read_length)
+ raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
rescue Errno::ECONNRESET
raw_data = ""
end
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index f31c3c4508..1e78db92f9 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -430,6 +430,49 @@ def read_ack_from_sock(sock, unpacker)
assert_empty d.instance.exceptions
end
+ data('ack true' => true,
+ 'ack false' => false)
+ test 'TLS transport and ack parameter combination' do |ack|
+ input_conf = TARGET_CONFIG + %[
+
+ insecure true
+
+ ]
+ target_input_driver = create_target_input_driver(conf: input_conf)
+
+ output_conf = %[
+ send_timeout 5
+ require_ack_response #{ack}
+ transport tls
+ tls_insecure_mode true
+
+ host #{TARGET_HOST}
+ port #{TARGET_PORT}
+
+
+ #flush_mode immediate
+ flush_interval 0s
+ flush_at_shutdown false # suppress errors in d.instance_shutdown
+
+ ]
+ @d = d = create_driver(output_conf)
+
+ time = event_time("2011-01-02 13:14:15 UTC")
+ records = [{"a" => 1}, {"a" => 2}]
+ target_input_driver.run(expect_records: 2, timeout: 3) do
+ d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
+ records.each do |record|
+ d.feed(time, record)
+ end
+ end
+ end
+
+ events = target_input_driver.events
+ assert{ events != [] }
+ assert_equal(['test', time, records[0]], events[0])
+ assert_equal(['test', time, records[1]], events[1])
+ end
+
test 'a destination node not supporting responses by just ignoring' do
target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false)