Skip to content

Commit

Permalink
Merge pull request #4474 from fluent/make-sure-parser-returns-hash
Browse files Browse the repository at this point in the history
Make sure parser returns hash
  • Loading branch information
ashie authored Apr 30, 2024
2 parents a226e28 + 6cace97 commit 35e2210
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 60 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def filter_with_time(tag, time, record)
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
return t, r
else
if @emit_invalid_record_to_error
Expand Down
67 changes: 15 additions & 52 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,54 +203,24 @@ def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
tag = path.split('/').join('.')
record_time, record = parse_params(params)

# Skip nil record
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
if @respond_with_empty_img
return RESPONSE_IMG
else
if @use_204_response
return RESPONSE_204
else
return RESPONSE_200
end
mes = Fluent::MultiEventStream.new
parse_params(params) do |record_time, record|
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
next
end
end

mes = nil
# Support batched requests
if record.is_a?(Array)
mes = Fluent::MultiEventStream.new
record.each do |single_record|
add_params_to_record(single_record, params)

if param_time = params['time']
param_time = param_time.to_f
single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
elsif @custom_parser
single_time = @custom_parser.parse_time(single_record)
single_time, single_record = @custom_parser.convert_values(single_time, single_record)
else
single_time = convert_time_field(single_record)
end

mes.add(single_time, single_record)
end
else
add_params_to_record(record, params)

time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
if record_time.nil?
convert_time_field(record)
else
record_time
end
record_time.nil? ? convert_time_field(record) : record_time
end

mes.add(time, record)
end
rescue => e
if @dump_error_log
Expand All @@ -261,11 +231,7 @@ def on_request(path_info, params)

# TODO server error
begin
if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
end
router.emit_stream(tag, mes) unless mes.empty?
rescue => e
if @dump_error_log
log.error "failed to emit data", error: e
Expand Down Expand Up @@ -308,31 +274,28 @@ def on_server_connect(conn)
def parse_params_default(params)
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
yield nil, record
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
yield nil, record
end
elsif ndjson = params['ndjson']
events = []
ndjson.split(/\r?\n/).each do |js|
@parser_json.parse(js) do |_time, record|
events.push(record)
yield nil, record
end
end
return nil, events
else
raise "'json', 'ndjson' or 'msgpack' parameter is required"
end
end

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@custom_parser.parse(content) { |time, record|
raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
@custom_parser.parse(content) do |time, record|
yield time, record
end
else
raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
end
Expand Down
27 changes: 22 additions & 5 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,33 @@ def configure_json_parser(name)
end

def parse(text)
record = @load_proc.call(text)
time = parse_time(record)
if @execute_convert_values
time, record = convert_values(time, record)
parsed_json = @load_proc.call(text)

if parsed_json.is_a?(Hash)
time, record = parse_one_record(parsed_json)
yield time, record
elsif parsed_json.is_a?(Array)
parsed_json.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, parsed_record = parse_one_record(record)
yield time, parsed_record
end
else
yield nil, nil
end
yield time, record

rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
end

def parse_one_record(record)
time = parse_time(record)
convert_values(time, record)
end

def parser_type
:text
end
Expand Down
27 changes: 24 additions & 3 deletions lib/fluent/plugin/parser_msgpack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,39 @@ def parser_type
:binary
end

def parse(data)
def parse(data, &block)
@unpacker.feed_each(data) do |obj|
yield convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end
alias parse_partial_data parse

def parse_io(io, &block)
u = Fluent::MessagePackFactory.engine_factory.unpacker(io)
u.each do |obj|
time, record = convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end

def parse_unpacked_data(data)
if data.is_a?(Hash)
time, record = convert_values(parse_time(data), data)
yield time, record
return
end

unless data.is_a?(Array)
yield nil, nil
return
end

data.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, converted_record = convert_values(parse_time(record), record)
yield time, converted_record
end
end
end
Expand Down
106 changes: 106 additions & 0 deletions test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,110 @@ def test_yajl_parse_io_with_buffer_smaller_than_input
end
end
end

sub_test_case "various record pattern" do
data("Only string", { record: '"message"', expected: [nil] }, keep: true)
data("Only string without quotation", { record: "message", expected: [nil] }, keep: true)
data("Only number", { record: "0", expected: [nil] }, keep: true)
data(
"Array of Hash",
{
record: '[{"k1": 1}, {"k2": 2}]',
expected: [{"k1" => 1}, {"k2" => 2}]
},
keep: true,
)
data(
"Array of both Hash and invalid",
{
record: '[{"k1": 1}, "string", {"k2": 2}, 0]',
expected: [{"k1" => 1}, nil, {"k2" => 2}, nil]
},
keep: true,
)
data(
"Array of all invalid",
{
record: '["string", 0, [{"k": 0}]]',
expected: [nil, nil, nil]
},
keep: true,
)

def test_oj(data)
parsed_records = []
@parser.configure("json_parser" => "oj")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_yajl(data)
parsed_records = []
@parser.configure("json_parser" => "yajl")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_json(json)
parsed_records = []
@parser.configure("json_parser" => "json")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end
end

# This becomes NoMethodError if a non-Hash object is passed to convert_values.
# https://github.com/fluent/fluentd/issues/4100
sub_test_case "execute_convert_values with null_empty_string" do
data("Only string", { record: '"message"', expected: [nil] }, keep: true)
data(
"Hash",
{
record: '{"k1": 1, "k2": ""}',
expected: [{"k1" => 1, "k2" => nil}]
},
keep: true,
)
data(
"Array of Hash",
{
record: '[{"k1": 1}, {"k2": ""}]',
expected: [{"k1" => 1}, {"k2" => nil}]
},
keep: true,
)

def test_oj(data)
parsed_records = []
@parser.configure("json_parser" => "oj", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_yajl(data)
parsed_records = []
@parser.configure("json_parser" => "yajl", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_json(json)
parsed_records = []
@parser.configure("json_parser" => "json", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end
end
end
Loading

0 comments on commit 35e2210

Please sign in to comment.