From ab9130d528b8618ec890e71a97adf866a8232fdf Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Thu, 29 Aug 2024 11:54:01 +0530 Subject: [PATCH] Transition filter_parser from single record to stream Signed-off-by: Athish Pranav D --- lib/fluent/plugin/filter_parser.rb | 78 ++++++++++-------------------- 1 file changed, 26 insertions(+), 52 deletions(-) diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index 6998a38286..b65c17f5d4 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -54,29 +54,32 @@ def configure(conf) @parser = parser_create end - FAILED_RESULT = [nil, nil].freeze # reduce allocation cost REPLACE_CHAR = '?'.freeze - def filter_with_time(tag, time, record) - raw_value = @accessor.call(record) - if raw_value.nil? - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) - end - if @reserve_data - return time, handle_parsed(tag, record, time, {}) - else - return FAILED_RESULT + def filter_stream(tag, es) + new_es = Fluent::MultiEventStream.new + es.each do |time, record| + begin + raw_value = @accessor.call(record) + if raw_value.nil? + new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data + raise ArgumentError, "#{@key_name} does not exist" + else + filter_one_record(tag, time, record, raw_value) do |result_time, result_record| + new_es.add(result_time, result_record) + end + end + rescue => e + router.emit_error_event(tag, time, record, e) if @emit_invalid_record_to_error end end - begin - # Note: https://github.com/fluent/fluentd/issues/4100 - # If the parser returns multiple records from one raw_value, - # this returns only the first one record. - # This should be fixed in the future version. - result_time = nil - result_record = nil + new_es + end + + private + def filter_one_record(tag, time, record, raw_value) + begin @parser.parse(raw_value) do |t, values| if values t = if @reserve_time @@ -85,38 +88,15 @@ def filter_with_time(tag, time, record) t.nil? ? time : t end @accessor.delete(record) if @remove_key_name_field - r = handle_parsed(tag, record, t, values) - - if result_record.nil? - result_time = t - result_record = r - else - if @emit_invalid_record_to_error - router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new( - "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'" - )) - end - end else - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) - end - + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) if @emit_invalid_record_to_error next unless @reserve_data - next unless result_record.nil? - - result_time = time - result_record = handle_parsed(tag, record, time, {}) + t = time + values = {} end + yield(t, handle_parsed(tag, record, t, values)) end - return result_time, result_record - rescue Fluent::Plugin::Parser::ParserError => e - if @emit_invalid_record_to_error - raise e - else - return FAILED_RESULT - end rescue ArgumentError => e raise unless @replace_invalid_sequence raise unless e.message.index("invalid byte sequence in") == 0 @@ -124,16 +104,10 @@ def filter_with_time(tag, time, record) raw_value = raw_value.scrub(REPLACE_CHAR) retry rescue => e - if @emit_invalid_record_to_error - raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" - else - return FAILED_RESULT - end + raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" end end - private - def handle_parsed(tag, record, t, values) if values && @inject_key_prefix values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]