Skip to content

Commit

Permalink
Merge pull request #2467 from fluent/socket-connect-timeout
Browse files Browse the repository at this point in the history
socket helper: Support connect_timeout for TCP/TLS
  • Loading branch information
repeatedly authored Jun 20, 2019
2 parents 40dd780 + e6b3006 commit 910ec4c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class ConnectionClosedError < Error; end

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
desc 'The timeout time for socket connect'
config_param :connect_timeout, :time, default: nil
# TODO: add linger_timeout, recv_timeout

desc 'The protocol to use for heartbeats (default is the same with "transport").'
Expand Down Expand Up @@ -376,6 +378,7 @@ def create_transfer_socket(host, port, hostname, &block)
linger_timeout: Fluent.windows? ? nil : @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
when :tcp
Expand All @@ -384,6 +387,7 @@ def create_transfer_socket(host, port, hostname, &block)
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
else
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/plugin_helper/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ def socket_create(proto, host, port, **kwargs, &block)
end
end

def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block)
sock = WrappedSocket::TCP.new(host, port)
def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block)
sock = if connect_timeout
s = ::Socket.tcp(host, port, connect_timeout: connect_timeout)
s.autoclose = false # avoid GC triggered close
WrappedSocket::TCP.for_fd(s.fileno)
else
WrappedSocket::TCP.new(host, port)
end
socket_option_set(sock, resolve_name: resolve_name, **kwargs)
if block
begin
Expand Down
18 changes: 18 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal 60, d.instance.send_timeout
assert_equal :transport, d.instance.heartbeat_type
assert_equal 1, nodes.length
assert_nil d.instance.connect_timeout
node = nodes.first
assert_equal "test", node.name
assert_equal '127.0.0.1', node.host
Expand All @@ -102,6 +103,23 @@ def read_ack_from_sock(sock, unpacker)
assert_equal( 10*1024*1024, instance.buffer.chunk_limit_size )
end

test 'configure timeouts' do
@d = d = create_driver(%[
send_timeout 30
connect_timeout 10
hard_timeout 15
ack_response_timeout 20
<server>
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
])
assert_equal 30, d.instance.send_timeout
assert_equal 10, d.instance.connect_timeout
assert_equal 15, d.instance.hard_timeout
assert_equal 20, d.instance.ack_response_timeout
end

test 'configure_udp_heartbeat' do
@d = d = create_driver(CONFIG + "\nheartbeat_type udp")
assert_equal :udp, d.instance.heartbeat_type
Expand Down

0 comments on commit 910ec4c

Please sign in to comment.