Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize multiple filters call #1145

Merged
merged 4 commits into from
Aug 19, 2016

Conversation

ganmacs
Copy link
Member

@ganmacs ganmacs commented Aug 5, 2016

Please merge this PR after #1140 is merged, Becasue this PR depends on #1140.

If your code can be optimized, the speed of filter_stream will be about 1.2 times faster.
This result is caused by avoiding to call a lot of Fluent::MultiEventStream#add.

the requirement of running optimization is that filter plugins that you are using don't implement filter_stream.

the executed command is here.

bundle exec fluentd -c example/multi_filters.conf

The measured code, as shown below. (https://github.com/fluent/fluentd/pull/1145/files#diff-c18beef6caee95367d6268a4c181f913R166)

def filter_stream(tag, es)
  require 'ruby-prof'
  RubyProf.start

  if optimizable?
    processed = optimized_filter_stream(tag, es)
  else
    processed = @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }
  end

  result = RubyProf.stop
  printer = RubyProf::FlatPrinter.new(result)
  printer.print(STDOUT)
  processed
end

The result o calling optimized_filter_stream

* indicates recursively called methods
Measure Mode: wall_time
Thread ID: 70252402759920
Fiber ID: 70252404764700
Total: 0.051646
Sort by: self_time

 %self      total      self      wait     child     calls  name
 19.05      0.024     0.010     0.000     0.014    20000   Hash#each
 15.70      0.041     0.008     0.000     0.033    10000   Fluent::Plugin::GrepFilter#filter
 11.29      0.006     0.006     0.000     0.000    10000   Regexp#match
 10.95      0.011     0.006     0.000     0.006    10000   <Module::Fluent::Compat::StringUtil>#match_regexp
  4.66      0.002     0.002     0.000     0.000    10000   NilClass#to_s
  1.78      0.051     0.001     0.000     0.050    11000  *Kernel#catch
  1.29      0.052     0.001     0.000     0.051     1001  *Array#each
  1.15      0.001     0.001     0.000     0.000     1000   Fluent::MultiEventStream#add
  0.02      0.052     0.000     0.000     0.052        1   Fluent::EventRouter::Pipeline::FilterOptimizer#optimized_filter_stream
  0.02      0.052     0.000     0.000     0.052        1   Fluent::EventRouter::Pipeline::FilterOptimizer#filter_stream
  0.01      0.000     0.000     0.000     0.000        1   Fluent::MultiEventStream#initialize
  0.01      0.000     0.000     0.000     0.000        1   Class#new
  0.00      0.052     0.000     0.000     0.052        1   Fluent::ArrayEventStream#each

This result of calling @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }

* indicates recursively called methods
Measure Mode: wall_time
Thread ID: 70231922124020
Fiber ID: 70231922694520
Total: 0.063186
Sort by: self_time

 %self      total      self      wait     child     calls  name
 17.57      0.026     0.011     0.000     0.015    20000   Hash#each
 17.14      0.037     0.011     0.000     0.026    10000   Kernel#catch
 16.16      0.057     0.010     0.000     0.047        9   Range#each
 14.94      0.046     0.009     0.000     0.037    10000   Fluent::Plugin::GrepFilter#filter
 10.08      0.012     0.006     0.000     0.006    10000   <Module::Fluent::Compat::StringUtil>#match_regexp
  9.56      0.006     0.006     0.000     0.000    10000   Regexp#match
  8.69      0.005     0.005     0.000     0.000    10000   Fluent::MultiEventStream#add
  4.23      0.003     0.003     0.000     0.000    10000   NilClass#to_s
  0.04      0.063     0.000     0.000     0.063       10   Fluent::Plugin::Filter#filter_stream
  0.03      0.000     0.000     0.000     0.000       10   Class#new
  0.03      0.057     0.000     0.000     0.057        9   Fluent::MultiEventStream#each
  0.02      0.063     0.000     0.000     0.063        2  *Array#each
  0.02      0.000     0.000     0.000     0.000       10   Fluent::MultiEventStream#initialize
  0.02      0.063     0.000     0.000     0.063        1   Fluent::EventRouter::Pipeline::FilterOptimizer#filter_stream
  0.01      0.006     0.000     0.000     0.006        1   Fluent::ArrayEventStream#each
  0.01      0.063     0.000     0.000     0.063        1   Enumerable#reduce

Environment

ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G31
  • PROCESSOR: 2.7 GHz Intel Core i5
  • MEMORY: 8 GB 1867 MHz DDR3

@repeatedly
Copy link
Member

You should also measure one filter case.

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch 2 times, most recently from 038127e to 6bcf841 Compare August 7, 2016 13:30
if optimizable?
optimized_filter_stream(tag, es)
else
$log.info "Can't optimized filters, because some plugins implement `filter_stream`"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log messages should be for users, not only for plugin developers.
So it's better to show "filtering works with worse performance, 'xxx_filter' users #filter_stream method" or so.
Users will get known about performance, and can complaint on that plugin about implementation.

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch from 6bcf841 to d74b3b7 Compare August 8, 2016 08:28
@ganmacs
Copy link
Member Author

ganmacs commented Aug 8, 2016

@repeatedly
The result of one filter case is below.

The result of calling optimized_filter_stream.

* indicates recursively called methods
Measure Mode: wall_time
Thread ID: 70314214842120
Fiber ID: 70314197783380
Total: 0.007549
Sort by: self_time

 %self      total      self      wait     child     calls  name
 19.04      0.001     0.001     0.000     0.000     1000   Regexp#match
 16.03      0.005     0.001     0.000     0.004     1000   Kernel#catch
 15.85      0.004     0.001     0.000     0.002     2000   Hash#each
 13.13      0.006     0.001     0.000     0.005     1000   Fluent::Plugin::GrepFilter#filter
 10.82      0.001     0.001     0.000     0.000     1000   Fluent::MultiEventStream#add
  8.70      0.002     0.001     0.000     0.001     1000   <Module::Fluent::Compat::StringUtil>#match_regexp
  3.63      0.000     0.000     0.000     0.000     1000   NilClass#to_s
  0.16      0.008     0.000     0.000     0.008        1   Fluent::EventRouter::Pipeline::FilterOptimizer#filter_stream
  0.09      0.008     0.000     0.000     0.008        1   Fluent::Plugin::Filter#filter_stream
  0.07      0.008     0.000     0.000     0.008        2  *Array#each
  0.07      0.008     0.000     0.000     0.008        1   Fluent::ArrayEventStream#each
  0.05      0.000     0.000     0.000     0.000        1   Fluent::MultiEventStream#initialize
  0.04      0.008     0.000     0.000     0.008        1   Enumerable#reduce
  0.04      0.000     0.000     0.000     0.000        1   Class#new

The result of calling @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }

* indicates recursively called methods
Measure Mode: wall_time
Thread ID: 70182152853480
Fiber ID: 70182152439780
Total: 0.008376
Sort by: self_time

 %self      total      self      wait     child     calls  name
 14.62      0.003     0.001     0.000     0.002     2000   Hash#each
 11.96      0.008     0.001     0.000     0.007     2000  *Kernel#catch
 10.94      0.005     0.001     0.000     0.004     1000   Fluent::Plugin::GrepFilter#filter
  8.15      0.008     0.001     0.000     0.008     1001  *Array#each
  8.12      0.001     0.001     0.000     0.001     1000   <Module::Fluent::Compat::StringUtil>#match_regexp
  7.74      0.001     0.001     0.000     0.000     1000   Regexp#match
  6.93      0.001     0.001     0.000     0.000     1000   Fluent::MultiEventStream#add
  4.40      0.000     0.000     0.000     0.000     1000   NilClass#to_s
  0.12      0.008     0.000     0.000     0.008        1   Fluent::EventRouter::Pipeline::FilterOptimizer#filter_stream
  0.07      0.008     0.000     0.000     0.008        1   Fluent::EventRouter::Pipeline::FilterOptimizer#optimized_filter_stream
  0.05      0.000     0.000     0.000     0.000        1   Class#new
  0.04      0.008     0.000     0.000     0.008        1   Fluent::ArrayEventStream#each
  0.04      0.000     0.000     0.000     0.000        1   Fluent::MultiEventStream#initialize

This is conf file.

<source>
  @type dummy
  tag   test
  size 1000
</source>

<filter test>
  @type grep
  exclude1 hello .
</filter>

<match test>
  @type buffered_null
</match>

@tagomoris tagomoris added enhancement Feature request or improve operations v0.14 labels Aug 10, 2016
@tagomoris
Copy link
Member

#1140 was merged into master.
@ganmacs Could you rebase for detailed code review?

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch 2 times, most recently from 2cbaff5 to 3165a0d Compare August 10, 2016 08:23
@filters.each do |filter|
if filter.has_filter_with_time
begin
filtered_time, filtered_record = *filter.filter_with_time(tag, filtered_time, filtered_record)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no need to use splat (*) to return 2 values from method

@tagomoris
Copy link
Member

Some required test cases are missing. For example:

  • only one of some filters has #filter_stream method
  • filter plugin implements #filter
  • filter plugin implements #filter_with_time
  • filters, some implement #filter and others implement #filter_with_time

This pull-request includes the change how to handle/call methods of filters. So tests above are required.

@nurse
Copy link
Contributor

nurse commented Aug 10, 2016

Just a thought, stackprof is better for performance tuning.

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch 2 times, most recently from 9b46e48 to 860b7be Compare August 11, 2016 02:03
@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch 2 times, most recently from e46bf8a to da6a163 Compare August 12, 2016 02:09
@ganmacs
Copy link
Member Author

ganmacs commented Aug 12, 2016

@tagomoris

Added these tests.

only one of some filters has #filter_stream method
filter plugin implements #filter
filter plugin implements #filter_with_time
filters, some implement #filter and others implement #filter_with_time

@now = Engine.now
end

test 'call optimized filter when filter plugin that has #method' do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#filter instead of #method, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tagomoris
Copy link
Member

@ganmacs please check and fix the subjects of tests newly added (especially for English syntax etc).

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch 2 times, most recently from 60dad98 to 32c875d Compare August 17, 2016 01:12
end
end

test "don't call optimized filter whenf only one of some filters has #filter_stream method" do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"whenf"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"even if just a filter of some filters implements #filter_stream method"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end
end

test 'call optimized filter when filter plugin that has #filter_with_time' do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"when the filter plugin implements #filter_with_time without #filter_stream"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch from 32c875d to e1de49e Compare August 18, 2016 07:32
@ganmacs ganmacs force-pushed the change-piplelineing-rule-to-speed-up branch from e1de49e to 7bf115b Compare August 18, 2016 07:37
@tagomoris
Copy link
Member

LGTM. Thank you!

@tagomoris tagomoris merged commit 0be9a64 into fluent:master Aug 19, 2016
cosmo0920 pushed a commit to cosmo0920/fluent-plugin-anonymizer that referenced this pull request Mar 1, 2017
Because in v0.14, #filter will be optimized.
So, we can remove this method for v0.14.

see: fluent/fluentd#1145
cosmo0920 added a commit to cosmo0920/fluent-plugin-anonymizer that referenced this pull request Mar 1, 2017
Because in v0.14, #filter will be optimized.
So, we can remove this method for v0.14.

see: fluent/fluentd#1145
@ganmacs ganmacs deleted the change-piplelineing-rule-to-speed-up branch November 28, 2019 00:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Feature request or improve operations v0.14
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants