Skip to content

Commit

Permalink
Merge pull request #1800 from fluent/delete-nested-field
Browse files Browse the repository at this point in the history
Delete nested field
  • Loading branch information
repeatedly authored Jan 9, 2018
2 parents ce6cda0 + 662e3a1 commit ccd971d
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
9 changes: 7 additions & 2 deletions lib/fluent/plugin/filter_record_transformer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module Fluent::Plugin
class RecordTransformerFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('record_transformer', self)

helpers :record_accessor

desc 'A comma-delimited list of keys to delete.'
config_param :remove_keys, :array, default: nil
desc 'A comma-delimited list of keys to keep.'
Expand Down Expand Up @@ -56,6 +58,10 @@ def configure(conf)
raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`" unless @renew_record
end

@key_deleters = if @remove_keys
@remove_keys.map { |k| record_accessor_create(k) }
end

placeholder_expander_params = {
log: log,
auto_typecast: @auto_typecast,
Expand Down Expand Up @@ -96,8 +102,7 @@ def filter_stream(tag, es)
if @renew_time_key && new_record.has_key?(@renew_time_key)
time = Fluent::EventTime.from_time(Time.at(new_record[@renew_time_key].to_f))
end
@remove_keys.each {|k| new_record.delete(k) } if @remove_keys

@key_deleters.each { |deleter| deleter.delete(new_record) } if @key_deleters
new_es.add(time, new_record)
rescue => e
router.emit_error_event(tag, time, record, e)
Expand Down
35 changes: 32 additions & 3 deletions lib/fluent/plugin_helper/record_accessor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,20 @@ class Accessor
def initialize(param)
@keys = Accessor.parse_parameter(param)

# Call [] for single key to reduce dig overhead
m = method(@keys.is_a?(Array) ? :call_dig : :call_index)
if @keys.is_a?(Array)
@last_key = @keys.last
@dig_keys = @keys[0..-2]
mcall = method(:call_dig)
mdelete = method(:delete_nest)
else
# Call [] for single key to reduce dig overhead
mcall = method(:call_index)
mdelete = method(:delete_top)
end

singleton_class.module_eval do
define_method(:call, m)
define_method(:call, mcall)
define_method(:delete, mdelete)
end
end

Expand All @@ -57,6 +67,25 @@ def call_index(r)
r[@keys]
end

def delete(r)
end

def delete_nest(r)
if target = r.dig(*@dig_keys)
if target.is_a?(Array)
target.delete_at(@last_key)
else
target.delete(@last_key)
end
end
rescue
nil
end

def delete_top(r)
r.delete(@keys)
end

def self.parse_parameter(param)
if param.start_with?('$.')
parse_dot_notation(param)
Expand Down
11 changes: 10 additions & 1 deletion test/plugin/test_filter_record_transformer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def filter(config, msgs = [''])
d = create_driver(config)
d.run {
msgs.each { |msg|
d.feed(@tag, @time, {'foo' => 'bar', 'message' => msg})
d.feed(@tag, @time, {'foo' => 'bar', 'message' => msg, 'nest' => {'k1' => 'v1', 'k2' => 'v2'}})
}
}
d.filtered
Expand All @@ -68,6 +68,7 @@ def filter(config, msgs = [''])
assert_equal(@tag, r['tag'])
assert_equal(Time.at(@time).localtime.to_s, r['time'])
assert_equal("#{@hostname} #{@tag_parts[-1]} #{msgs[i]}", r['message'])
assert_equal({'k1' => 'v1', 'k2' => 'v2'}, r['nest'])
end
end

Expand All @@ -83,6 +84,14 @@ def filter(config, msgs = [''])
end
end

test 'remove_keys with nested key' do
config = CONFIG + %[remove_keys $.nest.k1]
filtered = filter(config)
filtered.each_with_index do |(_t, r), i|
assert_not_include(r['nest'], 'k1')
end
end

test 'renew_record' do
config = CONFIG + %[renew_record true]
msgs = ['1', '2']
Expand Down
33 changes: 33 additions & 0 deletions test/plugin_helper/test_record_accessor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,37 @@ class Dummy < Fluent::Plugin::TestBase
end
end
end

sub_test_case 'Fluent::PluginHelper::RecordAccessor::Accessor#delete' do
setup do
@d = Dummy.new
end

data('normal' => 'key1',
'space' => 'ke y2',
'dot key' => 'this.is.key3')
test 'delete top key' do |param|
r = {'key1' => 'v1', 'ke y2' => 'v2', 'this.is.key3' => 'v3'}
accessor = @d.record_accessor_create(param)
accessor.delete(r)
assert_not_include(r, param)
end

data('bracket' => "$['key1'][0]['ke y2']",
'bracket w/ double quotes' => '$["key1"][0]["ke y2"]')
test "delete nested keys ['key1', 0, 'ke y2']" do |param|
r = {'key1' => [{'ke y2' => "value"}]}
accessor = @d.record_accessor_create(param)
accessor.delete(r)
assert_not_include(r['key1'][0], 'ke y2')
end

test "don't raise an error when unexpected record is coming" do
r = {'key1' => [{'key3' => "value"}]}
accessor = @d.record_accessor_create("$['key1']['key2']['key3']")
assert_nothing_raised do
assert_nil accessor.delete(r)
end
end
end
end

0 comments on commit ccd971d

Please sign in to comment.