Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socket helper: Support connect_timeout for TCP/TLS #2467

Merged
merged 1 commit into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class ConnectionClosedError < Error; end

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
desc 'The timeout time for socket connect'
config_param :connect_timeout, :time, default: nil
# TODO: add linger_timeout, recv_timeout

desc 'The protocol to use for heartbeats (default is the same with "transport").'
Expand Down Expand Up @@ -376,6 +378,7 @@ def create_transfer_socket(host, port, hostname, &block)
linger_timeout: Fluent.windows? ? nil : @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
when :tcp
Expand All @@ -384,6 +387,7 @@ def create_transfer_socket(host, port, hostname, &block)
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
else
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/plugin_helper/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ def socket_create(proto, host, port, **kwargs, &block)
end
end

def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block)
sock = WrappedSocket::TCP.new(host, port)
def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block)
sock = if connect_timeout
s = ::Socket.tcp(host, port, connect_timeout: connect_timeout)
Copy link
Member

@ganmacs ganmacs Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when and who will close this socket if it is set false to autoclose?

Ah, WrappedSocket::TCP will close this socket.

https://github.com/ruby/ruby/blob/e4cafa393f8fd4aa207f20b1d122884b4de99cf1/io.c#L8436-L8451

s.autoclose = false # avoid GC triggered close
WrappedSocket::TCP.for_fd(s.fileno)
else
WrappedSocket::TCP.new(host, port)
end
socket_option_set(sock, resolve_name: resolve_name, **kwargs)
if block
begin
Expand Down
18 changes: 18 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def read_ack_from_sock(sock, unpacker)
assert_equal 60, d.instance.send_timeout
assert_equal :transport, d.instance.heartbeat_type
assert_equal 1, nodes.length
assert_nil d.instance.connect_timeout
node = nodes.first
assert_equal "test", node.name
assert_equal '127.0.0.1', node.host
Expand All @@ -102,6 +103,23 @@ def read_ack_from_sock(sock, unpacker)
assert_equal( 10*1024*1024, instance.buffer.chunk_limit_size )
end

test 'configure timeouts' do
@d = d = create_driver(%[
send_timeout 30
connect_timeout 10
hard_timeout 15
ack_response_timeout 20
<server>
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
])
assert_equal 30, d.instance.send_timeout
assert_equal 10, d.instance.connect_timeout
assert_equal 15, d.instance.hard_timeout
assert_equal 20, d.instance.ack_response_timeout
end

test 'configure_udp_heartbeat' do
@d = d = create_driver(CONFIG + "\nheartbeat_type udp")
assert_equal :udp, d.instance.heartbeat_type
Expand Down