From e8a4d6bed52c221e5bee25ec81d0d17ca804ff80 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:24:33 +0900 Subject: [PATCH 01/15] rename test --- test/plugin/{test_parser_base.rb => test_parser.rb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/plugin/{test_parser_base.rb => test_parser.rb} (100%) diff --git a/test/plugin/test_parser_base.rb b/test/plugin/test_parser.rb similarity index 100% rename from test/plugin/test_parser_base.rb rename to test/plugin/test_parser.rb From b9bd8da58665479bd88c446e750aa9d5f6cf63e4 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:41:44 +0900 Subject: [PATCH 02/15] add #string_safe_encoding method to guard block from invalid char sequence errors --- lib/fluent/plugin/base.rb | 8 ++++++++ test/plugin/test_base.rb | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index bebd5f095e..4d3d87e6f1 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -46,6 +46,14 @@ def configure(conf) self end + def string_safe_encoding(str) + unless str.valid_encoding? 
+ log.info "invalid byte sequence is replaced in `#{str}`" if self.respond_to?(:log) + str = str.scrub('?') + end + yield str + end + def context_router=(router) @_context_router = router end diff --git a/test/plugin/test_base.rb b/test/plugin/test_base.rb index 78d117a47f..b6c766baac 100644 --- a/test/plugin/test_base.rb +++ b/test/plugin/test_base.rb @@ -72,4 +72,20 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase assert_equal 'myvalue1', p2.myparam1 assert_equal 99, p2.mysection.myparam2 end + + test 'provides #string_safe_encoding to scrub invalid sequence string with info logging' do + logger = Fluent::Test::TestLogger.new + m = Module.new do + define_method(:log) do + logger + end + end + @p.extend m + assert_equal [], logger.logs + + ret = @p.string_safe_encoding("abc\xff.\x01f"){|s| s.split(/\./) } + assert_equal ['abc?', "\u0001f"], ret + assert_equal 1, logger.logs.size + assert{ logger.logs.first.include?("invalid byte sequence is replaced in ") } + end end From 366b707ceed6377549461a8c1127e8b1640a04df Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:42:54 +0900 Subject: [PATCH 03/15] fix not to raise errors for unexpected values --- lib/fluent/time.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index b18a1fa311..716bcfcc27 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -197,7 +197,7 @@ def initialize(format = nil, localtime = true, timezone = nil) else Time.now.localtime.utc_offset # utc end - strptime = format && (Strptime.new(time_format) rescue nil) + strptime = format && (Strptime.new(format) rescue nil) @parse = case when format_with_timezone && strptime then ->(v){ Fluent::EventTime.from_time(strptime.exec(v)) } @@ -291,7 +291,7 @@ def parse_float(value) begin sec_s, nsec_s, _ = value.split('.', 3) # throw away second-dot and later - nsec_s = nsec_s[0..9] + nsec_s = nsec_s && nsec_s[0..9] || '0' nsec_s += '0' * (9 - 
nsec_s.size) if nsec_s.size < 9 time = Fluent::EventTime.new(sec_s.to_i, nsec_s.to_i) rescue => e From d220fa1dd959263f53811ca910b6be0137fd282a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:43:27 +0900 Subject: [PATCH 04/15] add TIME_TYPES to show valid list to other classes --- lib/fluent/time.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index 716bcfcc27..0c5832fb4b 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -107,6 +107,8 @@ def method_missing(name, *args, &block) end module TimeMixin + TIME_TYPES = ['string', 'unixtime', 'float'] + TIME_PARAMETERS = [ [:time_format, :string, {default: nil}], [:localtime, :bool, {default: true}], # UTC if :localtime is false and :timezone is nil @@ -115,7 +117,7 @@ module TimeMixin ] TIME_FULL_PARAMETERS = [ # To avoid to define :time_type twice (in plugin_helper/inject) - [:time_type, :enum, {default: :string, list: [:string, :unixtime, :float]}], + [:time_type, :enum, {default: :string, list: TIME_TYPES.map(&:to_sym)}], ] + TIME_PARAMETERS module TimeParameters From 23e3b021ccd6c547e72c7f011651865f9888ae1c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:47:26 +0900 Subject: [PATCH 05/15] Merge TypeConverter into Fluent::Plugin::Parser and simplify code and inter-class dependency And moved ValuesParser to Compat module completely. 
The reasons for these changes are: * TypeConverter was an old-fashioned module with configuration params ("types_delimiter", "types_label_delimiter") and cannot log anything via PluginLogger * Many Parser plugins have chosen ValuesParser as superclass, but don't use "Values" features (with `@keys`), and use just TypeConverter features * "tag_key" feature is now supported by "extract" plugin helper, so parser should not support it --- lib/fluent/compat/parser.rb | 148 ++++++++++++++-- lib/fluent/plugin/parser.rb | 167 +++++++++++------- lib/fluent/plugin_helper/compat_parameters.rb | 17 ++ 3 files changed, 257 insertions(+), 75 deletions(-) diff --git a/lib/fluent/compat/parser.rb b/lib/fluent/compat/parser.rb index 836ee1eeb9..8b6556117b 100644 --- a/lib/fluent/compat/parser.rb +++ b/lib/fluent/compat/parser.rb @@ -18,6 +18,9 @@ require 'fluent/plugin/parser' require 'fluent/mixin' +require 'fluent/config' +require 'fluent/compat/type_converter' + require 'fluent/plugin/parser_regexp' require 'fluent/plugin/parser_json' require 'fluent/plugin/parser_tsv' @@ -56,13 +59,14 @@ def configure(conf, required=true) format = conf['format'] @parser = TextParser.lookup(format) - if ! @estimate_current_event.nil? && @parser.respond_to?(:'estimate_current_event=') - @parser.estimate_current_event = @estimate_current_event - end if @parser.respond_to?(:configure) @parser.configure(conf) end + if !@estimate_current_event.nil? 
&& @parser.respond_to?(:'estimate_current_event=') + # external code sets parser.estimate_current_event = false + @parser.estimate_current_event = @estimate_current_event + end return true end @@ -116,23 +120,55 @@ def self.lookup(format) end end - class TimeParser < Fluent::Plugin::Parser::TimeParser + module TypeConverterCompatParameters + def convert_type_converter_parameters!(conf) + if conf["types"] + delimiter = conf["types_delimiter"] || ',' + label_delimiter = conf["types_label_delimiter"] || ':' + types = {} + conf['types'].split(delimiter).each do |pair| + key, value = pair.split(label_delimiter, 2) + if value.start_with?("time#{label_delimiter}") + value = value.split(label_delimiter, 2).join(':') + end + types[key] = value + end + conf["types"] = JSON.dump(types) + end + end + end + + class TimeParser < Fluent::TimeParser # TODO: warn when deprecated end class RegexpParser < Fluent::Plugin::RegexpParser + include TypeConverterCompatParameters + # TODO: warn when deprecated def initialize(regexp, conf = {}) super() + @stored_regexp = regexp + @manually_configured = false unless conf.empty? 
- unless conf.is_a?(Config::Element) - conf = Config::Element.new('default_regexp_conf', '', conf, []) - end - configure(conf) + conf_init = if conf.is_a?(Fluent::Config::Element) + conf + else + Fluent::Config::Element.new('parse', '', conf, []) + end + self.configure(conf_init) + @manually_configured = true end + end + + def configure(conf) + return if @manually_configured # not to run twice + + conf['expression'] ||= @stored_regexp.source + convert_type_converter_parameters!(conf) - @regexp = regexp + super end def patterns @@ -140,24 +176,114 @@ def patterns end end - class ValuesParser < Fluent::Plugin::ValuesParser - # TODO: warn when deprecated + class ValuesParser < Parser + include Fluent::Compat::TypeConverter + + config_param :keys, :array, default: [] + config_param :time_key, :string, default: nil + config_param :null_value_pattern, :string, default: nil + config_param :null_empty_string, :bool, default: false + + def configure(conf) + super + + if @time_key && !@keys.include?(@time_key) && @estimate_current_event + raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})" + end + + if @time_format && !@time_key + raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}" + end + + @time_parser = time_parser_create + + if @null_value_pattern + @null_value_pattern = Regexp.new(@null_value_pattern) + end + + @mutex = Mutex.new + end + + def values_map(values) + record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })] + + if @time_key + value = @keep_time_key ? record[@time_key] : record.delete(@time_key) + time = if value.nil? 
+ if @estimate_current_event + Fluent::EventTime.now + else + nil + end + else + @mutex.synchronize { @time_parser.parse(value) } + end + elsif @estimate_current_event + time = Fluent::EventTime.now + else + time = nil + end + + convert_field_type!(record) if @type_converters + + return time, record + end + + private + + def convert_field_type!(record) + @type_converters.each_key { |key| + if value = record[key] + record[key] = convert_type(key, value) + end + } + end + + def convert_value_to_nil(value) + if value and @null_empty_string + value = (value == '') ? nil : value + end + if value and @null_value_pattern + value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value + end + value + end end class JSONParser < Fluent::Plugin::JSONParser + include TypeConverterCompatParameters # TODO: warn when deprecated + def configure(conf) + convert_type_converter_parameters!(conf) + super + end end class TSVParser < Fluent::Plugin::TSVParser + include TypeConverterCompatParameters # TODO: warn when deprecated + def configure(conf) + convert_type_converter_parameters!(conf) + super + end end class LabeledTSVParser < Fluent::Plugin::LabeledTSVParser + include TypeConverterCompatParameters # TODO: warn when deprecated + def configure(conf) + convert_type_converter_parameters!(conf) + super + end end class CSVParser < Fluent::Plugin::CSVParser + include TypeConverterCompatParameters # TODO: warn when deprecated + def configure(conf) + convert_type_converter_parameters!(conf) + super + end end class NoneParser < Fluent::Plugin::NoneParser diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index a6ddfe7d4d..8a16a184d6 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -31,17 +31,38 @@ class ParserError < StandardError; end configured_in :parse - # SET false BEFORE CONFIGURE, to return nil when time not parsed - attr_accessor :estimate_current_event + ### types can be specified as string-based hash style + # 
field1:type, field2:type, field3:type:option, field4:type:option + ### or, JSON format + # {"field1":"type", "field2":"type", "field3":"type:option", "field4":["type", "option"]} + config_param :types, :hash, value_type: :string, default: nil + # available options are: + # array: (1st) delimiter + # time : type[, format, timezone] -> type should be a valid "time_type"(string/unixtime/float) + # : format[, timezone] + + config_param :time_key, :string, default: nil + config_param :null_value_pattern, :string, default: nil + config_param :null_empty_string, :bool, default: false + config_param :estimate_current_event, :bool, default: true config_param :keep_time_key, :bool, default: false - def initialize + AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array'] + + # for tests + attr_reader :type_converters + + def configure(conf) super - @estimate_current_event = true + + @time_parser = time_parser_create + @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern) + @type_converters = build_type_converters(@types) + @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string end - def parse(text) + def parse(text, &block) raise NotImplementedError, "Implement this method in child class" end @@ -51,80 +72,98 @@ def call(*a, &b) parse(*a, &b) end - TimeParser = Fluent::TimeParser - end - - class ValuesParser < Parser - include Fluent::TypeConverter - - config_param :keys, :array, default: [] - config_param :time_key, :string, default: nil - config_param :null_value_pattern, :string, default: nil - config_param :null_empty_string, :bool, default: false - - def configure(conf) - super - - if @time_key && !@keys.include?(@time_key) && @estimate_current_event - raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})" + def parse_time(record) + if @time_key && record.has_key?(@time_key) + src = if @keep_time_key + record[@time_key] + else + 
record.delete(@time_key) + end + @time_parser.parse(src) + elsif @estimate_current_event + Fluent::EventTime.now + else + nil end + end - if @time_format && !@time_key - raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}" - end + # def parse(text, &block) + # time, record = ... + # yield convert_values(time, record) + # end + def convert_values(time, record) + return time, record unless @execute_convert_values - @time_parser = time_parser_create + record.each_key do |key| + value = record[key] + next unless value # nil/null value is always left as-is. + + if value.is_a?(String) && string_like_null(value) + record[key] = nil + next + end - if @null_value_pattern - @null_value_pattern = Regexp.new(@null_value_pattern) + if @type_converters && @type_converters.has_key?(key) + record[key] = @type_converters[key].call(value) + end end - @mutex = Mutex.new + return time, record end - def values_map(values) - record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })] - - if @time_key - value = @keep_time_key ? record[@time_key] : record.delete(@time_key) - time = if value.nil? - if @estimate_current_event - Fluent::EventTime.now - else - nil - end - else - @mutex.synchronize { @time_parser.parse(value) } - end - elsif @estimate_current_event - time = Fluent::EventTime.now - else - time = nil - end + def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp) + null_empty_string && value.empty? 
|| null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) } + end - convert_field_type!(record) if @type_converters + TRUTHY_VALUES = ['true', 'yes', '1'] - return time, record - end + def build_type_converters(types) + return nil unless types - private + converters = {} - def convert_field_type!(record) - @type_converters.each_key { |key| - if value = record[key] - record[key] = convert_type(key, value) + types.each_pair do |field_name, type_definition| + type, option = if type_definition.is_a?(Array) + type_definition + else + type_definition.split(":", 2) + end + unless AVAILABLE_PARSER_VALUE_TYPES.include?(type) + raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'" end - } - end - def convert_value_to_nil(value) - if value and @null_empty_string - value = (value == '') ? nil : value - end - if value and @null_value_pattern - value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value + conv = case type + when 'string' then ->(v){ v.to_s } + when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i } + when 'float' then ->(v){ v.to_f rescue v.to_s.to_f } + when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) } + when 'time' + # comma-separated: time:[timezone:]time_format + # time_format is unixtime/float/string-time-format + timep = if option + time_type = 'string' # estimate + timezone, time_format = option.split(':', 2) + unless Fluent::Timezone.validate(timezone) + timezone, time_format = nil, option + end + if Fluent::TimeMixin::TIME_TYPES.include?(time_format) + time_type, time_format = time_format, nil # unixtime/float + end + time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone) + else + time_parser_create(type: :string, format: nil, timezone: nil) + end + ->(v){ timep.parse(v) rescue nil } + when 'array' + delimiter = option ? 
option.to_s : ',' + ->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } } + else + raise "BUG: unknown type even after check: #{type}" + end + converters[field_name] = conv end - value + + converters end end end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 8a876ceb8b..c5b2f3a3c6 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -49,6 +49,12 @@ module CompatParameters PARSER_PARAMS = { "format" => "@type", + "types" => nil, + "types_delimiter" => nil, + "types_label_delimiter" => nil, + "null_value_pattern" => "null_value_pattern", + "null_empty_string" => "null_empty_string", + "keys" => "keys", # CSVParser, TSVParser (old ValuesParser) "time_key" => "time_key", "time_format" => "time_format", "delimiter" => "delimiter", @@ -200,6 +206,17 @@ def compat_parameters_parser(conf) # TODO: warn obsolete parameters if these are deprecated hash = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS) + if conf["types"] + delimiter = conf["types_delimiter"] || ',' + label_delimiter = conf["types_label_delimiter"] || ':' + types = {} + conf['types'].split(delimiter).each do |pair| + key, value = pair.split(label_delimiter, 2) + types[key] = value + end + hash["types"] = JSON.dump(types) + end + e = Fluent::Config::Element.new('parse', '', hash, []) conf.elements << e From 801dda4964e202cb4ff31cb8cdaba1344e180692 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:55:50 +0900 Subject: [PATCH 06/15] fix to display event time in human readable format --- lib/fluent/test/helpers.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/test/helpers.rb b/lib/fluent/test/helpers.rb index c5e206b094..780705d982 100644 --- a/lib/fluent/test/helpers.rb +++ b/lib/fluent/test/helpers.rb @@ -23,7 +23,9 @@ module Test module Helpers # See "Example Custom Assertion: 
http://test-unit.github.io/test-unit/en/Test/Unit/Assertions.html def assert_equal_event_time(expected, actual, message = nil) - message = build_message(message, < expected but was . EOT From f4e52b8b8bd63ebde6b48235f788a3193a456921 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:56:36 +0900 Subject: [PATCH 07/15] add parser tests for type conversion and time parsing, null handling --- test/plugin/test_parser.rb | 345 ++++++++++++++++++++++++++++++++++++- 1 file changed, 336 insertions(+), 9 deletions(-) diff --git a/test/plugin/test_parser.rb b/test/plugin/test_parser.rb index 700be1d238..ce90659655 100644 --- a/test/plugin/test_parser.rb +++ b/test/plugin/test_parser.rb @@ -1,20 +1,33 @@ require_relative '../helper' require 'fluent/test/driver/parser' require 'fluent/plugin/parser' +require 'json' +require 'timecop' -module ParserTest - class BaseParserTest < ::Test::Unit::TestCase - def setup - Fluent::Test.setup +class ParserTest < ::Test::Unit::TestCase + class ExampleParser < Fluent::Plugin::Parser + def parse(data) + r = JSON.parse(data) + yield convert_values(parse_time(r), r) end + end - def create_driver(conf={}) - Fluent::Test::Driver::Parser.new(Fluent::Plugin::Parser).configure(conf) - end + def create_driver(conf={}) + Fluent::Test::Driver::Parser.new(Fluent::Plugin::Parser).configure(conf) + end + + def setup + Fluent::Test.setup + end + sub_test_case 'base class works as plugin' do def test_init - d = create_driver - assert_true d.instance.estimate_current_event + i = Fluent::Plugin::Parser.new + assert_nil i.types + assert_nil i.null_value_pattern + assert !i.null_empty_string + assert i.estimate_current_event + assert !i.keep_time_key end def test_configure_against_string_literal @@ -29,4 +42,318 @@ def test_parse end end end + + sub_test_case '#string_like_null' do + setup do + @i = ExampleParser.new + end + + test 'returns false if null_empty_string is false and null_value_regexp is nil' do + assert ! 
@i.string_like_null('a', false, nil) + assert ! @i.string_like_null('', false, nil) + end + + test 'returns true if null_empty_string is true and string value is empty' do + assert ! @i.string_like_null('a', true, nil) + assert @i.string_like_null('', true, nil) + end + + test 'returns true if null_value_regexp has regexp and it matches string value' do + assert ! @i.string_like_null('a', false, /null/i) + assert @i.string_like_null('NULL', false, /null/i) + assert @i.string_like_null('empty', false, /null|empty/i) + end + end + + sub_test_case '#build_type_converters converters' do + setup do + @i = ExampleParser.new + types_config = { + "s" => "string", + "i" => "integer", + "f" => "float", + "b" => "bool", + "t1" => "time", + "t2" => "time:%Y-%m-%d %H:%M:%S.%N", + "t3" => "time:+0100:%Y-%m-%d %H:%M:%S.%N", + "t4" => "time:unixtime", + "t5" => "time:float", + "a1" => "array", + "a2" => "array:|", + } + @hash = { + 'types' => JSON.dump(types_config), + } + end + + test 'to do #to_s by "string" type' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["s"] + assert_equal "", c.call("") + assert_equal "a", c.call("a") + assert_equal "1", c.call(1) + assert_equal "1.01", c.call(1.01) + assert_equal "true", c.call(true) + assert_equal "false", c.call(false) + end + + test 'to do #to_i by "integer" type' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["i"] + assert_equal 0, c.call("") + assert_equal 0, c.call("0") + assert_equal 0, c.call("a") + assert_equal(-1000, c.call("-1000")) + assert_equal 1, c.call(1) + assert_equal 1, c.call(1.01) + assert_equal 0, c.call(true) + assert_equal 0, c.call(false) + end + + test 'to do #to_f by "float" type' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["f"] + assert_equal 0.0, c.call("") + assert_equal 0.0, c.call("0") + assert_equal 0.0, c.call("a") + assert_equal(-1000.0, c.call("-1000")) + assert_equal 1.0, c.call(1) + assert_equal 1.01, 
c.call(1.01) + assert_equal 0.0, c.call(true) + assert_equal 0.0, c.call(false) + end + + test 'to return true/false, which returns true only for true/yes/1 (C & perl style), by "bool"' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["b"] + assert_false c.call("") + assert_false c.call("0") + assert_false c.call("a") + assert_true c.call("1") + assert_true c.call("true") + assert_true c.call("True") + assert_true c.call("YES") + assert_true c.call(true) + assert_false c.call(false) + assert_false c.call("1.0") + end + + test 'to parse time string by ruby default time parser without any options' do + # "t1" => "time", + with_timezone("UTC+02") do # -0200 + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["t1"] + assert_nil c.call("") + assert_equal_event_time event_time("2016-10-21 01:54:30 -0200"), c.call("2016-10-21 01:54:30") + assert_equal_event_time event_time("2016-10-21 03:54:30 -0200"), c.call("2016-10-21 01:54:30 -0400") + assert_equal_event_time event_time("2016-10-21 01:55:24 -0200"), c.call("2016-10-21T01:55:24-02:00") + assert_equal_event_time event_time("2016-10-21 01:55:24 -0200"), c.call("2016-10-21T03:55:24Z") + end + end + + test 'to parse time string with specified time format' do + # "t2" => "time:%Y-%m-%d %H:%M:%S.%N", + with_timezone("UTC+02") do # -0200 + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["t2"] + assert_nil c.call("") + assert_equal_event_time event_time("2016-10-21 01:54:30.123000000 -0200"), c.call("2016-10-21 01:54:30.123") + assert_equal_event_time event_time("2016-10-21 01:54:30.012345678 -0200"), c.call("2016-10-21 01:54:30.012345678") + assert_nil c.call("2016/10/21 015430") + end + end + + test 'to parse time string with specified time format and timezone' do + # "t3" => "time:+0100:%Y-%m-%d %H:%M:%S.%N", + with_timezone("UTC+02") do # -0200 + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["t3"] + assert_nil 
c.call("") + assert_equal_event_time event_time("2016-10-21 01:54:30.123000000 +0100"), c.call("2016-10-21 01:54:30.123") + assert_equal_event_time event_time("2016-10-21 01:54:30.012345678 +0100"), c.call("2016-10-21 01:54:30.012345678") + end + end + + test 'to parse time string in unix timestamp' do + # "t4" => "time:unixtime", + with_timezone("UTC+02") do # -0200 + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["t4"] + assert_equal_event_time event_time("1970-01-01 00:00:00.0 +0000"), c.call("") + assert_equal_event_time event_time("2016-10-21 01:54:30.0 -0200"), c.call("1477022070") + assert_equal_event_time event_time("2016-10-21 01:54:30.0 -0200"), c.call("1477022070.01") + end + end + + test 'to parse time string in floating poing value' do + # "t5" => "time:float", + with_timezone("UTC+02") do # -0200 + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["t5"] + assert_equal_event_time event_time("1970-01-01 00:00:00.0 +0000"), c.call("") + assert_equal_event_time event_time("2016-10-21 01:54:30.012 -0200"), c.call("1477022070.012") + assert_equal_event_time event_time("2016-10-21 01:54:30.123456789 -0200"), c.call("1477022070.123456789") + end + end + + test 'to return array of string' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["a1"] + assert_equal [], c.call("") + assert_equal ["0"], c.call("0") + assert_equal ["0"], c.call(0) + assert_equal ["0", "1"], c.call("0,1") + assert_equal ["0|1", "2"], c.call("0|1,2") + assert_equal ["true"], c.call(true) + end + + test 'to return array of string using specified delimiter' do + @i.configure(config_element('parse', '', @hash)) + c = @i.type_converters["a2"] + assert_equal [], c.call("") + assert_equal ["0"], c.call("0") + assert_equal ["0"], c.call(0) + assert_equal ["0,1"], c.call("0,1") + assert_equal ["0", "1,2"], c.call("0|1,2") + assert_equal ["true"], c.call(true) + end + end + + sub_test_case 'example parser without any 
configurations' do + setup do + @current_time = Time.parse("2016-10-21 14:22:01.0 +1000") + @current_event_time = Fluent::EventTime.new(@current_time.to_i, 0) + # @current_time.to_i #=> 1477023721 + Timecop.freeze(@current_time) + @i = ExampleParser.new + @i.configure(config_element('parse', '', {})) + end + + teardown do + Timecop.return + end + + test 'parser returns parsed JSON object, leaving empty/NULL strings, with current time' do + json = '{"t1":"1477023720.101","s1":"","s2":"NULL","s3":"null","k1":1,"k2":"13.1","k3":"1","k4":"yes"}' + @i.parse(json) do |time, record| + assert_equal_event_time @current_event_time, time + assert_equal "1477023720.101", record["t1"] + assert_equal "", record["s1"] + assert_equal "NULL", record["s2"] + assert_equal "null", record["s3"] + assert_equal 1, record["k1"] + assert_equal "13.1", record["k2"] + assert_equal "1", record["k3"] + assert_equal "yes", record["k4"] + end + end + end + + sub_test_case 'example parser fully configured' do + setup do + @current_time = Time.parse("2016-10-21 14:22:01.0 +1000") + @current_event_time = Fluent::EventTime.new(@current_time.to_i, 0) + # @current_time.to_i #=> 1477023721 + Timecop.freeze(@current_time) + @i = ExampleParser.new + hash = { + 'keep_time_key' => "no", + 'estimate_current_event' => "yes", + 'time_key' => "t1", + 'time_type' => "float", + 'null_empty_string' => 'yes', + 'null_value_pattern' => 'NULL|null', + 'types' => "k1:string, k2:integer, k3:float, k4:bool", + } + @i.configure(config_element('parse', '', hash)) + end + + teardown do + Timecop.return + end + + test 'parser returns parsed JSON object, leaving empty/NULL strings, with current time' do + json = '{"t1":"1477023720.101","s1":"","s2":"NULL","s3":"null","k1":1,"k2":"13.1","k3":"1","k4":"yes"}' + @i.parse(json) do |time, record| + assert_equal_event_time Fluent::EventTime.new(1477023720, 101_000_000), time + assert !record.has_key?("t1") + assert{ record.has_key?("s1") && record["s1"].nil? 
} + assert{ record.has_key?("s2") && record["s2"].nil? } + assert{ record.has_key?("s3") && record["s3"].nil? } + assert_equal "1", record["k1"] + assert_equal 13, record["k2"] + assert_equal 1.0, record["k3"] + assert_equal true, record["k4"] + end + end + + test 'parser returns current time if a field is missing specified by time_key' do + json = '{"s1":"","s2":"NULL","s3":"null","k1":1,"k2":"13.1","k3":"1","k4":"yes"}' + @i.parse(json) do |time, record| + assert_equal_event_time @current_event_time, time + assert !record.has_key?("t1") + assert{ record.has_key?("s1") && record["s1"].nil? } + assert{ record.has_key?("s2") && record["s2"].nil? } + assert{ record.has_key?("s3") && record["s3"].nil? } + assert_equal "1", record["k1"] + assert_equal 13, record["k2"] + assert_equal 1.0, record["k3"] + assert_equal true, record["k4"] + end + end + end + + sub_test_case 'example parser configured not to estimate current time, and to keep time key' do + setup do + @current_time = Time.parse("2016-10-21 14:22:01.0 +1000") + @current_event_time = Fluent::EventTime.new(@current_time.to_i, 0) + # @current_time.to_i #=> 1477023721 + Timecop.freeze(@current_time) + @i = ExampleParser.new + hash = { + 'keep_time_key' => "yes", + 'estimate_current_event' => "no", + 'time_key' => "t1", + 'time_type' => "float", + 'null_empty_string' => 'yes', + 'null_value_pattern' => 'NULL|null', + 'types' => "k1:string, k2:integer, k3:float, k4:bool", + } + @i.configure(config_element('parse', '', hash)) + end + + teardown do + Timecop.return + end + + test 'parser returns parsed time with original field and value if the field of time exists' do + json = '{"t1":"1477023720.101","s1":"","s2":"NULL","s3":"null","k1":1,"k2":"13.1","k3":"1","k4":"yes"}' + @i.parse(json) do |time, record| + assert_equal_event_time Fluent::EventTime.new(1477023720, 101_000_000), time + assert_equal "1477023720.101", record["t1"] + assert{ record.has_key?("s1") && record["s1"].nil? 
} + assert{ record.has_key?("s2") && record["s2"].nil? } + assert{ record.has_key?("s3") && record["s3"].nil? } + assert_equal "1", record["k1"] + assert_equal 13, record["k2"] + assert_equal 1.0, record["k3"] + assert_equal true, record["k4"] + end + end + + test 'parser returns nil as time if the field of time is missing' do + json = '{"s1":"","s2":"NULL","s3":"null","k1":1,"k2":"13.1","k3":"1","k4":"yes"}' + @i.parse(json) do |time, record| + assert_nil time + assert !record.has_key?("t1") + assert{ record.has_key?("s1") && record["s1"].nil? } + assert{ record.has_key?("s2") && record["s2"].nil? } + assert{ record.has_key?("s3") && record["s3"].nil? } + assert_equal "1", record["k1"] + assert_equal 13, record["k2"] + assert_equal 1.0, record["k3"] + assert_equal true, record["k4"] + end + end + end end From fdbc98153d93a192233967c923697ca516558cdc Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 21 Oct 2016 18:57:12 +0900 Subject: [PATCH 08/15] update built-in plugins and tests with updated parser features --- lib/fluent/plugin/parser_csv.rb | 9 ++++- lib/fluent/plugin/parser_ltsv.rb | 29 +++++-------- lib/fluent/plugin/parser_regexp.rb | 56 +++++++------------------- lib/fluent/plugin/parser_tsv.rb | 10 +++-- test/plugin/test_parser_csv.rb | 3 +- test/plugin/test_parser_json.rb | 3 +- test/plugin/test_parser_labeled_tsv.rb | 3 +- test/plugin/test_parser_none.rb | 3 +- test/plugin/test_parser_regexp.rb | 12 ++++-- test/plugin/test_parser_tsv.rb | 7 ++-- 10 files changed, 54 insertions(+), 81 deletions(-) diff --git a/lib/fluent/plugin/parser_csv.rb b/lib/fluent/plugin/parser_csv.rb index 8e5661d53b..8fc156cbf1 100644 --- a/lib/fluent/plugin/parser_csv.rb +++ b/lib/fluent/plugin/parser_csv.rb @@ -20,13 +20,18 @@ module Fluent module Plugin - class CSVParser < ValuesParser + class CSVParser < Parser Plugin.register_parser('csv', self) + desc 'Names of fields included in each lines' + config_param :keys, :array, value_type: :string + desc 'The delimiter 
character (or string) of CSV values' config_param :delimiter, :string, default: ',' def parse(text) - yield values_map(CSV.parse_line(text, col_sep: @delimiter)) + values = CSV.parse_line(text, col_sep: @delimiter) + r = Hash[@keys.zip(values)] + yield convert_values(parse_time(r), r) end end end diff --git a/lib/fluent/plugin/parser_ltsv.rb b/lib/fluent/plugin/parser_ltsv.rb index a879d7529a..10de5bb231 100644 --- a/lib/fluent/plugin/parser_ltsv.rb +++ b/lib/fluent/plugin/parser_ltsv.rb @@ -15,35 +15,26 @@ # require 'fluent/plugin/parser' -require 'fluent/time' module Fluent module Plugin - class LabeledTSVParser < ValuesParser + class LabeledTSVParser < Parser Plugin.register_parser('ltsv', self) - config_param :delimiter, :string, default: "\t" + desc 'The delimiter character (or string) of TSV values' + config_param :delimiter, :string, default: "\t" + desc 'The delimiter character between field name and value' config_param :label_delimiter, :string, default: ":" - config_param :time_key, :string, default: "time" - def configure(conf) - # this assignment is not to raise ConfigError in ValuesParser#configure - conf['keys'] = conf['time_key'] || 'time' - super(conf) - end + config_set_default :time_key, 'time' def parse(text) - # TODO: thread unsafe: @keys might be changed by other threads - @keys = [] - values = [] - - text.split(delimiter).each do |pair| - key, value = pair.split(label_delimiter, 2) - @keys.push(key) - values.push(value) + r = {} + text.split(@delimiter).each do |pair| + key, value = pair.split(@label_delimiter, 2) + r[key] = value end - - yield values_map(values) + yield convert_values(parse_time(r), r) end end end diff --git a/lib/fluent/plugin/parser_regexp.rb b/lib/fluent/plugin/parser_regexp.rb index 302ab6db22..e309e217eb 100644 --- a/lib/fluent/plugin/parser_regexp.rb +++ b/lib/fluent/plugin/parser_regexp.rb @@ -1,33 +1,26 @@ module Fluent module Plugin class RegexpParser < Parser - include Fluent::Compat::TypeConverter - 
Plugin.register_parser("regexp", self) - config_param :expression, :string, default: "" + config_param :expression, :string config_param :ignorecase, :bool, default: false config_param :multiline, :bool, default: false - config_param :time_key, :string, default: 'time' - def initialize - super - @mutex = Mutex.new - end + config_set_default :time_key, 'time' def configure(conf) super - @time_parser = time_parser_create - unless @expression.empty? - if @expression[0] == "/" && @expression[-1] == "/" - regexp_option = 0 - regexp_option |= Regexp::IGNORECASE if @ignorecase - regexp_option |= Regexp::MULTILINE if @multiline - @regexp = Regexp.new(@expression[1..-2], regexp_option) - else - raise Fluent::ConfigError, "expression must start with `/` and end with `/`: #{@expression}" - end - end + + expr = if @expression[0] == "/" && @expression[-1] == "/" + @expression[1..-2] + else + @expression + end + regexp_option = 0 + regexp_option |= Regexp::IGNORECASE if @ignorecase + regexp_option |= Regexp::MULTILINE if @multiline + @regexp = Regexp.new(expr, regexp_option) end def parse(text) @@ -37,35 +30,14 @@ def parse(text) return end - time = nil record = {} - m.names.each do |name| if value = m[name] - if name == @time_key - time = @mutex.synchronize { @time_parser.parse(value) } - if @keep_time_key - record[name] = if @type_converters.nil? - value - else - convert_type(name, value) - end - end - else - record[name] = if @type_converters.nil? 
- value - else - convert_type(name, value) - end - end + record[name] = value end end - if @estimate_current_event - time ||= Fluent::EventTime.now - end - - yield time, record + yield convert_values(parse_time(record), record) end end end diff --git a/lib/fluent/plugin/parser_tsv.rb b/lib/fluent/plugin/parser_tsv.rb index e6edb388b9..c12fa20d0c 100644 --- a/lib/fluent/plugin/parser_tsv.rb +++ b/lib/fluent/plugin/parser_tsv.rb @@ -15,13 +15,15 @@ # require 'fluent/plugin/parser' -require 'fluent/time' module Fluent module Plugin - class TSVParser < ValuesParser + class TSVParser < Parser Plugin.register_parser('tsv', self) + desc 'Names of fields included in each lines' + config_param :keys, :array, value_type: :string + desc 'The delimiter character (or string) of TSV values' config_param :delimiter, :string, default: "\t" def configure(conf) @@ -30,7 +32,9 @@ def configure(conf) end def parse(text) - yield values_map(text.split(@delimiter, @key_num)) + values = text.split(@delimiter, @key_num) + r = Hash[@keys.zip(values)] + yield convert_values(parse_time(r), r) end end end diff --git a/test/plugin/test_parser_csv.rb b/test/plugin/test_parser_csv.rb index 8e24cdb34b..e163516977 100644 --- a/test/plugin/test_parser_csv.rb +++ b/test/plugin/test_parser_csv.rb @@ -37,8 +37,7 @@ def test_parse_without_time(param) } d = Fluent::Test::Driver::Parser.new(Fluent::Plugin::CSVParser) - d.instance.estimate_current_event = false - d.configure('keys' => param) + d.configure('keys' => param, 'estimate_current_event' => 'no') d.instance.parse("192.168.0.1,111") { |time, record| assert_equal({ 'c' => '192.168.0.1', diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 17761a7cbf..d841cedb38 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -44,8 +44,7 @@ def test_parse_without_time(data) } parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser) - parser.instance.estimate_current_event = false - 
parser.configure('json_parser' => data) + parser.configure('json_parser' => data, 'estimate_current_event' => 'false') parser.instance.parse('{"host":"192.168.0.1","size":777,"method":"PUT"}') { |time, record| assert_equal({ 'host' => '192.168.0.1', diff --git a/test/plugin/test_parser_labeled_tsv.rb b/test/plugin/test_parser_labeled_tsv.rb index 8e4c3e9c60..76cac67029 100644 --- a/test/plugin/test_parser_labeled_tsv.rb +++ b/test/plugin/test_parser_labeled_tsv.rb @@ -78,8 +78,7 @@ def test_parse_without_time } parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::LabeledTSVParser) - parser.instance.estimate_current_event = false - parser.configure({}) + parser.configure({'estimate_current_event' => 'no'}) parser.instance.parse("host:192.168.0.1\treq_id:111") { |time, record| assert_equal({ 'host' => '192.168.0.1', diff --git a/test/plugin/test_parser_none.rb b/test/plugin/test_parser_none.rb index a7b8e99f63..e514ab395c 100644 --- a/test/plugin/test_parser_none.rb +++ b/test/plugin/test_parser_none.rb @@ -43,8 +43,7 @@ def test_parse_without_default_time } parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin.new_parser('none')) - parser.instance.estimate_current_event = false - parser.configure({}) + parser.configure({'estimate_current_event' => 'false'}) parser.instance.parse('log message!') { |time, record| assert_equal({'message' => 'log message!'}, record) assert_nil time, "parser returns nil w/o time if configured so" diff --git a/test/plugin/test_parser_regexp.rb b/test/plugin/test_parser_regexp.rb index b16e5f3ebf..b490cb6651 100644 --- a/test/plugin/test_parser_regexp.rb +++ b/test/plugin/test_parser_regexp.rb @@ -28,7 +28,12 @@ def create_driver(regexp, conf = {}, initialize_conf: false) if initialize_conf Fluent::Test::Driver::Parser.new(Fluent::Compat::TextParser::RegexpParser.new(regexp, conf)) else - Fluent::Test::Driver::Parser.new(Fluent::Compat::TextParser::RegexpParser.new(regexp)).configure(conf) + # 
Fluent::Test::Driver::Parser.new(Fluent::Compat::TextParser::RegexpParser.new(regexp)).configure(conf) + instance = Fluent::Compat::TextParser::RegexpParser.new(regexp) + instance.configure(conf) + d = Struct.new(:instance).new + d.instance = instance + d end end @@ -188,12 +193,11 @@ def test_parse_with_typed internal_test_case(d.instance) end - def test_parse_with_typed_and_name_separator + def test_parse_with_typed_by_json_hash conf = { 'expression' => %q!/^(?[^ ]*) [^ ]* (?[^ ]*) \[(?