Skip to content

Commit

Permalink
out_http: Add option to reuse connections
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Norbert Menges <[email protected]>
  • Loading branch information
Garfield96 authored and daipom committed Apr 30, 2024
1 parent 35e2210 commit 5844e96
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class RetryableResponse < StandardError; end
config_param :read_timeout, :integer, default: nil
desc 'The TLS timeout in seconds'
config_param :ssl_timeout, :integer, default: nil
desc 'Try to reuse connections'
config_param :reuse_connections, :bool, default: false

desc 'The CA certificate path for TLS'
config_param :tls_ca_cert_path, :string, default: nil
Expand Down Expand Up @@ -106,11 +108,25 @@ def initialize
@uri = nil
@proxy_uri = nil
@formatter = nil

@Cache = []
@CacheIdMutex = Mutex.new
@CacheEntry = Struct.new(:uri, :conn)
end

def close
super

# Close all open connections
@Cache.each {|entry| entry.conn.finish if entry.conn&.started? }
end

def configure(conf)
super

@Cache = Array.new(actual_flush_thread_count, @CacheEntry.new("", nil)) if @reuse_connections
@CacheId = 0

if @retryable_response_codes.nil?
log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish')
@retryable_response_codes = [503]
Expand Down Expand Up @@ -302,16 +318,45 @@ def create_request(chunk, uri)
req
end

def make_request_cached(uri, req)
id = Thread.current.thread_variable_get(plugin_id)
if id.nil?
@CacheIdMutex.synchronize {
id = @CacheId
@CacheId += 1
}
Thread.current.thread_variable_set(plugin_id, id)
end
uri_str = uri.to_s
if @Cache[id].uri != uri_str
@Cache[id].conn.finish if @Cache[id].conn&.started?
http = if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
else
Net::HTTP.start(uri.host, uri.port, @http_opt)
end
@Cache[id] = @CacheEntry.new(uri_str, http)
end
@Cache[id].conn.request(req)
end

def make_request(uri, req)
if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http|
http.request(req)
}
else
Net::HTTP.start(uri.host, uri.port, @http_opt) { |http|
http.request(req)
}
end
end

def send_request(uri, req)
res = if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http|
http.request(req)
}
res = if @reuse_connections
make_request_cached(uri, req)
else
Net::HTTP.start(uri.host, uri.port, @http_opt) { |http|
http.request(req)
}
make_request(uri, req)
end

if res.is_a?(Net::HTTPSuccess)
Expand Down

0 comments on commit 5844e96

Please sign in to comment.