Skip to content

Commit

Permalink
Merge pull request #4528 from AustralianAntarcticDataCentre/http-out-…
Browse files Browse the repository at this point in the history
…gzip

out_http: add gzip option
  • Loading branch information
ashie authored Jul 6, 2024
2 parents 0dba6ca + 6ef1411 commit e5ca1c7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
12 changes: 12 additions & 0 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class RetryableResponse < StandardError; end
config_param :headers, :hash, default: nil
desc 'Additional placeholder based headers for HTTP request'
config_param :headers_from_placeholders, :hash, default: nil
desc 'Compress HTTP request body'
config_param :compress, :enum, list: [:text, :gzip], default: :text

desc 'The connection open timeout in seconds'
config_param :open_timeout, :integer, default: nil
Expand Down Expand Up @@ -290,6 +292,9 @@ def set_headers(req, uri, chunk)
req[k] = extract_placeholders(v, chunk)
end
end
if @compress == :gzip
req['Content-Encoding'] = "gzip"
end
req['Content-Type'] = @content_type
end

Expand Down Expand Up @@ -323,8 +328,15 @@ def create_request(chunk, uri)
Net::HTTP::Put.new(uri.request_uri)
end
set_headers(req, uri, chunk)

req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read

if @compress == :gzip
gz = Zlib::GzipWriter.new(StringIO.new)
gz << req.body
req.body = gz.close.string
end

# At least one authentication method requires the body and other headers, so the order of this call matters
set_auth(req, uri)
req
Expand Down
52 changes: 48 additions & 4 deletions test/plugin/test_out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,30 @@ def run_http_server
@@result.headers[key] = value
end

body = ""
data = []

case req['content-encoding']
when 'gzip'
body = Zlib::GzipReader.new(StringIO.new(req.body)).read
else
body = req.body
end

case req.content_type
when 'application/x-ndjson'
req.body.each_line { |l|
body.each_line { |l|
data << JSON.parse(l)
}
when 'application/json'
data = JSON.parse(req.body)
data = JSON.parse(body)
when 'text/plain'
# Use single_value in this test
req.body.each_line { |line|
body.each_line { |line|
data << line.chomp
}
else
data << req.body
data << body
end
@@result.data = data

Expand Down Expand Up @@ -519,6 +528,41 @@ def test_write_with_https
end
end

sub_test_case 'GZIP' do
def server_port
19882
end

data(:json_array, [false, true])
data(:buffer_compress, ["text", "gzip"])
def test_write_with_gzip
d = create_driver(%[
endpoint http://127.0.0.1:#{server_port}/test
compress gzip
json_array #{data[:json_array]}
<buffer>
@type memory
compress #{data[:buffer_compress]}
</buffer>
])
d.run(default_tag: 'test.http') do
test_events.each { |event|
d.feed(event)
}
end

result = @@result
assert_equal 'POST', result.method
assert_equal(
data[:json_array] ? 'application/json' : 'application/x-ndjson',
result.content_type
)
assert_equal 'gzip', result.headers['content-encoding']
assert_equal test_events, result.data
assert_not_empty result.headers
end
end

sub_test_case 'connection_reuse' do
def server_port
19883
Expand Down

0 comments on commit e5ca1c7

Please sign in to comment.