Skip to content

Commit

Permalink
Merge pull request #1286 from fluent/bye-valueparser-and-typeconverter
Browse files Browse the repository at this point in the history
Merge ValuesParser and TypeConverter into Parser
  • Loading branch information
tagomoris authored Oct 25, 2016
2 parents df5ce96 + c0b5e6e commit 2009cd4
Show file tree
Hide file tree
Showing 20 changed files with 738 additions and 234 deletions.
150 changes: 139 additions & 11 deletions lib/fluent/compat/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
require 'fluent/plugin/parser'
require 'fluent/mixin'

require 'fluent/config'
require 'fluent/compat/type_converter'

require 'fluent/plugin/parser_regexp'
require 'fluent/plugin/parser_json'
require 'fluent/plugin/parser_tsv'
Expand Down Expand Up @@ -56,13 +59,14 @@ def configure(conf, required=true)
format = conf['format']

@parser = TextParser.lookup(format)
if ! @estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=')
@parser.estimate_current_event = @estimate_current_event
end

if @parser.respond_to?(:configure)
@parser.configure(conf)
end
if !@estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=')
# external code sets parser.estimate_current_event = false
@parser.estimate_current_event = @estimate_current_event
end

return true
end
Expand Down Expand Up @@ -116,48 +120,172 @@ def self.lookup(format)
end
end

class TimeParser < Fluent::Plugin::Parser::TimeParser
module TypeConverterCompatParameters
def convert_type_converter_parameters!(conf)
if conf["types"]
delimiter = conf["types_delimiter"] || ','
label_delimiter = conf["types_label_delimiter"] || ':'
types = {}
conf['types'].split(delimiter).each do |pair|
key, value = pair.split(label_delimiter, 2)
if value.start_with?("time#{label_delimiter}")
value = value.split(label_delimiter, 2).join(':')
elsif value.start_with?("array#{label_delimiter}")
value = value.split(label_delimiter, 2).join(':')
end
types[key] = value
end
conf["types"] = JSON.dump(types)
end
end
end

class TimeParser < Fluent::TimeParser
# TODO: warn when deprecated
end

class RegexpParser < Fluent::Plugin::RegexpParser
include TypeConverterCompatParameters

# TODO: warn when deprecated
def initialize(regexp, conf = {})
super()

@stored_regexp = regexp
@manually_configured = false
unless conf.empty?
unless conf.is_a?(Config::Element)
conf = Config::Element.new('default_regexp_conf', '', conf, [])
end
configure(conf)
conf_init = if conf.is_a?(Fluent::Config::Element)
conf
else
Fluent::Config::Element.new('parse', '', conf, [])
end
self.configure(conf_init)
@manually_configured = true
end
end

def configure(conf)
return if @manually_configured # not to run twice

conf['expression'] ||= @stored_regexp.source
convert_type_converter_parameters!(conf)

@regexp = regexp
super
end

def patterns
{'format' => @regexp, 'time_format' => @time_format}
end
end

class ValuesParser < Fluent::Plugin::ValuesParser
# TODO: warn when deprecated
class ValuesParser < Parser
include Fluent::Compat::TypeConverter

config_param :keys, :array, default: []
config_param :time_key, :string, default: nil
config_param :null_value_pattern, :string, default: nil
config_param :null_empty_string, :bool, default: false

def configure(conf)
super

if @time_key && !@keys.include?(@time_key) && @estimate_current_event
raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})"
end

if @time_format && !@time_key
raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}"
end

@time_parser = time_parser_create

if @null_value_pattern
@null_value_pattern = Regexp.new(@null_value_pattern)
end

@mutex = Mutex.new
end

def values_map(values)
record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })]

if @time_key
value = @keep_time_key ? record[@time_key] : record.delete(@time_key)
time = if value.nil?
if @estimate_current_event
Fluent::EventTime.now
else
nil
end
else
@mutex.synchronize { @time_parser.parse(value) }
end
elsif @estimate_current_event
time = Fluent::EventTime.now
else
time = nil
end

convert_field_type!(record) if @type_converters

return time, record
end

private

def convert_field_type!(record)
@type_converters.each_key { |key|
if value = record[key]
record[key] = convert_type(key, value)
end
}
end

def convert_value_to_nil(value)
if value and @null_empty_string
value = (value == '') ? nil : value
end
if value and @null_value_pattern
value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value
end
value
end
end

class JSONParser < Fluent::Plugin::JSONParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class TSVParser < Fluent::Plugin::TSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class LabeledTSVParser < Fluent::Plugin::LabeledTSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class CSVParser < Fluent::Plugin::CSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class NoneParser < Fluent::Plugin::NoneParser
Expand Down
8 changes: 8 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def configure(conf)
self
end

def string_safe_encoding(str)
unless str.valid_encoding?
log.info "invalid byte sequence is replaced in `#{str}`" if self.respond_to?(:log)
str = str.scrub('?')
end
yield str
end

def context_router=(router)
@_context_router = router
end
Expand Down
Loading

0 comments on commit 2009cd4

Please sign in to comment.