Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record accessor helper #1637

Merged
merged 8 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo", ["~> 1.0"])
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", ["~> 0.1.7"])
gem.add_runtime_dependency("ruby_dig", ["~> 0.0.2"])

# build gem for a certain platform. see also Rakefile
fake_platform = ENV['GEM_BUILD_FAKE_PLATFORM'].to_s
Expand Down
40 changes: 28 additions & 12 deletions lib/fluent/plugin/filter_grep.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ module Fluent::Plugin
class GrepFilter < Filter
Fluent::Plugin.register_filter('grep', self)

def initialize
super

@_regexps = {}
@_excludes = {}
end

helpers :record_accessor

REGEXP_MAX_NUM = 20

(1..REGEXP_MAX_NUM).each {|i| config_param :"regexp#{i}", :string, default: nil, deprecated: "Use <regexp> section" }
Expand Down Expand Up @@ -52,31 +61,38 @@ class GrepFilter < Filter
def configure(conf)
super

@_regexps = {}
rs = {}
(1..REGEXP_MAX_NUM).each do |i|
next unless conf["regexp#{i}"]
key, regexp = conf["regexp#{i}"].split(/ /, 2)
raise Fluent::ConfigError, "regexp#{i} does not contain 2 parameters" unless regexp
raise Fluent::ConfigError, "regexp#{i} contains a duplicated key, #{key}" if @_regexps[key]
@_regexps[key] = Regexp.compile(regexp)
raise Fluent::ConfigError, "regexp#{i} contains a duplicated key, #{key}" if rs[key]
rs[key] = Regexp.compile(regexp)
end

@_excludes = {}
es = {}
(1..REGEXP_MAX_NUM).each do |i|
next unless conf["exclude#{i}"]
key, exclude = conf["exclude#{i}"].split(/ /, 2)
raise Fluent::ConfigError, "exclude#{i} does not contain 2 parameters" unless exclude
raise Fluent::ConfigError, "exclude#{i} contains a duplicated key, #{key}" if @_excludes[key]
@_excludes[key] = Regexp.compile(exclude)
raise Fluent::ConfigError, "exclude#{i} contains a duplicated key, #{key}" if es[key]
es[key] = Regexp.compile(exclude)
end

@regexps.each do |e|
raise Fluent::ConfigError, "Duplicate key: #{e.key}" if @_regexps.key?(e.key)
@_regexps[e.key] = e.pattern
raise Fluent::ConfigError, "Duplicate key: #{e.key}" if rs.key?(e.key)
rs[e.key] = e.pattern
end
@excludes.each do |e|
raise Fluent::ConfigError, "Duplicate key: #{e.key}" if @_excludes.key?(e.key)
@_excludes[e.key] = e.pattern
raise Fluent::ConfigError, "Duplicate key: #{e.key}" if es.key?(e.key)
es[e.key] = e.pattern
end

rs.each_pair do |k, v|
@_regexps[record_accessor_create(k)] = v
end
es.each_pair do |k, v|
@_excludes[record_accessor_create(k)] = v
end
end

Expand All @@ -85,10 +101,10 @@ def filter(tag, time, record)
begin
catch(:break_loop) do
@_regexps.each do |key, regexp|
throw :break_loop unless ::Fluent::StringUtil.match_regexp(regexp, record[key].to_s)
throw :break_loop unless ::Fluent::StringUtil.match_regexp(regexp, key.call(record).to_s)
end
@_excludes.each do |key, exclude|
throw :break_loop if ::Fluent::StringUtil.match_regexp(exclude, record[key].to_s)
throw :break_loop if ::Fluent::StringUtil.match_regexp(exclude, key.call(record).to_s)
end
result = record
end
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require 'fluent/plugin_helper/socket'
require 'fluent/plugin_helper/server'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/plugin_helper/compat_parameters'

module Fluent
Expand Down
172 changes: 172 additions & 0 deletions lib/fluent/plugin_helper/record_accessor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/config/error'
unless {}.respond_to?(:dig)
begin
# backport_dig is faster than ruby_dig so prefer backport_dig.
require 'backport_dig'
rescue LoadError
require 'ruby_dig'
end
end

module Fluent
module PluginHelper
module RecordAccessor
def record_accessor_create(param)
Accessor.new(param)
end

class Accessor
def initialize(param)
@keys = Accessor.parse_parameter(param)

# Call [] for single key to reduce dig overhead
m = method(@keys.is_a?(Array) ? :call_dig : :call_index)
(class << self; self; end).module_eval do
define_method(:call, m)
end
end

def call(r)
end

# To optimize the performance, use class_eval with pre-expanding @keys
# See https://gist.github.com/repeatedly/ab553ed260cd080bd01ec71da9427312
def call_dig(r)
r.dig(*@keys)
end

def call_index(r)
r[@keys]
end

def self.parse_parameter(param)
if param.start_with?('$.')
parse_dot_notation(param)
elsif param.start_with?('$[')
parse_bracket_notation(param)
else
param
end
end

def self.parse_dot_notation(param)
result = []
keys = param[2..-1].split('.')
keys.each { |key|
if key.include?('[')
result.concat(parse_dot_array_op(key, param))
else
result << key
end
}

raise Fluent::ConfigError, "empty keys in dot notation" if result.empty?
validate_dot_keys(result)

result
end

def self.validate_dot_keys(keys)
keys.each { |key|
next unless key.is_a?(String)
if /\s+/.match(key)
raise Fluent::ConfigError, "whitespace character is not allowed in dot notation. Use bracket notation: #{key}"
end
}
end

def self.parse_dot_array_op(key, param)
start = key.index('[')
result = if start.zero?
[]
else
[key[0..start - 1]]
end
key = key[start + 1..-1]
in_bracket = true

until key.empty?
if in_bracket
if i = key.index(']')
index_value = key[0..i - 1]
raise Fluent::ConfigError, "missing array index in '[]'. Invalid syntax: #{param}" if index_value == ']'
result << Integer(index_value)
key = key[i + 1..-1]
in_bracket = false
else
raise Fluent::ConfigError, "'[' found but ']' not found. Invalid syntax: #{param}"
end
else
if i = key.index('[')
key = key[i + 1..-1]
in_bracket = true
else
raise Fluent::ConfigError, "found more characters after ']'. Invalid syntax: #{param}"
end
end
end

result
end

def self.parse_bracket_notation(param)
orig_param = param
result = []
param = param[1..-1]
in_bracket = false

until param.empty?
if in_bracket
if param[0] == "'"
if i = param.index("']")
result << param[1..i - 1]
param = param[i + 2..-1]
in_bracket = false
else
raise Fluent::ConfigError, "Incomplete bracket. Invalid syntax: #{orig_param}"
end
else
if i = param.index(']')
index_value = param[0..i - 1]
raise Fluent::ConfigError, "missing array index in '[]'. Invalid syntax: #{param}" if index_value == ']'
result << Integer(index_value)
param = param[i + 1..-1]
in_bracket = false
else
raise Fluent::ConfigError, "'[' found but ']' not found. Invalid syntax: #{orig_param}"
end
end
else
if i = param.index('[')
param = param[i + 1..-1]
in_bracket = true
else
raise Fluent::ConfigError, "found more characters after ']'. Invalid syntax: #{orig_param}"
end
end
end

raise Fluent::ConfigError, "empty keys in bracket notation" if result.empty?

result
end
end
end
end
end
60 changes: 58 additions & 2 deletions test/plugin/test_filter_grep.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ def create_driver(conf = '')

test "regexpN can contain a space" do
d = create_driver(%[regexp1 message foo])
assert_equal(Regexp.compile(/ foo/), d.instance._regexps['message'])
d.instance._regexps.each_pair { |key, value|
assert_equal(Regexp.compile(/ foo/), value)
}
end

test "excludeN can contain a space" do
d = create_driver(%[exclude1 message foo])
assert_equal(Regexp.compile(/ foo/), d.instance._excludes['message'])
d.instance._excludes.each_pair { |key, value|
assert_equal(Regexp.compile(/ foo/), value)
}
end

sub_test_case "duplicate key" do
Expand Down Expand Up @@ -163,6 +167,58 @@ def messages
end
end

sub_test_case 'nested keys' do
def messages
[
{"nest1" => {"nest2" => "INFO"}},
{"nest1" => {"nest2" => "WARN"}},
{"nest1" => {"nest2" => "WARN"}}
]
end

def filter(config, msgs)
d = create_driver(config)
d.run {
msgs.each { |msg|
d.feed("filter.test", @time, {'foo' => 'bar', 'message' => msg})
}
}
d.filtered_records
end

test 'regexps' do
conf = %[
<regexp>
key $.message.nest1.nest2
pattern WARN
</regexp>
]
filtered_records = filter(conf, messages)
assert_equal(2, filtered_records.size)
assert_block('only 2 nested logs') do
filtered_records.all? { |r|
r['message']['nest1']['nest2'] == 'WARN'
}
end
end

test 'excludes' do
conf = %[
<exclude>
key $.message.nest1.nest2
pattern WARN
</exclude>
]
filtered_records = filter(conf, messages)
assert_equal(1, filtered_records.size)
assert_block('only 2 nested logs') do
filtered_records.all? { |r|
r['message']['nest1']['nest2'] == 'INFO'
}
end
end
end

sub_test_case 'grep non-string jsonable values' do
def filter(msg, config = 'regexp1 message 0')
d = create_driver(config)
Expand Down
Loading