Skip to content

Commit

Permalink
Merge pull request #1560 from fluent/fix-1554
Browse files Browse the repository at this point in the history
Call proper method for each connection type. fix #1554
  • Loading branch information
repeatedly authored May 12, 2017
2 parents 2dd43f5 + 751f843 commit 3f8630d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def handle_connection(conn)
options = on_message(msg, chunk_size, conn)
if options && r = response(options)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
conn.on_write_complete{ conn.close } if @deny_keepalive
conn.on(:write_complete) { |c| c.close } if @deny_keepalive
send_data.call(serializer, r)
else
if @deny_keepalive
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def on_heartbeat(sockaddr, msg)
# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
raw_data = sock.recv(@read_length)
raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
rescue Errno::ECONNRESET
raw_data = ""
end
Expand Down
43 changes: 43 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,49 @@ def read_ack_from_sock(sock, unpacker)
assert_empty d.instance.exceptions
end

data('ack true' => true,
'ack false' => false)
test 'TLS transport and ack parameter combination' do |ack|
input_conf = TARGET_CONFIG + %[
<transport tls>
insecure true
</transport>
]
target_input_driver = create_target_input_driver(conf: input_conf)

output_conf = %[
send_timeout 5
require_ack_response #{ack}
transport tls
tls_insecure_mode true
<server>
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
<buffer>
#flush_mode immediate
flush_interval 0s
flush_at_shutdown false # suppress errors in d.instance_shutdown
</buffer>
]
@d = d = create_driver(output_conf)

time = event_time("2011-01-02 13:14:15 UTC")
records = [{"a" => 1}, {"a" => 2}]
target_input_driver.run(expect_records: 2, timeout: 3) do
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
records.each do |record|
d.feed(time, record)
end
end
end

events = target_input_driver.events
assert{ events != [] }
assert_equal(['test', time, records[0]], events[0])
assert_equal(['test', time, records[1]], events[1])
end

test 'a destination node not supporting responses by just ignoring' do
target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false)

Expand Down

0 comments on commit 3f8630d

Please sign in to comment.