diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index fb9d642d1e..1b5fc348a8 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -420,7 +420,7 @@ def on_heartbeat(sockaddr, msg) # return chunk id to be committed def read_ack_from_sock(sock, unpacker) begin - raw_data = sock.recv(@read_length) + raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.read(@read_length) : sock.recv(@read_length) rescue Errno::ECONNRESET raw_data = "" end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index f31c3c4508..1e78db92f9 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -430,6 +430,49 @@ def read_ack_from_sock(sock, unpacker) assert_empty d.instance.exceptions end + data('ack true' => true, + 'ack false' => false) + test 'TLS transport and ack parameter combination' do |ack| + input_conf = TARGET_CONFIG + %[ + + insecure true + + ] + target_input_driver = create_target_input_driver(conf: input_conf) + + output_conf = %[ + send_timeout 5 + require_ack_response #{ack} + transport tls + tls_insecure_mode true + + host #{TARGET_HOST} + port #{TARGET_PORT} + + + #flush_mode immediate + flush_interval 0s + flush_at_shutdown false # suppress errors in d.instance_shutdown + + ] + @d = d = create_driver(output_conf) + + time = event_time("2011-01-02 13:14:15 UTC") + records = [{"a" => 1}, {"a" => 2}] + target_input_driver.run(expect_records: 2, timeout: 3) do + d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do + records.each do |record| + d.feed(time, record) + end + end + end + + events = target_input_driver.events + assert{ events != [] } + assert_equal(['test', time, records[0]], events[0]) + assert_equal(['test', time, records[1]], events[1]) + end + test 'a destination node not supporting responses by just ignoring' do target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false)