Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new base class for Filter plugins #905

Merged
merged 2 commits into from
Apr 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/base'

require 'fluent/event'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'

module Fluent
module Plugin
class Filter < Base
include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin

helpers :event_emitter

def filter(tag, time, record)
raise NotImplementedError, "BUG: filter plugins MUST implement this method"
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
new_es
end
end
end
end
257 changes: 257 additions & 0 deletions test/plugin/test_filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
require_relative '../helper'
require 'fluent/plugin/filter'
require 'fluent/event'
require 'flexmock/test_unit'

module FluentPluginFilterTest
class DummyPlugin < Fluent::Plugin::Filter
end
class NumDoublePlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
r = record.dup
r["num"] = r["num"].to_i * 2
r
end
end
class IgnoreForNumPlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
if record["num"].is_a? Numeric
nil
else
record
end
end
end
class RaiseForNumPlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
if record["num"].is_a? Numeric
raise "Value of num is Number!"
end
record
end
end
end

class FilterPluginTest < Test::Unit::TestCase
DummyRouter = Struct.new(:emits) do
def emit_error_event(tag, time, record, error)
self.emits << [tag, time, record, error]
end
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
end

sub_test_case 'for basic dummy plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::DummyPlugin.new
end

test 'has healthy lifecycle' do
assert [email protected]?
@p.configure(config_element())
assert @p.configured?

assert [email protected]?
@p.start
assert @p.start

assert [email protected]?
@p.stop
assert @p.stopped?

assert [email protected]_shutdown?
@p.before_shutdown
assert @p.before_shutdown?

assert [email protected]?
@p.shutdown
assert @p.shutdown?

assert [email protected]_shutdown?
@p.after_shutdown
assert @p.after_shutdown?

assert [email protected]?
@p.close
assert @p.closed?

assert [email protected]?
@p.terminate
assert @p.terminated?
end

test 'has plugin_id automatically generated' do
assert @p.respond_to?(:plugin_id_configured?)
assert @p.respond_to?(:plugin_id)

@p.configure(config_element())

assert [email protected]_id_configured?
assert @p.plugin_id
assert{ @p.plugin_id != 'mytest' }
end

test 'has plugin_id manually configured' do
@p.configure(config_element('ROOT', '', {'@id' => 'mytest'}))
assert @p.plugin_id_configured?
assert_equal 'mytest', @p.plugin_id
end

test 'has plugin logger' do
assert @p.respond_to?(:log)
assert @p.log

# default logger
original_logger = @p.log

@p.configure(config_element('ROOT', '', {'@log_level' => 'debug'}))

assert{ @p.log.object_id != original_logger.object_id }
assert_equal Fluent::Log::LEVEL_DEBUG, @p.log.level
end

test 'can load plugin helpers' do
assert_nothing_raised do
class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter
helpers :storage
end
end
end

test 'plugin does not define #filter raises error' do
es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
assert_raise NotImplementedError do
@p.filter_stream('testing', es)
end
end
end

sub_test_case 'normal filter plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::NumDoublePlugin.new
end

test 'filters events correctly' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 3, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal 2, ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:03 -0700'), ary[1][0]
assert_equal 4, ary[1][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[2][0]
assert_equal 6, ary[2][1]["num"]
end
end

sub_test_case 'filter plugin returns nil for some records' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::IgnoreForNumPlugin.new
end

test 'filter_stream ignores records which #filter return nil' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Ignored, yay!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 2, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal "1", ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[1][0]
assert_equal "3", ary[1][1]["num"]
end
end

sub_test_case 'filter plugin raises error' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::RaiseForNumPlugin.new
end

test 'has router and can emit events to error streams' do
assert @p.has_router?

@p.configure(config_element())
assert @p.router

@p.router = DummyRouter.new([])
data = {'message' => 'mydata'}
dummy_error = EOFError.new("dummy eof")

test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Hello error router!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 2, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal "1", ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[1][0]
assert_equal "3", ary[1][1]["num"]

assert_equal 1, @p.router.emits.size

error_emits = @p.router.emits

assert_equal "testing", error_emits[0][0]
assert_equal event_time('2016-04-19 13:01:03 -0700'), error_emits[0][1]
assert_equal({"num" => 2, "message" => "Hello error router!"}, error_emits[0][2])
assert{ error_emits[0][3].is_a? RuntimeError }
assert_equal "Value of num is Number!", error_emits[0][3].message
end
end
end