Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge ValuesParser and TypeConverter into Parser #1286

Merged
merged 15 commits into from
Oct 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 139 additions & 11 deletions lib/fluent/compat/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
require 'fluent/plugin/parser'
require 'fluent/mixin'

require 'fluent/config'
require 'fluent/compat/type_converter'

require 'fluent/plugin/parser_regexp'
require 'fluent/plugin/parser_json'
require 'fluent/plugin/parser_tsv'
Expand Down Expand Up @@ -56,13 +59,14 @@ def configure(conf, required=true)
format = conf['format']

@parser = TextParser.lookup(format)
if ! @estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=')
@parser.estimate_current_event = @estimate_current_event
end

if @parser.respond_to?(:configure)
@parser.configure(conf)
end
if !@estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=')
# external code sets parser.estimate_current_event = false
@parser.estimate_current_event = @estimate_current_event
end

return true
end
Expand Down Expand Up @@ -116,48 +120,172 @@ def self.lookup(format)
end
end

class TimeParser < Fluent::Plugin::Parser::TimeParser
module TypeConverterCompatParameters
def convert_type_converter_parameters!(conf)
if conf["types"]
delimiter = conf["types_delimiter"] || ','
label_delimiter = conf["types_label_delimiter"] || ':'
types = {}
conf['types'].split(delimiter).each do |pair|
key, value = pair.split(label_delimiter, 2)
if value.start_with?("time#{label_delimiter}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

array should be also care?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be. I'll add a fix.

value = value.split(label_delimiter, 2).join(':')
elsif value.start_with?("array#{label_delimiter}")
value = value.split(label_delimiter, 2).join(':')
end
types[key] = value
end
conf["types"] = JSON.dump(types)
end
end
end

class TimeParser < Fluent::TimeParser
# TODO: warn when deprecated
end

class RegexpParser < Fluent::Plugin::RegexpParser
include TypeConverterCompatParameters

# TODO: warn when deprecated
def initialize(regexp, conf = {})
super()

@stored_regexp = regexp
@manually_configured = false
unless conf.empty?
unless conf.is_a?(Config::Element)
conf = Config::Element.new('default_regexp_conf', '', conf, [])
end
configure(conf)
conf_init = if conf.is_a?(Fluent::Config::Element)
conf
else
Fluent::Config::Element.new('parse', '', conf, [])
end
self.configure(conf_init)
@manually_configured = true
end
end

def configure(conf)
return if @manually_configured # not to run twice

conf['expression'] ||= @stored_regexp.source
convert_type_converter_parameters!(conf)

@regexp = regexp
super
end

def patterns
{'format' => @regexp, 'time_format' => @time_format}
end
end

class ValuesParser < Fluent::Plugin::ValuesParser
# TODO: warn when deprecated
class ValuesParser < Parser
include Fluent::Compat::TypeConverter

config_param :keys, :array, default: []
config_param :time_key, :string, default: nil
config_param :null_value_pattern, :string, default: nil
config_param :null_empty_string, :bool, default: false

def configure(conf)
super

if @time_key && [email protected]?(@time_key) && @estimate_current_event
raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})"
end

if @time_format && !@time_key
raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}"
end

@time_parser = time_parser_create

if @null_value_pattern
@null_value_pattern = Regexp.new(@null_value_pattern)
end

@mutex = Mutex.new
end

def values_map(values)
record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })]

if @time_key
value = @keep_time_key ? record[@time_key] : record.delete(@time_key)
time = if value.nil?
if @estimate_current_event
Fluent::EventTime.now
else
nil
end
else
@mutex.synchronize { @time_parser.parse(value) }
end
elsif @estimate_current_event
time = Fluent::EventTime.now
else
time = nil
end

convert_field_type!(record) if @type_converters

return time, record
end

private

def convert_field_type!(record)
@type_converters.each_key { |key|
if value = record[key]
record[key] = convert_type(key, value)
end
}
end

def convert_value_to_nil(value)
if value and @null_empty_string
value = (value == '') ? nil : value
end
if value and @null_value_pattern
value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value
end
value
end
end

class JSONParser < Fluent::Plugin::JSONParser
include TypeConverterCompatParameters
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this include needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is a subclass of Fluent::Plugin::JSONParser, and it doesn't include it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JSONParser doesn't use TypeConverter feature.
Do you have a plan to update JSONParser?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I missed to update code of JSONParser. I pushed some commits including one to do it.

# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class TSVParser < Fluent::Plugin::TSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class LabeledTSVParser < Fluent::Plugin::LabeledTSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class CSVParser < Fluent::Plugin::CSVParser
include TypeConverterCompatParameters
# TODO: warn when deprecated
def configure(conf)
convert_type_converter_parameters!(conf)
super
end
end

class NoneParser < Fluent::Plugin::NoneParser
Expand Down
8 changes: 8 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def configure(conf)
self
end

def string_safe_encoding(str)
unless str.valid_encoding?
log.info "invalid byte sequence is replaced in `#{str}`" if self.respond_to?(:log)
str = str.scrub('?')
end
yield str
end

def context_router=(router)
@_context_router = router
end
Expand Down
Loading