From 8b44e4f6b6fe9da3f7758e8532e1d1308d96a91d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 9 May 2017 20:07:42 +0900 Subject: [PATCH 1/3] Call proper method for each connection type. fix #1554 --- lib/fluent/plugin/out_forward.rb | 2 +- test/plugin/test_out_forward.rb | 43 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index fb9d642d1e..1b5fc348a8 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -420,7 +420,7 @@ def on_heartbeat(sockaddr, msg) # return chunk id to be committed def read_ack_from_sock(sock, unpacker) begin - raw_data = sock.recv(@read_length) + raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.read(@read_length) : sock.recv(@read_length) rescue Errno::ECONNRESET raw_data = "" end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index f31c3c4508..1e78db92f9 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -430,6 +430,49 @@ def read_ack_from_sock(sock, unpacker) assert_empty d.instance.exceptions end + data('ack true' => true, + 'ack false' => false) + test 'TLS transport and ack parameter combination' do |ack| + input_conf = TARGET_CONFIG + %[ + + insecure true + + ] + target_input_driver = create_target_input_driver(conf: input_conf) + + output_conf = %[ + send_timeout 5 + require_ack_response #{ack} + transport tls + tls_insecure_mode true + + host #{TARGET_HOST} + port #{TARGET_PORT} + + + #flush_mode immediate + flush_interval 0s + flush_at_shutdown false # suppress errors in d.instance_shutdown + + ] + @d = d = create_driver(output_conf) + + time = event_time("2011-01-02 13:14:15 UTC") + records = [{"a" => 1}, {"a" => 2}] + target_input_driver.run(expect_records: 2, timeout: 3) do + d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do + records.each do |record| + d.feed(time, record) + end + end + end + + events = target_input_driver.events + assert{ events != [] } + assert_equal(['test', time, records[0]], events[0]) + assert_equal(['test', time, records[1]], events[1]) + end + test 'a destination node not supporting responses by just ignoring' do target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false) From a8e7119c3ebe3929597ef41f048b1306bc3f268b Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 10 May 2017 04:52:21 +0900 Subject: [PATCH 2/3] Fix callback registration when deny_keepalive is true --- lib/fluent/plugin/in_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 070d6e480f..6c27c0ea6d 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -211,7 +211,7 @@ def handle_connection(conn) options = on_message(msg, chunk_size, conn) if options && r = response(options) log.trace "sent response to fluent socket", address: conn.remote_addr, response: r - conn.on_write_complete{ conn.close } if @deny_keepalive + conn.on(:write_complete) { |c| c.close } if @deny_keepalive send_data.call(serializer, r) else if @deny_keepalive From 751f84399897862158c57bf84701ea1f7fdc2fbb Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 10 May 2017 18:13:46 +0900 Subject: [PATCH 3/3] Use readpartial for SSLSocket to avoid blocking read --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 1b5fc348a8..00bc2cc125 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -420,7 +420,7 @@ def on_heartbeat(sockaddr, msg) # return chunk id to be committed def read_ack_from_sock(sock, unpacker) begin - raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.read(@read_length) : sock.recv(@read_length) + raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length) rescue Errno::ECONNRESET raw_data = "" end