From 6ef1411d346359a9264e73bd2fb974094e951906 Mon Sep 17 00:00:00 2001 From: Lewis Rockliffe Date: Thu, 13 Jun 2024 16:18:12 +1000 Subject: [PATCH] out_http: add gzip option Signed-off-by: Lewis Rockliffe --- lib/fluent/plugin/out_http.rb | 12 ++++++++ test/plugin/test_out_http.rb | 52 ++++++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 4d45b9706c..1ca3910456 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -55,6 +55,8 @@ class RetryableResponse < StandardError; end config_param :headers, :hash, default: nil desc 'Additional placeholder based headers for HTTP request' config_param :headers_from_placeholders, :hash, default: nil + desc 'Compress HTTP request body' + config_param :compress, :enum, list: [:text, :gzip], default: :text desc 'The connection open timeout in seconds' config_param :open_timeout, :integer, default: nil @@ -290,6 +292,9 @@ def set_headers(req, uri, chunk) req[k] = extract_placeholders(v, chunk) end end + if @compress == :gzip + req['Content-Encoding'] = "gzip" + end req['Content-Type'] = @content_type end @@ -323,8 +328,15 @@ def create_request(chunk, uri) Net::HTTP::Put.new(uri.request_uri) end set_headers(req, uri, chunk) + req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read + if @compress == :gzip + gz = Zlib::GzipWriter.new(StringIO.new) + gz << req.body + req.body = gz.close.string + end + # At least one authentication method requires the body and other headers, so the order of this call matters set_auth(req, uri) req diff --git a/test/plugin/test_out_http.rb b/test/plugin/test_out_http.rb index a0b3969a8d..265568e876 100644 --- a/test/plugin/test_out_http.rb +++ b/test/plugin/test_out_http.rb @@ -75,21 +75,30 @@ def run_http_server @@result.headers[key] = value end + body = "" data = [] + + case req['content-encoding'] + when 'gzip' + body = Zlib::GzipReader.new(StringIO.new(req.body)).read + else + body = req.body + end + case req.content_type when 'application/x-ndjson' - req.body.each_line { |l| + body.each_line { |l| data << JSON.parse(l) } when 'application/json' - data = JSON.parse(req.body) + data = JSON.parse(body) when 'text/plain' # Use single_value in this test - req.body.each_line { |line| + body.each_line { |line| data << line.chomp } else - data << req.body + data << body end @@result.data = data @@ -519,6 +528,41 @@ def test_write_with_https end end + sub_test_case 'GZIP' do + def server_port + 19882 + end + + data(:json_array, [false, true]) + data(:buffer_compress, ["text", "gzip"]) + def test_write_with_gzip + d = create_driver(%[ + endpoint http://127.0.0.1:#{server_port}/test + compress gzip + json_array #{data[:json_array]} + + @type memory + compress #{data[:buffer_compress]} + + ]) + d.run(default_tag: 'test.http') do + test_events.each { |event| + d.feed(event) + } + end + + result = @@result + assert_equal 'POST', result.method + assert_equal( + data[:json_array] ? 'application/json' : 'application/x-ndjson', + result.content_type + ) + assert_equal 'gzip', result.headers['content-encoding'] + assert_equal test_events, result.data + assert_not_empty result.headers + end + end + sub_test_case 'connection_reuse' do def server_port 19883