From 8fc7ddb727e42138dc1be5b260e18131564b3112 Mon Sep 17 00:00:00 2001 From: Christian Norbert Menges Date: Sun, 22 Oct 2023 14:24:15 +0200 Subject: [PATCH] out_http: Add option to reuse connections Signed-off-by: Christian Norbert Menges --- lib/fluent/plugin/out_http.rb | 60 +++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index b4c149feb0..c12df842ff 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -60,6 +60,8 @@ class RetryableResponse < StandardError; end config_param :read_timeout, :integer, default: nil desc 'The TLS timeout in seconds' config_param :ssl_timeout, :integer, default: nil + desc 'Try to reuse connections' + config_param :reuse_connections, :bool, default: false desc 'The CA certificate path for TLS' config_param :tls_ca_cert_path, :string, default: nil @@ -100,11 +102,25 @@ def initialize @uri = nil @proxy_uri = nil @formatter = nil + + @Cache = [] + @CacheIdMutex = Mutex.new + @CacheEntry = Struct.new(:uri, :conn) + end + + def close + super + + # Close all open connections + @Cache.each {|entry| entry.conn.finish if entry.conn&.started? } end def configure(conf) super + @Cache = Array.new(actual_flush_thread_count, @CacheEntry.new("", nil)) if @reuse_connections + @CacheId = 0 + if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] @@ -244,15 +260,45 @@ def create_request(chunk, uri) req end + def make_request_cached(uri, req) + id = Thread.current.thread_variable_get(plugin_id) + if id.nil? + @CacheIdMutex.synchronize { + id = @CacheId + @CacheId += 1 + } + Thread.current.thread_variable_set(plugin_id, id) + end + uri_str = uri.to_s + if @Cache[id].uri != uri_str + @Cache[id].conn.finish if @Cache[id].conn&.started? + http = if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) + else + Net::HTTP.start(uri.host, uri.port, @http_opt) + end + @Cache[id] = @CacheEntry.new(uri_str, http) + end + @Cache[id].conn.request(req) + end + + def make_request(uri, req) + if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| + http.request(req) + } + else + Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| + http.request(req) + } + end + end + def send_request(uri, req) - res = if @proxy_uri - Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| - http.request(req) - } + res = if @reuse_connections + make_request_cached(uri, req) else - Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| - http.request(req) - } + make_request(uri, req) end if res.is_a?(Net::HTTPSuccess)