diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index 66109fa215..9faa7d3645 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -17,13 +17,60 @@ require 'fluent/plugin/base' require 'fluent/plugin/owned_by_mixin' +require 'fluent/error' require 'fluent/mixin' # for TypeConverter require 'fluent/time' require 'fluent/plugin/string_util' +require 'serverengine/blocking_flag' + module Fluent module Plugin class Parser < Base + class TimeoutChecker + # This implementation now uses mutex because parser is typically used in input. + # If this has a performance issue under high concurreny, use concurrent-ruby's map instead. + def initialize(timeout) + @map = {} + @flag = ServerEngine::BlockingFlag.new + @mutex = Mutex.new + @timeout = timeout + end + + def start + @thread = ::Thread.new { + until @flag.wait_for_set(0.5) + @mutex.synchronize { + now = Time.now + @map.keys.each { |th| + time = @map[th] + if now - time > @timeout + th.raise UnrecoverableError, "parsing timed out" + @map.delete(th) + end + } + } + end + } + end + + def stop + @flag.set! + @thread.join + end + + def execute + th = Thread.current + @mutex.synchronize { + @map[th] = Time.now + } + yield + ensure + # Need clean up here becuase if next event is delayed, incorrect exception will be raised in normal exceution flow. + @map.delete(th) + end + end + include OwnedByMixin include TimeMixin::Parser @@ -47,6 +94,7 @@ class ParserError < StandardError; end config_param :null_empty_string, :bool, default: false config_param :estimate_current_event, :bool, default: true config_param :keep_time_key, :bool, default: false + config_param :timeout, :time, default: nil AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array'] @@ -65,12 +113,41 @@ def configure(conf) @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern) @type_converters = build_type_converters(@types) @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string + if @timeout + class << self + alias_method :parse_orig, :parse + alias_method :parse, :parse_with_timeout + end + @timeout_checker = TimeoutChecker.new(@timeout) + end + end + + def start + super + + @timeout_checker.start if @timeout_checker + end + + def stop + super + + @timeout_checker.stop if @timeout_checker end def parse(text, &block) raise NotImplementedError, "Implement this method in child class" end + def parse_with_timeout(text, &block) + @timeout_checker.execute { + parse_orig(text, &block) + } + rescue UnrecoverableError + log.error "parsing timed out with #{self.class}: text = #{text}" + # Return nil instead of raising error. in_tail or other plugin can emit broken line. + yield nil, nil + end + def call(*a, &b) # Keep backward compatibility for existing plugins # TODO: warn when deprecated