Skip to content

Commit

Permalink
Merge pull request #905 from fluent/add-new-filter-base-class
Browse files Browse the repository at this point in the history
Add new base class for Filter plugins
  • Loading branch information
tagomoris committed Apr 20, 2016
2 parents 88054a0 + 45be78b commit 6dbf4ea
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 0 deletions.
51 changes: 51 additions & 0 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/base'

require 'fluent/event'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'

module Fluent
module Plugin
class Filter < Base
include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin

helpers :event_emitter

def filter(tag, time, record)
raise NotImplementedError, "BUG: filter plugins MUST implement this method"
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
new_es
end
end
end
end
257 changes: 257 additions & 0 deletions test/plugin/test_filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
require_relative '../helper'
require 'fluent/plugin/filter'
require 'fluent/event'
require 'flexmock/test_unit'

module FluentPluginFilterTest
class DummyPlugin < Fluent::Plugin::Filter
end
class NumDoublePlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
r = record.dup
r["num"] = r["num"].to_i * 2
r
end
end
class IgnoreForNumPlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
if record["num"].is_a? Numeric
nil
else
record
end
end
end
class RaiseForNumPlugin < Fluent::Plugin::Filter
def filter(tag, time, record)
if record["num"].is_a? Numeric
raise "Value of num is Number!"
end
record
end
end
end

class FilterPluginTest < Test::Unit::TestCase
DummyRouter = Struct.new(:emits) do
def emit_error_event(tag, time, record, error)
self.emits << [tag, time, record, error]
end
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
end

sub_test_case 'for basic dummy plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::DummyPlugin.new
end

test 'has healthy lifecycle' do
assert !@p.configured?
@p.configure(config_element())
assert @p.configured?

assert !@p.started?
@p.start
assert @p.start

assert !@p.stopped?
@p.stop
assert @p.stopped?

assert !@p.before_shutdown?
@p.before_shutdown
assert @p.before_shutdown?

assert !@p.shutdown?
@p.shutdown
assert @p.shutdown?

assert !@p.after_shutdown?
@p.after_shutdown
assert @p.after_shutdown?

assert !@p.closed?
@p.close
assert @p.closed?

assert !@p.terminated?
@p.terminate
assert @p.terminated?
end

test 'has plugin_id automatically generated' do
assert @p.respond_to?(:plugin_id_configured?)
assert @p.respond_to?(:plugin_id)

@p.configure(config_element())

assert !@p.plugin_id_configured?
assert @p.plugin_id
assert{ @p.plugin_id != 'mytest' }
end

test 'has plugin_id manually configured' do
@p.configure(config_element('ROOT', '', {'@id' => 'mytest'}))
assert @p.plugin_id_configured?
assert_equal 'mytest', @p.plugin_id
end

test 'has plugin logger' do
assert @p.respond_to?(:log)
assert @p.log

# default logger
original_logger = @p.log

@p.configure(config_element('ROOT', '', {'@log_level' => 'debug'}))

assert{ @p.log.object_id != original_logger.object_id }
assert_equal Fluent::Log::LEVEL_DEBUG, @p.log.level
end

test 'can load plugin helpers' do
assert_nothing_raised do
class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter
helpers :storage
end
end
end

test 'plugin does not define #filter raises error' do
es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
assert_raise NotImplementedError do
@p.filter_stream('testing', es)
end
end
end

sub_test_case 'normal filter plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::NumDoublePlugin.new
end

test 'filters events correctly' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 3, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal 2, ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:03 -0700'), ary[1][0]
assert_equal 4, ary[1][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[2][0]
assert_equal 6, ary[2][1]["num"]
end
end

sub_test_case 'filter plugin returns nil for some records' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::IgnoreForNumPlugin.new
end

test 'filter_stream ignores records which #filter return nil' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Ignored, yay!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 2, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal "1", ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[1][0]
assert_equal "3", ary[1][1]["num"]
end
end

sub_test_case 'filter plugin raises error' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::RaiseForNumPlugin.new
end

test 'has router and can emit events to error streams' do
assert @p.has_router?

@p.configure(config_element())
assert @p.router

@p.router = DummyRouter.new([])
data = {'message' => 'mydata'}
dummy_error = EOFError.new("dummy eof")

test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Hello error router!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 2, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal "1", ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[1][0]
assert_equal "3", ary[1][1]["num"]

assert_equal 1, @p.router.emits.size

error_emits = @p.router.emits

assert_equal "testing", error_emits[0][0]
assert_equal event_time('2016-04-19 13:01:03 -0700'), error_emits[0][1]
assert_equal({"num" => 2, "message" => "Hello error router!"}, error_emits[0][2])
assert{ error_emits[0][3].is_a? RuntimeError }
assert_equal "Value of num is Number!", error_emits[0][3].message
end
end
end

0 comments on commit 6dbf4ea

Please sign in to comment.