From 0790ae6a6561d5f94c19772e3aae6fe7edabb1e4 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 9 Mar 2017 06:21:32 +0900 Subject: [PATCH 1/2] filter_parser: Add emit_invalid_record_to_error parameter --- lib/fluent/plugin/filter_parser.rb | 17 +++++++++++---- test/plugin/test_filter_parser.rb | 35 ++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index cc5d785b4a..79413f84b3 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -32,6 +32,7 @@ class ParserFilter < Filter config_param :inject_key_prefix, :string, default: nil config_param :replace_invalid_sequence, :bool, default: false config_param :hash_value_field, :string, default: nil + config_param :emit_invalid_record_to_error, :bool, default: true attr_reader :parser @@ -49,7 +50,9 @@ def configure(conf) def filter_with_time(tag, time, record) raw_value = record[@key_name] if raw_value.nil? - router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + end if @reserve_data return time, handle_parsed(tag, record, time, {}) else @@ -67,7 +70,9 @@ def filter_with_time(tag, time, record) r = handle_parsed(tag, record, t, values) return t, r else - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'")) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'")) + end if @reserve_data t = time r = handle_parsed(tag, record, time, {}) @@ -78,7 +83,9 @@ def filter_with_time(tag, time, record) end end rescue Fluent::Plugin::Parser::ParserError => e - router.emit_error_event(tag, time, record, e) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, e) + end return FAILED_RESULT rescue ArgumentError => e raise unless @replace_invalid_sequence @@ -87,7 +94,9 @@ def filter_with_time(tag, time, record) raw_value = raw_value.scrub(REPLACE_CHAR) retry rescue => e - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}")) + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}")) + end return FAILED_RESULT end end diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb index 0e54989478..df1824f129 100644 --- a/test/plugin/test_filter_parser.rb +++ b/test/plugin/test_filter_parser.rb @@ -662,4 +662,39 @@ def test_not_call_emit_error_event_when_pattern_is_mached end end end + + class EmitInvalidRecordToErrorTest < self + def test_pattern_is_mismached_with_emit_invalid_record_to_error + d = create_driver(CONFIG_UNMATCHED_PATTERN_LOG + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) + end + } + assert_equal 0, d.filtered.length + end + + def test_parser_error_with_emit_invalid_record_to_error + d = create_driver(CONFIG_INVALID_TIME_VALUE + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'}) + end + } + assert_equal 0, d.filtered.length + end + + def test_key_not_exist_with_emit_invalid_record_to_error + d = create_driver(CONFIG_NOT_IGNORE + "emit_invalid_record_to_error false") + flexmock(d.instance.router).should_receive(:emit_error_event).never + assert_nothing_raised { + d.run do + d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'}) + end + } + assert_equal 0, d.filtered.length + end + end end From ac63f223039114a8b17b4f1dfe87f9c90b7ec652 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 15 Mar 2017 16:44:32 +0900 Subject: [PATCH 2/2] Use raise instead of explicit emit_error_event for non reserve_data case --- lib/fluent/plugin/filter_parser.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index 79413f84b3..b05dd3ce96 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -84,9 +84,10 @@ def filter_with_time(tag, time, record) end rescue Fluent::Plugin::Parser::ParserError => e if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, e) + raise e + else + return FAILED_RESULT end - return FAILED_RESULT rescue ArgumentError => e raise unless @replace_invalid_sequence raise unless e.message.index("invalid byte sequence in") == 0 @@ -94,10 +95,11 @@ def filter_with_time(tag, time, record) raw_value = raw_value.scrub(REPLACE_CHAR) retry rescue => e - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}")) + if @emit_invalid_record_to_error + raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" + else + return FAILED_RESULT end - return FAILED_RESULT end end