diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index cc5d785b4a..b05dd3ce96 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -32,6 +32,7 @@ class ParserFilter < Filter config_param :inject_key_prefix, :string, default: nil config_param :replace_invalid_sequence, :bool, default: false config_param :hash_value_field, :string, default: nil + config_param :emit_invalid_record_to_error, :bool, default: true attr_reader :parser @@ -49,7 +50,9 @@ def configure(conf) def filter_with_time(tag, time, record) raw_value = record[@key_name] if raw_value.nil? - router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + end if @reserve_data return time, handle_parsed(tag, record, time, {}) else @@ -67,7 +70,9 @@ def filter_with_time(tag, time, record) r = handle_parsed(tag, record, t, values) return t, r else - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'")) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'")) + end if @reserve_data t = time r = handle_parsed(tag, record, time, {}) @@ -78,8 +83,11 @@ def filter_with_time(tag, time, record) end end rescue Fluent::Plugin::Parser::ParserError => e - router.emit_error_event(tag, time, record, e) - return FAILED_RESULT + if @emit_invalid_record_to_error + raise e + else + return FAILED_RESULT + end rescue ArgumentError => e raise unless @replace_invalid_sequence raise unless e.message.index("invalid byte sequence in") == 0 @@ -87,8 +95,11 @@ def filter_with_time(tag, time, record) raw_value = raw_value.scrub(REPLACE_CHAR) retry rescue => e - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}")) - return FAILED_RESULT + if @emit_invalid_record_to_error + raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" + else + return FAILED_RESULT + end end end diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb index 0e54989478..df1824f129 100644 --- a/test/plugin/test_filter_parser.rb +++ b/test/plugin/test_filter_parser.rb @@ -662,4 +662,39 @@ def test_not_call_emit_error_event_when_pattern_is_mached end end end + + class EmitInvalidRecordToErrorTest < self + def test_pattern_is_mismached_with_emit_invalid_record_to_error + d = create_driver(CONFIG_UNMATCHED_PATTERN_LOG + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) + end + } + assert_equal 0, d.filtered.length + end + + def test_parser_error_with_emit_invalid_record_to_error + d = create_driver(CONFIG_INVALID_TIME_VALUE + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'}) + end + } + assert_equal 0, d.filtered.length + end + + def test_key_not_exist_with_emit_invalid_record_to_error + d = create_driver(CONFIG_NOT_IGNORE + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'}) + end + } + assert_equal 0, d.filtered.length + end + end end