Skip to content

Commit e3426be

Browse files
committed
port RoundRobinOutput plugin to v0.14 API
1 parent ee0a9bc commit e3426be

File tree

2 files changed

+48
-68
lines changed

2 files changed

+48
-68
lines changed

lib/fluent/plugin/out_roundrobin.rb

+13-39
Original file line numberDiff line numberDiff line change
@@ -14,68 +14,42 @@
1414
# limitations under the License.
1515
#
1616

17-
require 'fluent/output'
17+
require 'fluent/plugin/multi_output'
1818
require 'fluent/config/error'
1919

20-
module Fluent
20+
module Fluent::Plugin
2121
class RoundRobinOutput < MultiOutput
22-
Plugin.register_output('roundrobin', self)
22+
Fluent::Plugin.register_output('roundrobin', self)
23+
24+
config_section :store do
25+
config_param :weight, :integer, default: 1
26+
end
2327

2428
def initialize
2529
super
26-
27-
@outputs = []
2830
@weights = []
2931
end
3032

31-
attr_reader :outputs, :weights
33+
attr_reader :weights
3234
attr_accessor :rand_seed
3335

3436
def configure(conf)
3537
super
3638

37-
conf.elements.select {|e|
38-
e.name == 'store'
39-
}.each {|e|
40-
type = e['@type']
41-
unless type
42-
raise ConfigError, "Missing 'type' parameter on <store> directive"
43-
end
44-
45-
weight = e['weight']
46-
weight = weight ? weight.to_i : 1
47-
log.debug "adding store type=#{type.dump}, weight=#{weight}"
48-
49-
output = Plugin.new_output(type)
50-
output.router = router
51-
output.configure(e)
52-
@outputs << output
53-
@weights << weight
54-
}
39+
@stores.each do |store|
40+
@weights << store.weight
41+
end
5542
@rr = -1 # starts from @output[0]
5643
@rand_seed = Random.new.seed
5744
end
5845

5946
def start
6047
super
61-
6248
rebuild_weight_array
63-
64-
@outputs.each do |o|
65-
o.start unless o.started?
66-
end
67-
end
68-
69-
def shutdown
70-
@outputs.each do |o|
71-
o.shutdown unless o.shutdown?
72-
end
73-
74-
super
7549
end
7650

77-
def emit(tag, es, chain)
78-
next_output.emit(tag, es, chain)
51+
def process(tag, es)
52+
next_output.emit_events(tag, es)
7953
end
8054

8155
private

test/plugin/test_out_roundrobin.rb

+35-29
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
require_relative '../helper'
2-
require 'fluent/test'
2+
require 'fluent/test/driver/multi_output'
33
require 'fluent/plugin/out_roundrobin'
44

55
class RoundRobinOutputTest < Test::Unit::TestCase
66
class << self
77
def startup
88
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
99
require 'fluent/plugin/out_test'
10+
require 'fluent/plugin/out_test2'
1011
end
1112

1213
def shutdown
@@ -20,47 +21,54 @@ def setup
2021

2122
CONFIG = %[
2223
<store>
23-
type test
24+
@type test
2425
name c0
2526
</store>
2627
<store>
27-
type test
28+
@type test2
2829
name c1
2930
</store>
3031
<store>
31-
type test
32+
@type test
3233
name c2
3334
</store>
3435
]
3536
CONFIG_WITH_WEIGHT = %[
3637
<store>
37-
type test
38+
@type test
3839
name c0
3940
weight 3
4041
</store>
4142
<store>
42-
type test
43+
@type test2
4344
name c1
4445
weight 3
4546
</store>
4647
<store>
47-
type test
48+
@type test
4849
name c2
4950
</store>
5051
]
5152

5253
def create_driver(conf = CONFIG)
53-
Fluent::Test::OutputTestDriver.new(Fluent::RoundRobinOutput).configure(conf)
54+
Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::RoundRobinOutput).configure(conf)
5455
end
5556

5657
def test_configure
5758
d = create_driver
5859

5960
outputs = d.instance.outputs
6061
assert_equal 3, outputs.size
61-
assert_equal Fluent::TestOutput, outputs[0].class
62-
assert_equal Fluent::TestOutput, outputs[1].class
63-
assert_equal Fluent::TestOutput, outputs[2].class
62+
63+
assert_equal Fluent::Plugin::TestOutput, outputs[0].class
64+
assert_equal Fluent::Plugin::Test2Output, outputs[1].class
65+
assert_equal Fluent::Plugin::TestOutput, outputs[2].class
66+
67+
assert !outputs[0].has_router?
68+
assert outputs[1].has_router?
69+
assert outputs[1].router
70+
assert !outputs[2].has_router?
71+
6472
assert_equal "c0", outputs[0].name
6573
assert_equal "c1", outputs[1].name
6674
assert_equal "c2", outputs[2].name
@@ -75,9 +83,11 @@ def test_configure
7583

7684
outputs = d.instance.outputs
7785
assert_equal 3, outputs.size
78-
assert_equal Fluent::TestOutput, outputs[0].class
79-
assert_equal Fluent::TestOutput, outputs[1].class
80-
assert_equal Fluent::TestOutput, outputs[2].class
86+
87+
assert_equal Fluent::Plugin::TestOutput, outputs[0].class
88+
assert_equal Fluent::Plugin::Test2Output, outputs[1].class
89+
assert_equal Fluent::Plugin::TestOutput, outputs[2].class
90+
8191
assert_equal "c0", outputs[0].name
8292
assert_equal "c1", outputs[1].name
8393
assert_equal "c2", outputs[2].name
@@ -89,15 +99,15 @@ def test_configure
8999
assert_equal 1, weights[2]
90100
end
91101

92-
def test_emit
102+
def test_events_feeded_to_plugins_by_roundrobin
93103
d = create_driver
94104

95-
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
96-
d.run do
97-
d.emit({"a"=>1}, time)
98-
d.emit({"a"=>2}, time)
99-
d.emit({"a"=>3}, time)
100-
d.emit({"a"=>4}, time)
105+
time = event_time("2011-01-02 13:14:15 UTC")
106+
d.run(default_tag: 'test') do
107+
d.feed(time, {"a" => 1})
108+
d.feed(time, {"a" => 2})
109+
d.feed(time, {"a" => 3})
110+
d.feed(time, {"a" => 4})
101111
end
102112

103113
os = d.instance.outputs
@@ -114,19 +124,15 @@ def test_emit
114124
assert_equal [
115125
[time, {"a"=>3}],
116126
], os[2].events
117-
118-
d.instance.outputs.each {|o|
119-
assert_not_nil o.router
120-
}
121127
end
122128

123-
def test_emit_weighted
129+
def test_events_feeded_with_specified_weights
124130
d = create_driver(CONFIG_WITH_WEIGHT)
125131

126-
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
127-
d.run do
132+
time = event_time("2011-01-02 13:14:15 UTC")
133+
d.run(default_tag: 'test') do
128134
14.times do |i|
129-
d.emit({"a"=>i}, time)
135+
d.feed(time, {"a" => i})
130136
end
131137
end
132138

0 commit comments

Comments
 (0)