Skip to content

Commit

Permalink
Merge pull request #1191 from fluent/port-parser-filter
Browse files Browse the repository at this point in the history
Port parser filter from fluent-plugin-parser. fix #1189
  • Loading branch information
repeatedly authored Nov 10, 2016
2 parents a3a8444 + f64bb77 commit 7b0c40b
Show file tree
Hide file tree
Showing 3 changed files with 776 additions and 2 deletions.
108 changes: 108 additions & 0 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/time'
require 'fluent/config/error'
require 'fluent/plugin/filter'
require 'fluent/plugin_helper/parser'
require 'fluent/plugin_helper/compat_parameters'

module Fluent::Plugin
class ParserFilter < Filter
Fluent::Plugin.register_filter('parser', self)

helpers :parser, :compat_parameters

config_param :key_name, :string
config_param :reserve_data, :bool, default: false
config_param :reserve_time, :bool, default: false
config_param :inject_key_prefix, :string, default: nil
config_param :replace_invalid_sequence, :bool, default: false
config_param :hash_value_field, :string, default: nil

attr_reader :parser

def configure(conf)
compat_parameters_convert(conf, :parser)

super

@parser = parser_create
end

FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
REPLACE_CHAR = '?'.freeze

def filter_with_time(tag, time, record)
raw_value = record[@key_name]
if raw_value.nil?
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
end
end
begin
@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
time
else
t.nil? ? time : t
end
r = handle_parsed(tag, record, t, values)
return t, r
else
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
return t, r
else
return FAILED_RESULT
end
end
end
rescue Fluent::Plugin::Parser::ParserError => e
router.emit_error_event(tag, time, record, e)
return FAILED_RESULT
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}"))
return FAILED_RESULT
end
end

private

def handle_parsed(tag, record, t, values)
if values && @inject_key_prefix
values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]
end
r = @hash_value_field ? {@hash_value_field => values} : values
if @reserve_data
r = r ? record.merge(r) : record
end
r
end
end
end
5 changes: 3 additions & 2 deletions lib/fluent/plugin_helper/compat_parameters.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ module CompatParameters
"types" => nil,
"types_delimiter" => nil,
"types_label_delimiter" => nil,
"null_value_pattern" => "null_value_pattern",
"null_empty_string" => "null_empty_string",
"keys" => "keys", # CSVParser, TSVParser (old ValuesParser)
"time_key" => "time_key",
"time_format" => "time_format",
"delimiter" => "delimiter",
"keep_time_key" => "keep_time_key",
"null_empty_string" => "null_empty_string",
"null_value_pattern" => "null_value_pattern",
"json_parser" => "json_parser", # JSONParser
"label_delimiter" => "label_delimiter", # LabeledTSVParser
"format_firstline" => "format_firstline", # MultilineParser
Expand Down
Loading

0 comments on commit 7b0c40b

Please sign in to comment.