Skip to content

Commit 0be9a64

Browse files
authored
Merge pull request #1145 from ganmacs/change-piplelineing-rule-to-speed-up
Optimize multiple filters call
2 parents bf78d78 + 7bf115b commit 0be9a64

File tree

5 files changed

+235
-5
lines changed

5 files changed

+235
-5
lines changed

example/multi_filters.conf

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# This example is to measure optimized filter pipeline performance.
2+
3+
<source>
4+
@type dummy
5+
tag test
6+
size 1000
7+
</source>
8+
9+
<filter test>
10+
@type grep
11+
exclude1 hello .
12+
</filter>
13+
14+
<filter test>
15+
@type grep
16+
exclude1 hello .
17+
</filter>
18+
19+
<filter test>
20+
@type grep
21+
exclude1 hello .
22+
</filter>
23+
24+
<filter test>
25+
@type grep
26+
exclude1 hello .
27+
</filter>
28+
29+
<filter test>
30+
@type grep
31+
exclude1 hello .
32+
</filter>
33+
34+
<filter test>
35+
@type grep
36+
exclude1 hello .
37+
</filter>
38+
39+
<filter test>
40+
@type grep
41+
exclude1 hello .
42+
</filter>
43+
44+
<filter test>
45+
@type grep
46+
exclude1 hello .
47+
</filter>
48+
49+
<filter test>
50+
@type grep
51+
exclude1 hello .
52+
</filter>
53+
54+
<filter test>
55+
@type grep
56+
exclude1 hello .
57+
</filter>
58+
59+
<match test>
60+
@type buffered_null
61+
</match>

lib/fluent/event_router.rb

+76-4
Original file line numberDiff line numberDiff line change
@@ -136,23 +136,95 @@ class Pipeline
136136
def initialize
137137
@filters = []
138138
@output = nil
139+
@optimizer = FilterOptimizer.new
139140
end
140141

141142
def add_filter(filter)
142143
@filters << filter
144+
@optimizer.filters = @filters
143145
end
144146

145147
def set_output(output)
146148
@output = output
147149
end
148150

149151
def emit_events(tag, es)
150-
processed = es
151-
@filters.each { |filter|
152-
processed = filter.filter_stream(tag, processed)
153-
}
152+
processed = @optimizer.filter_stream(tag, es)
154153
@output.emit_events(tag, processed)
155154
end
155+
156+
# Applies a chain of filter plugins to an event stream.
#
# If none of the filters' own classes define `#filter_stream`, each event is
# pushed through every filter in one pass and the survivors are collected in a
# single MultiEventStream. Otherwise the chain falls back to calling every
# filter's `#filter_stream` in turn on the whole stream.
class FilterOptimizer
  # @param filters [Array] filter plugin instances, applied in order
  def initialize(filters = [])
    @filters = filters
    reset_optimization
  end

  # Replaces the filter chain and forgets the cached "optimizable?" decision,
  # so it is recomputed on the next #filter_stream call.
  #
  # @param filters [Array] filter plugin instances, applied in order
  def filters=(filters)
    @filters = filters
    reset_optimization
  end

  # Runs +es+ through all filters.
  #
  # @param tag [String] tag handed to every filter call
  # @param es [#each] event stream yielding (time, record) pairs
  # @return the filtered event stream
  def filter_stream(tag, es)
    return optimized_filter_stream(tag, es) if optimizable?

    @filters.reduce(es) { |stream, filter| filter.filter_stream(tag, stream) }
  end

  private

  # Single-pass path: filters each event individually and collects the ones
  # that were not dropped.
  def optimized_filter_stream(tag, es)
    new_es = MultiEventStream.new
    es.each do |time, record|
      filtered = apply_filters(tag, time, record)
      new_es.add(*filtered) if filtered
    end
    new_es
  end

  # Passes one event through every filter in order.
  #
  # A filter drops the event by returning a falsy record (or, for
  # `#filter_with_time`, a falsy time or record). A filter that raises does
  # NOT drop the event: the error is reported through that filter's router and
  # the event carries on to the next filter with the time/record it had before
  # the failing call.
  #
  # @return [Array, nil] `[time, record]` for a surviving event, nil if dropped
  def apply_filters(tag, time, record)
    @filters.each do |filter|
      # Read outside the begin block so an error here is not swallowed.
      with_time = filter.has_filter_with_time
      begin
        if with_time
          time, record = filter.filter_with_time(tag, time, record)
          return nil unless record && time
        else
          record = filter.filter(tag, time, record)
          return nil unless record
        end
      rescue => e
        filter.router.emit_error_event(tag, time, record, e)
      end
    end
    [time, record]
  end

  # Memoised: true when no filter class defines its own `#filter_stream`.
  # Logs once (per filter-chain assignment) when the fallback path is chosen.
  def optimizable?
    if @optimizable.nil?
      stream_filters = filters_having_filter_stream
      unless stream_filters.empty?
        $log.info "Filtering works with worse performance, because #{stream_filters.map(&:class)} uses `#filter_stream` method."
      end
      @optimizable = stream_filters.empty?
    end
    @optimizable
  end

  # Filters whose own class (ancestors are not considered) defines
  # `#filter_stream`.
  def filters_having_filter_stream
    @filters_having_filter_stream ||= @filters.select do |filter|
      filter.class.instance_methods(false).include?(:filter_stream)
    end
  end

  def reset_optimization
    @optimizable = nil
    @filters_having_filter_stream = nil
  end
end
156228
end
157229

158230
def find(tag)

lib/fluent/plugin/filter.rb

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class Filter < Base
3030

3131
helpers :event_emitter
3232

33+
attr_reader :has_filter_with_time
34+
3335
def initialize
3436
super
3537
@has_filter_with_time = has_filter_with_time?

lib/fluent/plugin/in_dummy.rb

+9-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class DummyInput < Input
3030

3131
desc "The value is the tag assigned to the generated events."
3232
config_param :tag, :string
33+
desc "The number of events in event stream of each emits."
34+
config_param :size, :integer, default: 1
3335
desc "It configures how many events to generate per second."
3436
config_param :rate, :integer, default: 1
3537
desc "If specified, each generated event has an auto-incremented key field."
@@ -97,7 +99,13 @@ def run
9799

98100
def emit(num)
99101
begin
100-
num.times { router.emit(@tag, Fluent::Engine.now, generate()) }
102+
if @size > 1
103+
num.times do
104+
router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
105+
end
106+
else
107+
num.times { router.emit(@tag, Fluent::Engine.now, generate) }
108+
end
101109
rescue => _
102110
# ignore all errors not to stop emits by emit errors
103111
end

test/test_event_router.rb

+87
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ def event_router
185185

186186
sub_test_case 'filter' do
187187
test 'filter should be called when tag matched' do
188+
filter = Class.new(FluentTestFilter) { |x|
189+
def filter_stream(_tag, es); end
190+
}.new
191+
188192
event_router.add_rule('test', filter)
189193

190194
assert_rr do
@@ -229,6 +233,89 @@ def event_router
229233
end
230234
end
231235

236+
# Checks which path the pipeline takes for a chain of filters: the per-event
# path (#filter / #filter_with_time called directly) when no filter class
# defines #filter_stream, and the #filter_stream path otherwise.
sub_test_case 'optimized filter' do
  setup do
    @record = { 'k' => 'v' }
    @now = Engine.now
  end

  test 'call optimized filter when the filter plugin implements #filter without #filter_stream' do
    event_router.add_rule('test', filter)

    assert_rr do
      mock(filter).filter('test', @now, @record) { @record }
      event_router.emit('test', @now, @record)
    end
  end

  test 'call optimized filter when the filter plugin implements #filter_with_time without #filter_stream' do
    filter = Class.new(FluentTestFilter) {
      undef_method :filter
      def filter_with_time(tag, time, record); end
    }.new

    event_router.add_rule('test', filter)

    assert_rr do
      # Return the emitted time; a bare `time` is undefined in this scope and
      # would raise NameError inside the mock instead of returning a pair.
      mock(filter).filter_with_time('test', @now, @record) { [@now, @record] }
      event_router.emit('test', @now, @record)
    end
  end

  test "don't call optimized filter when filter plugins implement #filter_stream" do
    filter = Class.new(FluentTestFilter) {
      undef_method :filter
      def filter_stream(tag, es); end
    }.new

    event_router.add_rule('test', filter)

    assert_rr do
      mock(filter).filter_stream('test', is_a(OneEventStream)) { OneEventStream.new(@now, @record) }
      event_router.emit('test', @now, @record)
    end
  end

  test 'call optimized filter when filter plugins have #filter_with_time instead of #filter' do
    filter_with_time = Class.new(FluentTestFilter) {
      undef_method :filter
      def filter_with_time(tag, time, record); end
    }.new

    event_router.add_rule('test', filter_with_time)
    event_router.add_rule('test', filter)

    assert_rr do
      # The time rewritten by the first filter must be what the second one sees.
      mock(filter_with_time).filter_with_time('test', @now, @record) { [@now + 1, @record] }
      mock(filter).filter('test', @now + 1, @record) { @record }
      event_router.emit('test', @now, @record)
    end
  end

  test "don't call optimized filter even if just a filter of some filters implements #filter_stream method" do
    filter_stream = Class.new(FluentTestFilter) {
      def filter_stream(tag, es); end
    }.new

    filter_with_time = Class.new(FluentTestFilter) {
      undef_method :filter
      def filter_with_time(tag, time, record); end
    }.new

    filters = [filter_stream, filter_with_time, filter]
    filters.each { |f| event_router.add_rule('test', f) }

    e = OneEventStream.new(@now, @record)
    assert_rr do
      mock($log).info("Filtering works with worse performance, because #{[filter_stream].map(&:class)} uses `#filter_stream` method.")
      mock(filter_stream).filter_stream('test', is_a(OneEventStream)) { e }
      mock(filter).filter_stream('test', is_a(OneEventStream)) { e }
      mock(filter_with_time).filter_stream('test', is_a(OneEventStream)) { e }
      event_router.emit('test', @now, @record)
    end
  end
end
318+
232319
sub_test_case 'emit_error_handler' do
233320
test 'call handle_emits_error when emit failed' do
234321
event_router.add_rule('test', error_output)

0 commit comments

Comments
 (0)