diff --git a/example/copy_roundrobin.conf b/example/copy_roundrobin.conf
new file mode 100644
index 0000000000..935d29a45c
--- /dev/null
+++ b/example/copy_roundrobin.conf
@@ -0,0 +1,39 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb
index c60eeb6966..7100042b9e 100644
--- a/lib/fluent/plugin/out_copy.rb
+++ b/lib/fluent/plugin/out_copy.rb
@@ -14,72 +14,29 @@
# limitations under the License.
#
-require 'fluent/output'
+require 'fluent/plugin/multi_output'
require 'fluent/config/error'
require 'fluent/event'
-module Fluent
+module Fluent::Plugin
class CopyOutput < MultiOutput
- Plugin.register_output('copy', self)
+ Fluent::Plugin.register_output('copy', self)
desc 'If true, pass different record to each `store` plugin.'
config_param :deep_copy, :bool, default: false
- def initialize
- super
- @outputs = []
- end
-
- attr_reader :outputs
-
- def configure(conf)
- super
- conf.elements.select {|e|
- e.name == 'store'
- }.each {|e|
- type = e['@type']
- unless type
- raise ConfigError, "Missing 'type' parameter on directive"
- end
- log.debug "adding store type=#{type.dump}"
-
- output = Plugin.new_output(type)
- output.router = router
- output.configure(e)
- @outputs << output
- }
- end
-
- def start
- super
-
- @outputs.each do |o|
- o.start unless o.started?
- end
- end
-
- def shutdown
- @outputs.each do |o|
- o.shutdown unless o.shutdown?
- end
-
- super
- end
-
- def emit(tag, es, chain)
+ def process(tag, es)
unless es.repeatable?
- m = MultiEventStream.new
+ m = Fluent::MultiEventStream.new
es.each {|time,record|
m.add(time, record)
}
es = m
end
- if @deep_copy
- chain = CopyOutputChain.new(@outputs, tag, es, chain)
- else
- chain = OutputChain.new(@outputs, tag, es, chain)
+
+ outputs.each do |output|
+ output.emit_events(tag, @deep_copy ? es.dup : es)
end
- chain.next
end
end
end
diff --git a/lib/fluent/plugin/out_roundrobin.rb b/lib/fluent/plugin/out_roundrobin.rb
index 3e327195f1..d66af41c26 100644
--- a/lib/fluent/plugin/out_roundrobin.rb
+++ b/lib/fluent/plugin/out_roundrobin.rb
@@ -14,68 +14,41 @@
# limitations under the License.
#
-require 'fluent/output'
+require 'fluent/plugin/multi_output'
require 'fluent/config/error'
-module Fluent
+module Fluent::Plugin
class RoundRobinOutput < MultiOutput
- Plugin.register_output('roundrobin', self)
+ Fluent::Plugin.register_output('roundrobin', self)
+
+ config_section :store do
+ config_param :weight, :integer, default: 1
+ end
def initialize
super
-
- @outputs = []
@weights = []
end
- attr_reader :outputs, :weights
- attr_accessor :rand_seed
+ attr_reader :weights
def configure(conf)
super
- conf.elements.select {|e|
- e.name == 'store'
- }.each {|e|
- type = e['@type']
- unless type
- raise ConfigError, "Missing 'type' parameter on directive"
- end
-
- weight = e['weight']
- weight = weight ? weight.to_i : 1
- log.debug "adding store type=#{type.dump}, weight=#{weight}"
-
- output = Plugin.new_output(type)
- output.router = router
- output.configure(e)
- @outputs << output
- @weights << weight
- }
+ @stores.each do |store|
+ @weights << store.weight
+ end
@rr = -1 # starts from @output[0]
@rand_seed = Random.new.seed
end
def start
super
-
rebuild_weight_array
-
- @outputs.each do |o|
- o.start unless o.started?
- end
- end
-
- def shutdown
- @outputs.each do |o|
- o.shutdown unless o.shutdown?
- end
-
- super
end
- def emit(tag, es, chain)
- next_output.emit(tag, es, chain)
+ def process(tag, es)
+ next_output.emit_events(tag, es)
end
private
diff --git a/lib/fluent/test/driver/multi_output.rb b/lib/fluent/test/driver/multi_output.rb
new file mode 100644
index 0000000000..a51c65e162
--- /dev/null
+++ b/lib/fluent/test/driver/multi_output.rb
@@ -0,0 +1,52 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'fluent/test/driver/base'
+require 'fluent/test/driver/event_feeder'
+
+require 'fluent/plugin/multi_output'
+
+module Fluent
+ module Test
+ module Driver
+ class MultiOutput < Base
+ include EventFeeder
+
+ def initialize(klass, opts: {}, &block)
+ super
+ raise ArgumentError, "plugin is not an instance of Fluent::Plugin::MultiOutput" unless @instance.is_a? Fluent::Plugin::MultiOutput
+ @flush_buffer_at_cleanup = nil
+ end
+
+ def run(flush: true, **kwargs, &block)
+ @flush_buffer_at_cleanup = flush
+ super(**kwargs, &block)
+ end
+
+ def run_actual(**kwargs, &block)
+ super(**kwargs, &block)
+ if @flush_buffer_at_cleanup
+ @instance.outputs.each{|o| o.force_flush }
+ end
+ end
+
+ def flush
+ @instance.outputs.each{|o| o.force_flush }
+ end
+ end
+ end
+ end
+end
diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb
index 2241ba07b0..e1dbb055af 100644
--- a/test/plugin/test_out_copy.rb
+++ b/test/plugin/test_out_copy.rb
@@ -1,12 +1,14 @@
require_relative '../helper'
-require 'fluent/test'
+require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
+require 'fluent/event'
class CopyOutputTest < Test::Unit::TestCase
class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
require 'fluent/plugin/out_test'
+ require 'fluent/plugin/out_test2'
end
def shutdown
@@ -20,21 +22,21 @@ def setup
CONFIG = %[
- type test
+ @type test
name c0
- type test
+ @type test2
name c1
- type test
+ @type test
name c2
]
def create_driver(conf = CONFIG)
- Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput).configure(conf)
+ Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(conf)
end
def test_configure
@@ -42,158 +44,117 @@ def test_configure
outputs = d.instance.outputs
assert_equal 3, outputs.size
- assert_equal Fluent::TestOutput, outputs[0].class
- assert_equal Fluent::TestOutput, outputs[1].class
- assert_equal Fluent::TestOutput, outputs[2].class
+ assert_equal Fluent::Plugin::TestOutput, outputs[0].class
+ assert_equal Fluent::Plugin::Test2Output, outputs[1].class
+ assert_equal Fluent::Plugin::TestOutput, outputs[2].class
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
end
- def test_emit
+ def test_feed_events
d = create_driver
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
+ assert !d.instance.outputs[0].has_router?
+ assert_not_nil d.instance.outputs[1].router
+ assert !d.instance.outputs[2].has_router?
- d.instance.outputs.each {|o|
- assert_equal [
- [time, {"a"=>1}],
- [time, {"a"=>2}],
- ], o.events
- }
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: 'test') do
+ d.feed(time, {"a" => 1})
+ d.feed(time, {"a" => 2})
+ end
d.instance.outputs.each {|o|
- assert_not_nil o.router
+ assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
}
end
- def test_msgpack_es_emit_bug
- d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)
-
- outputs = %w(p1 p2).map do |pname|
- p = Fluent::Plugin.new_output('test')
- p.configure('name' => pname)
- p.define_singleton_method(:emit) do |tag, es, chain|
- es.each do |time, record|
- super(tag, [[time, record]], chain)
- end
- end
- p
- end
-
- d.instance.instance_eval { @outputs = outputs }
+ def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
+ d = create_driver
- es = if defined?(MessagePack::Packer)
- time = Time.parse("2013-05-26 06:37:22 UTC").to_i
- packer = Fluent::Engine.msgpack_factory.packer
- packer.pack([time, {"a" => 1}])
- packer.pack([time, {"a" => 2}])
- Fluent::MessagePackEventStream.new(packer.to_s)
- else
- events = "#{[time, {"a" => 1}].to_msgpack}#{[time, {"a" => 2}].to_msgpack}"
- Fluent::MessagePackEventStream.new(events)
- end
+ time = event_time("2011-01-02 13:14:15 UTC")
+ source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ])
+ es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream)
- d.instance.emit('test', es, Fluent::NullOutputChain.instance)
+ d.run(default_tag: 'test') do
+ d.feed(es)
+ end
d.instance.outputs.each { |o|
- assert_equal [
- [time, {"a"=>1}],
- [time, {"a"=>2}],
- ], o.events
+ assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events
}
end
- def create_event_test_driver(is_deep_copy = false)
- deep_copy_config = %[
-deep_copy #{is_deep_copy}
-
- type test
- name c0
-
-
- type test
- name c1
-
-
- type test
- name c2
-
-]
-
- output1 = Fluent::Plugin.new_output('test')
- output1.configure('name' => 'output1')
- output1.define_singleton_method(:emit) do |tag, es, chain|
+ def create_event_test_driver(does_deep_copy = false)
+ config = %[
+ deep_copy #{does_deep_copy}
+
+ @type test
+ name output1
+
+
+ @type test
+ name output2
+
+ ]
+
+ d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config)
+ d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
es.each do |time, record|
record['foo'] = 'bar'
- super(tag, [[time, record]], chain)
- end
- end
-
- output2 = Fluent::Plugin.new_output('test')
- output2.configure('name' => 'output2')
- output2.define_singleton_method(:emit) do |tag, es, chain|
- es.each do |time, record|
- super(tag, [[time, record]], chain)
end
+ super(tag, es)
end
-
- outputs = [output1, output2]
-
- d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput)
- d = d.configure(deep_copy_config)
- d.instance.instance_eval { @outputs = outputs }
d
end
- def test_one_event
- time = Time.parse("2013-05-26 06:37:22 UTC").to_i
-
- d = create_event_test_driver(false)
- es = Fluent::OneEventStream.new(time, {"a" => 1})
- d.instance.emit('test', es, Fluent::NullOutputChain.instance)
-
- assert_equal [
- [[time, {"a"=>1, "foo"=>"bar"}]],
- [[time, {"a"=>1, "foo"=>"bar"}]]
- ], d.instance.outputs.map{ |o| o.events }
+ time = event_time("2013-05-26 06:37:22 UTC")
+ mes0 = Fluent::MultiEventStream.new
+ mes0.add(time, {"a" => 1})
+ mes0.add(time, {"b" => 1})
+ mes1 = Fluent::MultiEventStream.new
+ mes1.add(time, {"a" => 1})
+ mes1.add(time, {"b" => 1})
+
+ data(
+ "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})],
+ "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})],
+ "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
+ "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
+ "MultiEventStream without deep_copy" => [false, mes0],
+ "MultiEventStream with deep_copy" => [true, mes1],
+ )
+ def test_deep_copy_controls_shallow_or_deep_copied(data)
+ does_deep_copy, es = data
+
+ d = create_event_test_driver(does_deep_copy)
+
+ d.run(default_tag: 'test') do
+ d.feed(es)
+ end
- d = create_event_test_driver(true)
- es = Fluent::OneEventStream.new(time, {"a" => 1})
- d.instance.emit('test', es, Fluent::NullOutputChain.instance)
+ events = d.instance.outputs.map(&:events)
- assert_equal [
- [[time, {"a"=>1, "foo"=>"bar"}]],
- [[time, {"a"=>1}]]
- ], d.instance.outputs.map{ |o| o.events }
- end
+ if does_deep_copy
+ events[0].each_with_index do |entry0, i|
+ record0 = entry0.last
+ record1 = events[1][i].last
- def test_multi_event
- time = Time.parse("2013-05-26 06:37:22 UTC").to_i
-
- d = create_event_test_driver(false)
- es = Fluent::MultiEventStream.new
- es.add(time, {"a" => 1})
- es.add(time, {"b" => 2})
- d.instance.emit('test', es, Fluent::NullOutputChain.instance)
-
- assert_equal [
- [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]],
- [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]]
- ], d.instance.outputs.map{ |o| o.events }
-
- d = create_event_test_driver(true)
- es = Fluent::MultiEventStream.new
- es.add(time, {"a" => 1})
- es.add(time, {"b" => 2})
- d.instance.emit('test', es, Fluent::NullOutputChain.instance)
-
- assert_equal [
- [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]],
- [[time, {"a"=>1}], [time, {"b"=>2}]]
- ], d.instance.outputs.map{ |o| o.events }
+ assert{ record0.object_id != record1.object_id }
+ assert_equal "bar", record0["foo"]
+ assert !record1.has_key?("foo")
+ end
+ else
+ events[0].each_with_index do |entry0, i|
+ record0 = entry0.last
+ record1 = events[1][i].last
+
+ assert{ record0.object_id == record1.object_id }
+ assert_equal "bar", record0["foo"]
+ assert_equal "bar", record1["foo"]
+ end
+ end
end
end
diff --git a/test/plugin/test_out_roundrobin.rb b/test/plugin/test_out_roundrobin.rb
index 56722f0174..eaff66770b 100644
--- a/test/plugin/test_out_roundrobin.rb
+++ b/test/plugin/test_out_roundrobin.rb
@@ -1,5 +1,5 @@
require_relative '../helper'
-require 'fluent/test'
+require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_roundrobin'
class RoundRobinOutputTest < Test::Unit::TestCase
@@ -7,6 +7,7 @@ class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
require 'fluent/plugin/out_test'
+ require 'fluent/plugin/out_test2'
end
def shutdown
@@ -20,37 +21,37 @@ def setup
CONFIG = %[
- type test
+ @type test
name c0
- type test
+ @type test2
name c1
- type test
+ @type test
name c2
]
CONFIG_WITH_WEIGHT = %[
- type test
+ @type test
name c0
weight 3
- type test
+ @type test2
name c1
weight 3
- type test
+ @type test
name c2
]
def create_driver(conf = CONFIG)
- Fluent::Test::OutputTestDriver.new(Fluent::RoundRobinOutput).configure(conf)
+ Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::RoundRobinOutput).configure(conf)
end
def test_configure
@@ -58,9 +59,16 @@ def test_configure
outputs = d.instance.outputs
assert_equal 3, outputs.size
- assert_equal Fluent::TestOutput, outputs[0].class
- assert_equal Fluent::TestOutput, outputs[1].class
- assert_equal Fluent::TestOutput, outputs[2].class
+
+ assert_equal Fluent::Plugin::TestOutput, outputs[0].class
+ assert_equal Fluent::Plugin::Test2Output, outputs[1].class
+ assert_equal Fluent::Plugin::TestOutput, outputs[2].class
+
+ assert !outputs[0].has_router?
+ assert outputs[1].has_router?
+ assert outputs[1].router
+ assert !outputs[2].has_router?
+
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
@@ -75,9 +83,11 @@ def test_configure
outputs = d.instance.outputs
assert_equal 3, outputs.size
- assert_equal Fluent::TestOutput, outputs[0].class
- assert_equal Fluent::TestOutput, outputs[1].class
- assert_equal Fluent::TestOutput, outputs[2].class
+
+ assert_equal Fluent::Plugin::TestOutput, outputs[0].class
+ assert_equal Fluent::Plugin::Test2Output, outputs[1].class
+ assert_equal Fluent::Plugin::TestOutput, outputs[2].class
+
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
@@ -89,15 +99,15 @@ def test_configure
assert_equal 1, weights[2]
end
- def test_emit
+ def test_events_feeded_to_plugins_by_roundrobin
d = create_driver
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.run do
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
- d.emit({"a"=>3}, time)
- d.emit({"a"=>4}, time)
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: 'test') do
+ d.feed(time, {"a" => 1})
+ d.feed(time, {"a" => 2})
+ d.feed(time, {"a" => 3})
+ d.feed(time, {"a" => 4})
end
os = d.instance.outputs
@@ -114,19 +124,15 @@ def test_emit
assert_equal [
[time, {"a"=>3}],
], os[2].events
-
- d.instance.outputs.each {|o|
- assert_not_nil o.router
- }
end
- def test_emit_weighted
+ def test_events_feeded_with_specified_weights
d = create_driver(CONFIG_WITH_WEIGHT)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.run do
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: 'test') do
14.times do |i|
- d.emit({"a"=>i}, time)
+ d.feed(time, {"a" => i})
end
end
diff --git a/test/scripts/fluent/plugin/out_test.rb b/test/scripts/fluent/plugin/out_test.rb
index 7a1cd75298..3dd66bf83f 100644
--- a/test/scripts/fluent/plugin/out_test.rb
+++ b/test/scripts/fluent/plugin/out_test.rb
@@ -14,17 +14,25 @@
# limitations under the License.
#
-module Fluent
+require 'fluent/plugin/output'
+require 'fluent/event'
+
+module Fluent::Plugin
class TestOutput < Output
- Plugin.register_output('test', self)
+ Fluent::Plugin.register_output('test', self)
+
+ config_param :name, :string
+
+ config_section :buffer do
+ config_set_default :chunk_keys, ['tag']
+ end
def initialize
super
@emit_streams = []
- @name = nil
end
- attr_reader :emit_streams, :name
+ attr_reader :emit_streams
def emits
all = []
@@ -54,23 +62,20 @@ def records
all
end
- def configure(conf)
- if name = conf['name']
- @name = name
- end
- end
-
- def start
- super
+ def prefer_buffered_processing
+ false
end
- def shutdown
- super
+ def process(tag, es)
+ @emit_streams << [tag, es.to_a]
end
- def emit(tag, es, chain)
- chain.next
- @emit_streams << [tag, es.to_a]
+ def write(chunk)
+ es = Fluent::ArrayEventStream.new
+ chunk.each do |time, record|
+ es.add(time, record)
+ end
+ @emit_streams << [tag, es]
end
end
end
diff --git a/test/scripts/fluent/plugin/out_test2.rb b/test/scripts/fluent/plugin/out_test2.rb
new file mode 100644
index 0000000000..aefacc9a4d
--- /dev/null
+++ b/test/scripts/fluent/plugin/out_test2.rb
@@ -0,0 +1,80 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Fluent::Plugin
+ class Test2Output < Output
+ Fluent::Plugin.register_output('test2', self)
+
+ helpers :event_emitter
+
+ config_param :name, :string
+
+ config_section :buffer do
+ config_set_default :chunk_keys, ['tag']
+ end
+
+ def initialize
+ super
+ @emit_streams = []
+ end
+
+ attr_reader :emit_streams
+
+ def emits
+ all = []
+ @emit_streams.each {|tag,events|
+ events.each {|time,record|
+ all << [tag, time, record]
+ }
+ }
+ all
+ end
+
+ def events
+ all = []
+ @emit_streams.each {|tag,events|
+ all.concat events
+ }
+ all
+ end
+
+ def records
+ all = []
+ @emit_streams.each {|tag,events|
+ events.each {|time,record|
+ all << record
+ }
+ }
+ all
+ end
+
+ def prefer_buffered_processing
+ false
+ end
+
+ def process(tag, es)
+ @emit_streams << [tag, es.to_a]
+ end
+
+ def write(chunk)
+ es = Fluent::ArrayEventStream.new
+ chunk.each do |time, record|
+ es.add(time, record)
+ end
+ @emit_streams << [tag, es]
+ end
+ end
+end
diff --git a/test/test_output.rb b/test/test_output.rb
index e40bc2e39e..3bb9160bfd 100644
--- a/test/test_output.rb
+++ b/test/test_output.rb
@@ -16,6 +16,7 @@ class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), 'scripts'))
require 'fluent/plugin/out_test'
+ require 'fluent/plugin/out_test2'
end
def shutdown
@@ -64,54 +65,6 @@ def test_configure
# assert_equal Float, d.instance.retry_wait.class
end
- def test_calc_retry_wait
- omit "too internal test"
-
- # default
- d = create_driver
- d.instance.retry_limit.times {
- # "d.instance.instance_variable_get(:@num_errors) += 1" causes SyntaxError
- d.instance.instance_eval { @num_errors += 1 }
- }
- wait = d.instance.retry_wait * (2 ** (d.instance.retry_limit - 1))
- assert( d.instance.calc_retry_wait > wait - wait / 8.0 )
-
- # max_retry_wait
- d = create_driver(CONFIG + %[max_retry_wait 4])
- d.instance.retry_limit.times {
- d.instance.instance_eval { @num_errors += 1 }
- }
- assert_equal 4, d.instance.calc_retry_wait
- end
-
- def test_calc_retry_wait_with_integer_retry_wait
- omit "too internal test"
-
- d = create_driver(CONFIG + %[retry_wait 2s])
- d.instance.retry_limit.times {
- d.instance.instance_eval { @num_errors += 1 }
- }
- assert_equal true, d.instance.calc_retry_wait.finite?
- end
-
- def test_large_num_retries
- omit "too internal test"
-
- # Test that everything works properly after a very large number of
- # retries and we hit the expected max_retry_wait.
- exp_max_retry_wait = 300
- d = create_driver(CONFIG + %[
- disable_retry_limit true
- max_retry_wait #{exp_max_retry_wait}
- ])
- d.instance.instance_eval { @num_errors += 1000 }
- assert_equal exp_max_retry_wait, d.instance.calc_retry_wait
- d.instance.instance_eval { @num_errors += 1000 }
- assert_equal exp_max_retry_wait, d.instance.calc_retry_wait
- d.instance.instance_eval { @num_errors += 1000 }
- assert_equal exp_max_retry_wait, d.instance.calc_retry_wait
- end
-
def create_mock_driver(conf=CONFIG)
Fluent::Test::BufferedOutputTestDriver.new(Fluent::BufferedOutput) do
attr_accessor :submit_flush_threads
@@ -141,78 +94,15 @@ def write(chunk)
end.configure(conf)
end
- def test_submit_flush_target
- omit "too internal test"
-
- # default
- d = create_mock_driver
- d.instance.start_mock
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.shutdown
- assert_equal 1, d.instance.submit_flush_threads.size
-
- # num_threads 4
- d = create_mock_driver(CONFIG + %[num_threads 4])
- d.instance.start
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 1, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 2, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 3, d.instance.instance_variable_get('@writer_current_position')
- d.instance.submit_flush
- assert_equal 0, d.instance.instance_variable_get('@writer_current_position')
- d.instance.shutdown
- assert (d.instance.submit_flush_threads.size > 1), "fails if only one thread works to submit flush"
- end
-
def test_secondary
d = create_driver(CONFIG + %[
- type test
+ type test2
name c0
])
assert_not_nil d.instance.instance_variable_get(:@secondary).router
end
-
- sub_test_case "test_force_flush" do
- setup do
- time = Time.parse("2011-01-02 13:14:15 UTC")
- Timecop.freeze(time)
- @time = time.to_i
- end
-
- teardown do
- Timecop.return
- end
-
- test "force_flush works on retrying" do
- omit "too internal test"
-
- d = create_driver(CONFIG)
- d.instance.start
- buffer = d.instance.instance_variable_get(:@buffer)
- # imitate 10 failures
- d.instance.instance_variable_set(:@num_errors, 10)
- d.instance.instance_variable_set(:@next_retry_time, @time + d.instance.calc_retry_wait)
- # buffer should be popped (flushed) immediately
- flexmock(buffer).should_receive(:pop).once
- # force_flush
- buffer.emit("test", 'test', NullOutputChain.instance)
- d.instance.force_flush
- 10.times { sleep 0.05 }
- end
- end
end
class ObjectBufferedOutputTest < ::Test::Unit::TestCase
@@ -262,33 +152,6 @@ def create_driver(conf=CONFIG)
Fluent::Test::TimeSlicedOutputTestDriver.new(TimeSlicedOutputTestPlugin).configure(conf, true)
end
- sub_test_case "force_flush test" do
- setup do
- time = Time.parse("2011-01-02 13:14:15 UTC")
- Timecop.freeze(time)
- @es = OneEventStream.new(time.to_i, {"message" => "foo"})
- end
-
- teardown do
- Timecop.return
- end
-
- test "force_flush immediately flushes" do
- omit "too internal test"
-
- d = create_driver(CONFIG + %[
- time_format %Y%m%d%H%M%S
- ])
- d.instance.start
- # buffer should be popped (flushed) immediately
- flexmock(d.instance.instance_variable_get(:@buffer)).should_receive(:pop).once
- # force_flush
- d.instance.emit('test', @es, NullOutputChain.instance)
- d.instance.force_flush
- 10.times { sleep 0.05 }
- end
- end
-
sub_test_case "test emit" do
setup do
@time = Time.parse("2011-01-02 13:14:15 UTC")
@@ -299,19 +162,6 @@ def create_driver(conf=CONFIG)
Timecop.return
end
- test "emit with valid event" do
- omit "there's no #emit method anymore in output plugins"
-
- d = create_driver
- d.instance.start
- if d.instance.method(:emit).arity == 3
- d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"}), NullOutputChain.instance)
- else
- d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"}))
- end
- assert_equal 0, d.instance.log.logs.size
- end
-
test "emit with invalid event" do
d = create_driver
d.instance.start