Skip to content

Commit

Permalink
Merge pull request #1494 from fluent/add-suppress-parse-error
Browse files Browse the repository at this point in the history
filter_parser: Add emit_invalid_record_to_error parameter
  • Loading branch information
repeatedly authored Mar 15, 2017
2 parents 092ea33 + ac63f22 commit 87f9588
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
23 changes: 17 additions & 6 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ParserFilter < Filter
config_param :inject_key_prefix, :string, default: nil
config_param :replace_invalid_sequence, :bool, default: false
config_param :hash_value_field, :string, default: nil
config_param :emit_invalid_record_to_error, :bool, default: true

attr_reader :parser

Expand All @@ -49,7 +50,9 @@ def configure(conf)
def filter_with_time(tag, time, record)
raw_value = record[@key_name]
if raw_value.nil?
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
Expand All @@ -67,7 +70,9 @@ def filter_with_time(tag, time, record)
r = handle_parsed(tag, record, t, values)
return t, r
else
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
end
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
Expand All @@ -78,17 +83,23 @@ def filter_with_time(tag, time, record)
end
end
rescue Fluent::Plugin::Parser::ParserError => e
router.emit_error_event(tag, time, record, e)
return FAILED_RESULT
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}"))
return FAILED_RESULT
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
end
end
end

Expand Down
35 changes: 35 additions & 0 deletions test/plugin/test_filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -662,4 +662,39 @@ def test_not_call_emit_error_event_when_pattern_is_mached
end
end
end

class EmitInvalidRecordToErrorTest < self
def test_pattern_is_mismached_with_emit_invalid_record_to_error
d = create_driver(CONFIG_UNMATCHED_PATTERN_LOG + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
}
assert_equal 0, d.filtered.length
end

def test_parser_error_with_emit_invalid_record_to_error
d = create_driver(CONFIG_INVALID_TIME_VALUE + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
}
assert_equal 0, d.filtered.length
end

def test_key_not_exist_with_emit_invalid_record_to_error
d = create_driver(CONFIG_NOT_IGNORE + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})
end
}
assert_equal 0, d.filtered.length
end
end
end

0 comments on commit 87f9588

Please sign in to comment.