Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_http: Improve time field processing #3046

Merged
merged 2 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,28 @@
module Fluent::Plugin
class InHttpParser < Parser
Fluent::Plugin.register_parser('in_http', self)

config_set_default :time_key, 'time'

def configure(conf)
super

# if no time parser related parameters, use in_http's time convert rule
@time_parser = if conf.has_key?('time_type') || conf.has_key?('time_format')
time_parser_create
else
nil
end
end

def parse(text)
# this plugin is dummy implementation not to raise error
yield nil, nil
end

def get_time_parser
@time_parser
end
end

class HttpInput < Input
Expand Down Expand Up @@ -74,17 +92,20 @@ def configure(conf)

super

@parser = nil
m = if @parser_configs.first['@type'] == 'in_http'
@parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
@parser_msgpack.time_key = nil
@parser_msgpack.estimate_current_event = false
@parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
@parser_json.time_key = nil
@parser_json.estimate_current_event = false

default_parser = parser_create(usage: '')
@format_name = 'default'
@parser_time_key = if parser_config = conf.elements('parse').first
parser_config['time_key'] || 'time'
else
'time'
end
@parser_time_key = default_parser.time_key
@default_time_parser = default_parser.get_time_parser
@default_keep_time_key = default_parser.keep_time_key
method(:parse_params_default)
else
@parser = parser_create
Expand Down Expand Up @@ -180,7 +201,23 @@ def on_request(path_info, params)
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
record_time.nil? ? Fluent::EventTime.now : record_time
if record_time.nil?
if !record.is_a?(Array)
if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key)
if @default_time_parser
@default_time_parser.parse(t)
else
Fluent::EventTime.from_time(Time.at(t))
end
else
Fluent::EventTime.now
end
else
Fluent::EventTime.now
end
else
record_time
end
end
rescue => e
if @dump_error_log
Expand All @@ -189,10 +226,10 @@ def on_request(path_info, params)
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
end

# TODO server error
begin
# Support batched requests
if record.is_a?(Array)
mes = nil
# Support batched requests
if record.is_a?(Array)
begin
mes = Fluent::MultiEventStream.new
record.each do |single_record|
if @add_http_headers
Expand All @@ -206,19 +243,34 @@ def on_request(path_info, params)
single_record['REMOTE_ADDR'] = params['REMOTE_ADDR']
end

if defined? @parser
if @parser
single_time = @parser.parse_time(single_record)
single_time, single_record = @parser.convert_values(single_time, single_record)
else
single_time = if t = single_record.delete(@parser_time_key)
Fluent::EventTime.from_time(Time.at(t))
single_time = if t = @default_keep_time_key ? single_record[@parser_time_key] : single_record.delete(@parser_time_key)
if @default_time_parser
@default_time_parser.parse(t)
else
Fluent::EventTime.from_time(Time.at(t))
end
else
time
end
end

mes.add(single_time, single_record)
end
rescue => e
if @dump_error_log
log.error "failed to process batch request", error: e
end
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
end
end

# TODO server error
begin
if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
Expand Down
57 changes: 57 additions & 0 deletions test/plugin/test_in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,36 @@ def test_json
assert_equal_event_time time, d.events[1][1]
end

data('json' => ['json', :to_json],
'msgpack' => ['msgpack', :to_msgpack])
def test_default_with_time_format(data)
param, method_name = data
d = create_driver(CONFIG + %[
<parse>
keep_time_key
time_format %iso8601
</parse>
])

time = event_time("2020-06-10T01:14:27+00:00")
events = [
["tag1", time, {"a" => 1, "time" => '2020-06-10T01:14:27+00:00'}],
["tag2", time, {"a" => 2, "time" => '2020-06-10T01:14:27+00:00'}],
]
res_codes = []

d.run(expect_records: 2) do
events.each do |tag, t, record|
res = post("/#{tag}", {param => record.__send__(method_name)})
res_codes << res.code
end
end
assert_equal ["200", "200"], res_codes
assert_equal events, d.events
assert_equal_event_time time, d.events[0][1]
assert_equal_event_time time, d.events[1][1]
end

def test_multi_json
d = create_driver
time = event_time("2011-01-02 13:14:15 UTC")
Expand Down Expand Up @@ -163,6 +193,33 @@ def test_multi_json_with_time_field
assert_equal_event_time time, d.events[1][1]
end

data('json' => ['json', :to_json],
'msgpack' => ['msgpack', :to_msgpack])
def test_default_multi_with_time_format(data)
param, method_name = data
d = create_driver(CONFIG + %[
<parse>
keep_time_key
time_format %iso8601
</parse>
])
time = event_time("2020-06-10T01:14:27+00:00")
events = [
["tag1", time, {'a' => 1, 'time' => "2020-06-10T01:14:27+00:00"}],
["tag1", time, {'a' => 2, 'time' => "2020-06-10T01:14:27+00:00"}],
]
tag = "tag1"
res_codes = []
d.run(expect_records: 2, timeout: 5) do
res = post("/#{tag}", {param => events.map { |e| e[2] }.__send__(method_name)})
res_codes << res.code
end
assert_equal ["200"], res_codes
assert_equal events, d.events
assert_equal_event_time time, d.events[0][1]
assert_equal_event_time time, d.events[1][1]
end

def test_multi_json_with_nonexistent_time_key
d = create_driver(CONFIG + %[
<parse>
Expand Down