diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb
index bed031005c..7916825f3e 100644
--- a/lib/fluent/plugin/in_http.rb
+++ b/lib/fluent/plugin/in_http.rb
@@ -27,10 +27,28 @@
module Fluent::Plugin
class InHttpParser < Parser
Fluent::Plugin.register_parser('in_http', self)
+
+ config_set_default :time_key, 'time'
+
+ def configure(conf)
+ super
+
+ # if no time parser related parameters, use in_http's time convert rule
+ @time_parser = if conf.has_key?('time_type') || conf.has_key?('time_format')
+ time_parser_create
+ else
+ nil
+ end
+ end
+
def parse(text)
# this plugin is dummy implementation not to raise error
yield nil, nil
end
+
+ def get_time_parser
+ @time_parser
+ end
end
class HttpInput < Input
@@ -74,17 +92,20 @@ def configure(conf)
super
+ @parser = nil
m = if @parser_configs.first['@type'] == 'in_http'
@parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
+ @parser_msgpack.time_key = nil
@parser_msgpack.estimate_current_event = false
@parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
+ @parser_json.time_key = nil
@parser_json.estimate_current_event = false
+
+ default_parser = parser_create(usage: '')
@format_name = 'default'
- @parser_time_key = if parser_config = conf.elements('parse').first
- parser_config['time_key'] || 'time'
- else
- 'time'
- end
+ @parser_time_key = default_parser.time_key
+ @default_time_parser = default_parser.get_time_parser
+ @default_keep_time_key = default_parser.keep_time_key
method(:parse_params_default)
else
@parser = parser_create
@@ -180,7 +201,23 @@ def on_request(path_info, params)
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
- record_time.nil? ? Fluent::EventTime.now : record_time
+ if record_time.nil?
+ if !record.is_a?(Array)
+ if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key)
+ if @default_time_parser
+ @default_time_parser.parse(t)
+ else
+ Fluent::EventTime.from_time(Time.at(t))
+ end
+ else
+ Fluent::EventTime.now
+ end
+ else
+ Fluent::EventTime.now
+ end
+ else
+ record_time
+ end
end
rescue => e
if @dump_error_log
@@ -189,10 +226,10 @@ def on_request(path_info, params)
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
end
- # TODO server error
- begin
- # Support batched requests
- if record.is_a?(Array)
+ mes = nil
+ # Support batched requests
+ if record.is_a?(Array)
+ begin
mes = Fluent::MultiEventStream.new
record.each do |single_record|
if @add_http_headers
@@ -206,12 +243,16 @@ def on_request(path_info, params)
single_record['REMOTE_ADDR'] = params['REMOTE_ADDR']
end
- if defined? @parser
+ if @parser
single_time = @parser.parse_time(single_record)
single_time, single_record = @parser.convert_values(single_time, single_record)
else
- single_time = if t = single_record.delete(@parser_time_key)
- Fluent::EventTime.from_time(Time.at(t))
+ single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key)
+ if @default_time_parser
+ @default_time_parser.parse(t)
+ else
+ Fluent::EventTime.from_time(Time.at(t))
+ end
else
time
end
@@ -219,6 +260,17 @@ def on_request(path_info, params)
mes.add(single_time, single_record)
end
+ rescue => e
+ if @dump_error_log
+ log.error "failed to process batch request", error: e
+ end
+ return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
+ end
+ end
+
+ # TODO server error
+ begin
+ if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb
index 3847f4a3c2..c16fde805c 100644
--- a/test/plugin/test_in_http.rb
+++ b/test/plugin/test_in_http.rb
@@ -116,6 +116,36 @@ def test_json
assert_equal_event_time time, d.events[1][1]
end
+ data('json' => ['json', :to_json],
+ 'msgpack' => ['msgpack', :to_msgpack])
+ def test_default_with_time_format(data)
+ param, method_name = data
+ d = create_driver(CONFIG + %[
+
+ keep_time_key
+ time_format %iso8601
+
+ ])
+
+ time = event_time("2020-06-10T01:14:27+00:00")
+ events = [
+ ["tag1", time, {"a" => 1, "time" => '2020-06-10T01:14:27+00:00'}],
+ ["tag2", time, {"a" => 2, "time" => '2020-06-10T01:14:27+00:00'}],
+ ]
+ res_codes = []
+
+ d.run(expect_records: 2) do
+ events.each do |tag, t, record|
+ res = post("/#{tag}", {param => record.__send__(method_name)})
+ res_codes << res.code
+ end
+ end
+ assert_equal ["200", "200"], res_codes
+ assert_equal events, d.events
+ assert_equal_event_time time, d.events[0][1]
+ assert_equal_event_time time, d.events[1][1]
+ end
+
def test_multi_json
d = create_driver
time = event_time("2011-01-02 13:14:15 UTC")
@@ -163,6 +193,33 @@ def test_multi_json_with_time_field
assert_equal_event_time time, d.events[1][1]
end
+ data('json' => ['json', :to_json],
+ 'msgpack' => ['msgpack', :to_msgpack])
+ def test_default_multi_with_time_format(data)
+ param, method_name = data
+ d = create_driver(CONFIG + %[
+
+ keep_time_key
+ time_format %iso8601
+
+ ])
+ time = event_time("2020-06-10T01:14:27+00:00")
+ events = [
+ ["tag1", time, {'a' => 1, 'time' => "2020-06-10T01:14:27+00:00"}],
+ ["tag1", time, {'a' => 2, 'time' => "2020-06-10T01:14:27+00:00"}],
+ ]
+ tag = "tag1"
+ res_codes = []
+ d.run(expect_records: 2, timeout: 5) do
+ res = post("/#{tag}", {param => events.map { |e| e[2] }.__send__(method_name)})
+ res_codes << res.code
+ end
+ assert_equal ["200"], res_codes
+ assert_equal events, d.events
+ assert_equal_event_time time, d.events[0][1]
+ assert_equal_event_time time, d.events[1][1]
+ end
+
def test_multi_json_with_nonexistent_time_key
d = create_driver(CONFIG + %[