Skip to content

Commit

Permalink
port RoundRobinOutput plugin to v0.14 API
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Jun 29, 2016
1 parent ede9831 commit e647bf5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 68 deletions.
52 changes: 13 additions & 39 deletions lib/fluent/plugin/out_roundrobin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,68 +14,42 @@
# limitations under the License.
#

require 'fluent/output'
require 'fluent/plugin/multi_output'
require 'fluent/config/error'

module Fluent
module Fluent::Plugin
class RoundRobinOutput < MultiOutput
Plugin.register_output('roundrobin', self)
Fluent::Plugin.register_output('roundrobin', self)

config_section :store do
config_param :weight, :integer, default: 1
end

def initialize
super

@outputs = []
@weights = []
end

attr_reader :outputs, :weights
attr_reader :weights
attr_accessor :rand_seed

def configure(conf)
super

conf.elements.select {|e|
e.name == 'store'
}.each {|e|
type = e['@type']
unless type
raise ConfigError, "Missing 'type' parameter on <store> directive"
end

weight = e['weight']
weight = weight ? weight.to_i : 1
log.debug "adding store type=#{type.dump}, weight=#{weight}"

output = Plugin.new_output(type)
output.router = router
output.configure(e)
@outputs << output
@weights << weight
}
@stores.each do |store|
@weights << store.weight
end
@rr = -1 # starts from @output[0]
@rand_seed = Random.new.seed
end

def start
super

rebuild_weight_array

@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
next_output.emit(tag, es, chain)
def process(tag, es)
next_output.emit_events(tag, es)
end

private
Expand Down
64 changes: 35 additions & 29 deletions test/plugin/test_out_roundrobin.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
require_relative '../helper'
require 'fluent/test'
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_roundrobin'

class RoundRobinOutputTest < Test::Unit::TestCase
class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
require 'fluent/plugin/out_test'
require 'fluent/plugin/out_test2'
end

def shutdown
Expand All @@ -20,47 +21,54 @@ def setup

CONFIG = %[
<store>
type test
@type test
name c0
</store>
<store>
type test
@type test2
name c1
</store>
<store>
type test
@type test
name c2
</store>
]
CONFIG_WITH_WEIGHT = %[
<store>
type test
@type test
name c0
weight 3
</store>
<store>
type test
@type test2
name c1
weight 3
</store>
<store>
type test
@type test
name c2
</store>
]

def create_driver(conf = CONFIG)
Fluent::Test::OutputTestDriver.new(Fluent::RoundRobinOutput).configure(conf)
Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::RoundRobinOutput).configure(conf)
end

def test_configure
d = create_driver

outputs = d.instance.outputs
assert_equal 3, outputs.size
assert_equal Fluent::TestOutput, outputs[0].class
assert_equal Fluent::TestOutput, outputs[1].class
assert_equal Fluent::TestOutput, outputs[2].class

assert_equal Fluent::Plugin::TestOutput, outputs[0].class
assert_equal Fluent::Plugin::Test2Output, outputs[1].class
assert_equal Fluent::Plugin::TestOutput, outputs[2].class

assert !outputs[0].has_router?
assert outputs[1].has_router?
assert outputs[1].router
assert !outputs[2].has_router?

assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
Expand All @@ -75,9 +83,11 @@ def test_configure

outputs = d.instance.outputs
assert_equal 3, outputs.size
assert_equal Fluent::TestOutput, outputs[0].class
assert_equal Fluent::TestOutput, outputs[1].class
assert_equal Fluent::TestOutput, outputs[2].class

assert_equal Fluent::Plugin::TestOutput, outputs[0].class
assert_equal Fluent::Plugin::Test2Output, outputs[1].class
assert_equal Fluent::Plugin::TestOutput, outputs[2].class

assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
Expand All @@ -89,15 +99,15 @@ def test_configure
assert_equal 1, weights[2]
end

def test_emit
def test_events_feeded_to_plugins_by_roundrobin
d = create_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.run do
d.emit({"a"=>1}, time)
d.emit({"a"=>2}, time)
d.emit({"a"=>3}, time)
d.emit({"a"=>4}, time)
time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: 'test') do
d.feed(time, {"a" => 1})
d.feed(time, {"a" => 2})
d.feed(time, {"a" => 3})
d.feed(time, {"a" => 4})
end

os = d.instance.outputs
Expand All @@ -114,19 +124,15 @@ def test_emit
assert_equal [
[time, {"a"=>3}],
], os[2].events

d.instance.outputs.each {|o|
assert_not_nil o.router
}
end

def test_emit_weighted
def test_events_feeded_with_specified_weights
d = create_driver(CONFIG_WITH_WEIGHT)

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.run do
time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: 'test') do
14.times do |i|
d.emit({"a"=>i}, time)
d.feed(time, {"a" => i})
end
end

Expand Down

0 comments on commit e647bf5

Please sign in to comment.