Skip to content

Commit

Permalink
Merge pull request #4330 from Garfield96/out-http-reuse-connections
Browse files Browse the repository at this point in the history
out_http: Add option to reuse connections
  • Loading branch information
ashie authored Apr 30, 2024
2 parents 35e2210 + 9ce635c commit 8804a80
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 7 deletions.
67 changes: 60 additions & 7 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class HTTPOutput < Output

class RetryableResponse < StandardError; end

ConnectionCache = Struct.new(:uri, :conn)

helpers :formatter

desc 'The endpoint for HTTP request, e.g. http://example.com/api'
Expand All @@ -60,6 +62,8 @@ class RetryableResponse < StandardError; end
config_param :read_timeout, :integer, default: nil
desc 'The TLS timeout in seconds'
config_param :ssl_timeout, :integer, default: nil
desc 'Try to reuse connections'
config_param :reuse_connections, :bool, default: false

desc 'The CA certificate path for TLS'
config_param :tls_ca_cert_path, :string, default: nil
Expand Down Expand Up @@ -100,17 +104,41 @@ class RetryableResponse < StandardError; end
config_param :aws_role_arn, :string, default: nil
end

def connection_cache_id_thread_key
"#{plugin_id}_connection_cache_id"
end

def connection_cache_id_for_thread
Thread.current[connection_cache_id_thread_key]
end

def connection_cache_id_for_thread=(id)
Thread.current[connection_cache_id_thread_key] = id
end

def initialize
super

@uri = nil
@proxy_uri = nil
@formatter = nil

@connection_cache = []
@connection_cache_id_mutex = Mutex.new
@connection_cache_next_id = 0
end

def close
super

@connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? }
end

def configure(conf)
super

@connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections

if @retryable_response_codes.nil?
log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish')
@retryable_response_codes = [503]
Expand Down Expand Up @@ -302,16 +330,41 @@ def create_request(chunk, uri)
req
end

def make_request_cached(uri, req)
id = self.connection_cache_id_for_thread
if id.nil?
@connection_cache_id_mutex.synchronize {
id = @connection_cache_next_id
@connection_cache_next_id += 1
}
self.connection_cache_id_for_thread = id
end
uri_str = uri.to_s
if @connection_cache[id].uri != uri_str
@connection_cache[id].conn.finish if @connection_cache[id].conn&.started?
http = if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
else
Net::HTTP.start(uri.host, uri.port, @http_opt)
end
@connection_cache[id] = ConnectionCache.new(uri_str, http)
end
@connection_cache[id].conn.request(req)
end

def make_request(uri, req, &block)
if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt, &block)
else
Net::HTTP.start(uri.host, uri.port, @http_opt, &block)
end
end

def send_request(uri, req)
res = if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http|
http.request(req)
}
res = if @reuse_connections
make_request_cached(uri, req)
else
Net::HTTP.start(uri.host, uri.port, @http_opt) { |http|
http.request(req)
}
make_request(uri, req) { |http| http.request(req) }
end

if res.is_a?(Net::HTTPSuccess)
Expand Down
36 changes: 36 additions & 0 deletions test/plugin/test_out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -518,4 +518,40 @@ def test_write_with_https
assert_not_empty result.headers
end
end

sub_test_case 'connection_reuse' do
def server_port
19883
end

def test_connection_recreation
d = create_driver(%[
endpoint http://127.0.0.1:#{server_port}/test
reuse_connections true
])

d.run(default_tag: 'test.http', shutdown: false) do
d.feed(test_events[0])
end

data = @@result.data

# Restart server to simulate connection loss
@@http_server_thread.kill
@@http_server_thread.join
@@http_server_thread = Thread.new do
run_http_server
end

d.run(default_tag: 'test.http') do
d.feed(test_events[1])
end

result = @@result
assert_equal 'POST', result.method
assert_equal 'application/x-ndjson', result.content_type
assert_equal test_events, data.concat(result.data)
assert_not_empty result.headers
end
end
end

0 comments on commit 8804a80

Please sign in to comment.