Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize multiple filters call #1145

Merged
merged 4 commits into from
Aug 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions example/multi_filters.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# This example is to measure optimized filter pipeline performance.

<source>
@type dummy
tag test
size 1000
</source>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<filter test>
@type grep
exclude1 hello .
</filter>

<match test>
@type buffered_null
</match>
80 changes: 76 additions & 4 deletions lib/fluent/event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,23 +136,95 @@ class Pipeline
def initialize
@filters = []
@output = nil
@optimizer = FilterOptimizer.new
end

def add_filter(filter)
@filters << filter
@optimizer.filters = @filters
end

def set_output(output)
@output = output
end

def emit_events(tag, es)
processed = es
@filters.each { |filter|
processed = filter.filter_stream(tag, processed)
}
processed = @optimizer.filter_stream(tag, es)
@output.emit_events(tag, processed)
end

class FilterOptimizer
def initialize(filters = [])
@filters = filters
end

def filters=(filters)
@filters = filters
reset_optimization
end

def filter_stream(tag, es)
if optimizable?
optimized_filter_stream(tag, es)
else
@filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }
end
end

private

def optimized_filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
filtered_record = record
filtered_time = time

catch :break_loop do
@filters.each do |filter|
if filter.has_filter_with_time
begin
filtered_time, filtered_record = filter.filter_with_time(tag, filtered_time, filtered_record)
throw :break_loop unless filtered_record && filtered_time
rescue => e
filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
end
else
begin
filtered_record = filter.filter(tag, filtered_time, filtered_record)
throw :break_loop unless filtered_record
rescue => e
filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
end
end
end

new_es.add(filtered_time, filtered_record)
end
end
new_es
end

def optimizable?
return @optimizable unless @optimizable.nil?
@optimizable = if filters_having_filter_stream.empty?
true
else
$log.info "Filtering works with worse performance, because #{filters_having_filter_stream.map(&:class)} uses `#filter_stream` method."
false
end
end

def filters_having_filter_stream
@filters_having_filter_stream ||= @filters.select do |filter|
filter.class.instance_methods(false).include?(:filter_stream)
end
end

def reset_optimization
@optimizable = nil
@filters_having_filter_stream = nil
end
end
end

def find(tag)
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class Filter < Base

helpers :event_emitter

attr_reader :has_filter_with_time

def initialize
super
@has_filter_with_time = has_filter_with_time?
Expand Down
10 changes: 9 additions & 1 deletion lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class DummyInput < Input

desc "The value is the tag assigned to the generated events."
config_param :tag, :string
desc "The number of events in event stream of each emits."
config_param :size, :integer, default: 1
desc "It configures how many events to generate per second."
config_param :rate, :integer, default: 1
desc "If specified, each generated event has an auto-incremented key field."
Expand Down Expand Up @@ -97,7 +99,13 @@ def run

def emit(num)
begin
num.times { router.emit(@tag, Fluent::Engine.now, generate()) }
if @size > 1
num.times do
router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
end
else
num.times { router.emit(@tag, Fluent::Engine.now, generate) }
end
rescue => _
# ignore all errors not to stop emits by emit errors
end
Expand Down
87 changes: 87 additions & 0 deletions test/test_event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ def event_router

sub_test_case 'filter' do
test 'filter should be called when tag matched' do
filter = Class.new(FluentTestFilter) { |x|
def filter_stream(_tag, es); end
}.new

event_router.add_rule('test', filter)

assert_rr do
Expand Down Expand Up @@ -229,6 +233,89 @@ def event_router
end
end

sub_test_case 'optimized filter' do
setup do
@record = { 'k' => 'v' }
@now = Engine.now
end

test 'call optimized filter when the filter plugin implements #filter without #filter_stream' do
event_router.add_rule('test', filter)

assert_rr do
mock(filter).filter('test', @now, @record) { @record }
event_router.emit('test', @now, @record)
end
end

test 'call optimized filter when the filter plugin implements #filter_with_time without #filter_stream' do
filter = Class.new(FluentTestFilter) {
undef_method :filter
def filter_with_time(tag, time, record); end
}.new

event_router.add_rule('test', filter)

assert_rr do
mock(filter).filter_with_time('test', @now, @record) { [time, @record] }
event_router.emit('test', @now, @record)
end
end

test "don't call optimized filter when filter plugins implement #filter_stream" do
filter = Class.new(FluentTestFilter) {
undef_method :filter
def filter_stream(tag, time, record); end
}.new

event_router.add_rule('test', filter)

assert_rr do
mock(filter).filter_stream('test', is_a(OneEventStream)) { OneEventStream.new(@now, @record) }
event_router.emit('test', @now, @record)
end
end

test 'call optimized filter when filter plugins have #filter_with_time instead of #filter' do
filter_with_time = Class.new(FluentTestFilter) {
undef_method :filter
def filter_with_time(tag, time, record); end
}.new

event_router.add_rule('test', filter_with_time)
event_router.add_rule('test', filter)

assert_rr do
mock(filter_with_time).filter_with_time('test', @now, @record) { [@now + 1, @record] }
mock(filter).filter('test', @now + 1, @record) { @record }
event_router.emit('test', @now, @record)
end
end

test "don't call optimized filter even if just a filter of some filters implements #filter_stream method" do
filter_stream = Class.new(FluentTestFilter) {
def filter_stream(tag, record); end
}.new

filter_with_time = Class.new(FluentTestFilter) {
undef_method :filter
def filter_with_time(tag, time, record); end
}.new

filters = [filter_stream, filter_with_time, filter]
filters.each { |f| event_router.add_rule('test', f) }

e = OneEventStream.new(@now, @record)
assert_rr do
mock($log).info("Filtering works with worse performance, because #{[filter_stream].map(&:class)} uses `#filter_stream` method.")
mock(filter_stream).filter_stream('test', is_a(OneEventStream)) { e }
mock(filter).filter_stream('test', is_a(OneEventStream)) { e }
mock(filter_with_time).filter_stream('test', is_a(OneEventStream)) { e }
event_router.emit('test', @now, @record)
end
end
end

sub_test_case 'emit_error_handler' do
test 'call handle_emits_error when emit failed' do
event_router.add_rule('test', error_output)
Expand Down