Skip to content

Commit

Permalink
in_http: Allow users to send payload in compressed form
Browse files Browse the repository at this point in the history
This adds support for the 'Content-Encoding' header, which allows
HTTP clients to transfer payload more efficiently by applying
compression.

You can test it using the following command:

    $ echo 'json={"foo":"bar"}' | gzip > json.gz
    $ curl --data-binary @json.gz -H "Content-Encoding: gzip" \
           http://localhost:9880/debug.log

This feature should be useful especially for users who handle large
amounts of data.
  • Loading branch information
Fujimoto Seiji committed Jul 5, 2018
1 parent 815e513 commit a147d34
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
19 changes: 19 additions & 0 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def on_headers_complete(headers)
end
@env = {}
@content_type = ""
@content_encoding = ""
headers.each_pair {|k,v|
@env["HTTP_#{k.gsub('-','_').upcase}"] = v
case k
Expand All @@ -325,6 +326,8 @@ def on_headers_complete(headers)
size = v.to_i
when /Content-Type/i
@content_type = v
when /Content-Encoding/i
@content_encoding = v
when /Connection/i
if v =~ /close/i
@keep_alive = false
Expand Down Expand Up @@ -376,6 +379,22 @@ def on_message_complete
end
end

# Content Encoding
# =================
# Decode payload according to the "Content-Encoding" header.
# For now, we only support 'gzip' and 'deflate'.
begin
if @content_encoding == 'gzip'
@body = Zlib::GzipReader.new(StringIO.new(@body)).read
elsif @content_encoding == 'deflate'
@body = Zlib::Inflate.inflate(@body)
end
rescue
@log.warn 'fails to decode payload', error: $!.to_s
send_response_and_close("400 Bad Request", {}, "")
return
end

@env['REMOTE_ADDR'] = @remote_addr if @remote_addr

uri = URI.parse(@parser.request_url)
Expand Down
55 changes: 55 additions & 0 deletions test/plugin/test_in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,54 @@ def test_cors_allowed
assert_equal_event_time time, d.events[1][1]
end

def test_content_encoding_gzip
d = create_driver

time = event_time("2011-01-02 13:14:15 UTC")
events = [
["tag1", time, {"a"=>1}],
["tag2", time, {"a"=>2}],
]
res_codes = []
res_headers = []

d.run do
events.each do |tag, time, record|
header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'}
res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header)
res_codes << res.code
end
end
assert_equal ["200", "200"], res_codes
assert_equal events, d.events
assert_equal_event_time time, d.events[0][1]
assert_equal_event_time time, d.events[1][1]
end

def test_content_encoding_deflate
d = create_driver

time = event_time("2011-01-02 13:14:15 UTC")
events = [
["tag1", time, {"a"=>1}],
["tag2", time, {"a"=>2}],
]
res_codes = []
res_headers = []

d.run do
events.each do |tag, time, record|
header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'}
res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header)
res_codes << res.code
end
end
assert_equal ["200", "200"], res_codes
assert_equal events, d.events
assert_equal_event_time time, d.events[0][1]
assert_equal_event_time time, d.events[1][1]
end

def test_cors_disallowed
d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]")
assert_equal ["http://foo.com"], d.instance.cors_allow_origins
Expand Down Expand Up @@ -690,6 +738,13 @@ def post(path, params, header = {}, &block)
http.request(req)
end

def compress_gzip(data)
io = StringIO.new
io.binmode
Zlib::GzipWriter.wrap(io) { |gz| gz.write data }
return io.string
end

def include_http_header?(record)
record.keys.find { |header| header.start_with?('HTTP_') }
end
Expand Down

0 comments on commit a147d34

Please sign in to comment.