diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 55887065a7..4d45b9706c 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -37,6 +37,8 @@ class HTTPOutput < Output class RetryableResponse < StandardError; end + ConnectionCache = Struct.new(:uri, :conn) + helpers :formatter desc 'The endpoint for HTTP request, e.g. http://example.com/api' @@ -60,6 +62,8 @@ class RetryableResponse < StandardError; end config_param :read_timeout, :integer, default: nil desc 'The TLS timeout in seconds' config_param :ssl_timeout, :integer, default: nil + desc 'Try to reuse connections' + config_param :reuse_connections, :bool, default: false desc 'The CA certificate path for TLS' config_param :tls_ca_cert_path, :string, default: nil @@ -100,17 +104,41 @@ class RetryableResponse < StandardError; end config_param :aws_role_arn, :string, default: nil end + def connection_cache_id_thread_key + "#{plugin_id}_connection_cache_id" + end + + def connection_cache_id_for_thread + Thread.current[connection_cache_id_thread_key] + end + + def connection_cache_id_for_thread=(id) + Thread.current[connection_cache_id_thread_key] = id + end + def initialize super @uri = nil @proxy_uri = nil @formatter = nil + + @connection_cache = [] + @connection_cache_id_mutex = Mutex.new + @connection_cache_next_id = 0 + end + + def close + super + + @connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? } end def configure(conf) super + @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections + if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] @@ -302,16 +330,41 @@ def create_request(chunk, uri) req end + def make_request_cached(uri, req) + id = self.connection_cache_id_for_thread + if id.nil? + @connection_cache_id_mutex.synchronize { + id = @connection_cache_next_id + @connection_cache_next_id += 1 + } + self.connection_cache_id_for_thread = id + end + uri_str = uri.to_s + if @connection_cache[id].uri != uri_str + @connection_cache[id].conn.finish if @connection_cache[id].conn&.started? + http = if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) + else + Net::HTTP.start(uri.host, uri.port, @http_opt) + end + @connection_cache[id] = ConnectionCache.new(uri_str, http) + end + @connection_cache[id].conn.request(req) + end + + def make_request(uri, req, &block) + if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt, &block) + else + Net::HTTP.start(uri.host, uri.port, @http_opt, &block) + end + end def send_request(uri, req) - res = if @proxy_uri - Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| - http.request(req) - } + res = if @reuse_connections + make_request_cached(uri, req) else - Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| - http.request(req) - } + make_request(uri, req) { |http| http.request(req) } end if res.is_a?(Net::HTTPSuccess) diff --git a/test/plugin/test_out_http.rb b/test/plugin/test_out_http.rb index 04c80137b3..a0b3969a8d 100644 --- a/test/plugin/test_out_http.rb +++ b/test/plugin/test_out_http.rb @@ -518,4 +518,40 @@ def test_write_with_https assert_not_empty result.headers end end + + sub_test_case 'connection_reuse' do + def server_port + 19883 + end + + def test_connection_recreation + d = create_driver(%[ + endpoint http://127.0.0.1:#{server_port}/test + reuse_connections true + ]) + + d.run(default_tag: 'test.http', shutdown: false) do + d.feed(test_events[0]) + end + + data = @@result.data + + # Restart server to simulate connection loss + @@http_server_thread.kill + @@http_server_thread.join + @@http_server_thread = Thread.new do + run_http_server + end + + d.run(default_tag: 'test.http') do + d.feed(test_events[1]) + end + + result = @@result + assert_equal 'POST', result.method + assert_equal 'application/x-ndjson', result.content_type + assert_equal test_events, data.concat(result.data) + assert_not_empty result.headers + end + end end