Skip to content

Commit

Permalink
in_http: Add support for Application/x-ndjson
Browse files Browse the repository at this point in the history
This enable in_http plugin to recognize "ndjson" format, which is
basically a list of JSON objects separated by "\n".

Here is a typical ndjson message looks like:

    {"foo": "bar"}
    {"buz": "hoge"}

See http://ndjson.org/ for more details.

Signed-off-by: Fujimoto Seiji <[email protected]>
  • Loading branch information
fujimotos committed Feb 4, 2022
1 parent 26c62cd commit 91b87ab
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
12 changes: 11 additions & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,16 @@ def parse_params_default(params)
@parser_json.parse(js) do |_time, record|
return nil, record
end
elsif ndjson = params['ndjson']
events = []
ndjson.split(/\r?\n/).each do |js|
@parser_json.parse(js) do |_time, record|
events.push(record)
end
end
return nil, events
else
raise "'json' or 'msgpack' parameter is required"
raise "'json', 'ndjson' or 'msgpack' parameter is required"
end
end

Expand Down Expand Up @@ -567,6 +575,8 @@ def on_message_complete
params['json'] = @body
elsif @content_type =~ /^application\/msgpack/
params['msgpack'] = @body
elsif @content_type =~ /^application\/x-ndjson/
params['ndjson'] = @body
end
path_info = uri.path

Expand Down
23 changes: 23 additions & 0 deletions test/plugin/test_in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,29 @@ def test_application_msgpack
assert_equal_event_time time, d.events[1][1]
end

def test_application_ndjson
d = create_driver
events = [
["tag1", 1643935663, "{\"a\":1}\n{\"b\":2}"],
["tag2", 1643935664, "{\"a\":3}\r\n{\"b\":4}"]
]

expected = [
["tag1", 1643935663, {"a"=>1}],
["tag1", 1643935663, {"b"=>2}],
["tag2", 1643935664, {"a"=>3}],
["tag2", 1643935664, {"b"=>4}]
]

d.run(expect_records: 1) do
events.each do |tag, time, record|
res = post("/#{tag}?time=#{time}", record, {"Content-Type"=>"application/x-ndjson"})
assert_equal("200", res.code)
end
end
assert_equal(expected, d.events)
end

def test_msgpack
d = create_driver
time = event_time("2011-01-02 13:14:15 UTC")
Expand Down

0 comments on commit 91b87ab

Please sign in to comment.