diff --git a/example/in_forward.conf b/example/in_forward.conf
index ff7598075f..a639738f14 100644
--- a/example/in_forward.conf
+++ b/example/in_forward.conf
@@ -8,4 +8,7 @@
@type stdout
+ #
+ # flush_interval 10s
+ #
diff --git a/example/out_buffered_null.conf b/example/out_null.conf
similarity index 74%
rename from example/out_buffered_null.conf
rename to example/out_null.conf
index a6f2e928e0..0c13dd4769 100644
--- a/example/out_buffered_null.conf
+++ b/example/out_null.conf
@@ -13,16 +13,20 @@
- @type buffered_null
- try_flush_interval 60
- flush_interval 60
- buffer_chunk_limit 1k
- buffer_queue_limit 2
+ @type null
+
+ flush_interval 60s
+ chunk_limit_size 1k
+ total_limit_size 4k
+
diff --git a/lib/fluent/plugin/out_buffered_null.rb b/lib/fluent/plugin/out_buffered_null.rb
deleted file mode 100644
index e605dd1fa8..0000000000
--- a/lib/fluent/plugin/out_buffered_null.rb
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Fluentd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'fluent/plugin/output'
-
-module Fluent::Plugin
- class BufferedNullOutput < Output
- # This plugin is for tests of buffer plugins
-
- Fluent::Plugin.register_output('buffered_null', self)
-
- config_section :buffer do
- config_set_default :chunk_keys, ['tag']
- config_set_default :flush_at_shutdown, true
- config_set_default :chunk_limit_size, 10 * 1024
- end
-
- attr_accessor :feed_proc, :delayed
-
- def initialize
- super
- @delayed = false
- @feed_proc = nil
- end
-
- def prefer_delayed_commit
- @delayed
- end
-
- def write(chunk)
- if @feed_proc
- @feed_proc.call(chunk)
- else
- # ignore chunk.read
- end
- end
-
- def try_write(chunk)
- if @feed_proc
- @feed_proc.call(chunk)
- else
- # ignore chunk.read
- end
- end
- end
-end
diff --git a/lib/fluent/plugin/out_buffered_stdout.rb b/lib/fluent/plugin/out_buffered_stdout.rb
deleted file mode 100644
index 1630b15024..0000000000
--- a/lib/fluent/plugin/out_buffered_stdout.rb
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Fluentd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-require 'fluent/plugin/output'
-
-module Fluent::Plugin
- class BufferedStdoutOutput < Output
- Fluent::Plugin.register_output('buffered_stdout', self)
-
- helpers :formatter, :inject, :compat_parameters
-
- config_section :buffer do
- config_set_default :chunk_keys, ['tag']
- config_set_default :flush_at_shutdown, true
- config_set_default :chunk_limit_size, 10 * 1024
- end
-
- DEFAULT_FORMAT_TYPE = 'json'
-
- config_section :format do
- config_set_default :@type, DEFAULT_FORMAT_TYPE
- end
-
- attr_accessor :delayed
-
- def initialize
- super
- @delayed = false
- end
-
- def prefer_delayed_commit
- @delayed
- end
-
- def configure(conf)
- if conf['output_type'] && !conf['format']
- conf['format'] = conf['output_type']
- end
- compat_parameters_convert(conf, :inject, :formatter)
- super
- @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
- end
-
- def write(chunk)
- chunk.write_to($log)
- end
-
- def try_write(chunk)
- chunk.write_to($log)
- end
-
- def format(tag, time, record)
- record = inject_values_to_record(tag, time, record)
- "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
- end
- end
-end
diff --git a/lib/fluent/plugin/out_null.rb b/lib/fluent/plugin/out_null.rb
index e95bacf366..5d9a27aec0 100644
--- a/lib/fluent/plugin/out_null.rb
+++ b/lib/fluent/plugin/out_null.rb
@@ -18,10 +18,47 @@
module Fluent::Plugin
class NullOutput < Output
+ # This plugin is for tests of non-buffered/buffered plugins
Fluent::Plugin.register_output('null', self)
+ config_section :buffer do
+ config_set_default :chunk_keys, ['tag']
+ config_set_default :flush_at_shutdown, true
+ config_set_default :chunk_limit_size, 10 * 1024
+ end
+
+ def prefer_buffered_processing
+ false
+ end
+
+ def prefer_delayed_commit
+ @delayed
+ end
+
+ attr_accessor :feed_proc, :delayed
+
+ def initialize
+ super
+ @delayed = false
+ @feed_proc = nil
+ end
+
def process(tag, es)
# Do nothing
end
+
+ def write(chunk)
+ if @feed_proc
+ @feed_proc.call(chunk)
+ end
+ end
+
+ def try_write(chunk)
+ if @feed_proc
+ @feed_proc.call(chunk)
+ end
+ # not to commit chunks for testing
+ # commit_write(chunk.unique_id)
+ end
end
end
diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb
index 8d187fd83e..9604772346 100644
--- a/lib/fluent/plugin/out_stdout.rb
+++ b/lib/fluent/plugin/out_stdout.rb
@@ -24,25 +24,61 @@ class StdoutOutput < Output
DEFAULT_FORMAT_TYPE = 'json'
+ config_section :buffer do
+ config_set_default :chunk_keys, ['tag']
+ config_set_default :flush_at_shutdown, true
+ config_set_default :chunk_limit_size, 10 * 1024
+ end
+
config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end
+ def prefer_buffered_processing
+ false
+ end
+
+ def prefer_delayed_commit
+ @delayed
+ end
+
+ attr_accessor :delayed
+
+ def initialize
+ super
+ @delayed = false
+ end
+
def configure(conf)
if conf['output_type'] && !conf['format']
conf['format'] = conf['output_type']
end
compat_parameters_convert(conf, :inject, :formatter)
+
super
+
@formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
end
def process(tag, es)
es.each {|time,record|
- r = inject_values_to_record(tag, time, record)
- $log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, r).chomp}\n"
+ $log.write(format(tag, time, record))
}
$log.flush
end
+
+ def format(tag, time, record)
+ record = inject_values_to_record(tag, time, record)
+ "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
+ end
+
+ def write(chunk)
+ chunk.write_to($log)
+ end
+
+ def try_write(chunk)
+ chunk.write_to($log)
+ commit_write(chunk.unique_id)
+ end
end
end
diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb
index f201d92f89..232938438f 100644
--- a/test/plugin/test_in_monitor_agent.rb
+++ b/test/plugin/test_in_monitor_agent.rb
@@ -119,6 +119,8 @@ def test_configure
"@id"=>"null",
"@type" => "null"
},
+ "buffer_queue_length" => 0,
+ "buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
@@ -285,6 +287,8 @@ def get(uri, header = {})
"@id" => "null",
"@type" => "null"
},
+ "buffer_queue_length" => 0,
+ "buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
diff --git a/test/plugin/test_out_buffered_null.rb b/test/plugin/test_out_buffered_null.rb
deleted file mode 100644
index 269074b44f..0000000000
--- a/test/plugin/test_out_buffered_null.rb
+++ /dev/null
@@ -1,79 +0,0 @@
-require_relative '../helper'
-require 'fluent/test/driver/output'
-require 'fluent/plugin/out_buffered_null'
-
-class BufferedNullOutputTestCase < Test::Unit::TestCase
- sub_test_case 'BufferedNullOutput' do
- test 'default chunk limit size is 100' do
- d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('')
- assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
- assert d.instance.buffer_config.flush_at_shutdown
- assert_equal ['tag'], d.instance.buffer_config.chunk_keys
- assert d.instance.chunk_key_tag
- assert !d.instance.chunk_key_time
- assert_equal [], d.instance.chunk_keys
- end
-
- test 'writes standard formattted chunks' do
- d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('')
- t = event_time("2016-05-23 00:22:13 -0800")
- d.run(default_tag: 'test', flush: true) do
- d.feed(t, {"message" => "null null null"})
- d.feed(t, {"message" => "null null"})
- d.feed(t, {"message" => "null"})
- end
-
- assert_equal 3, d.instance.emit_count
- assert_equal 3, d.instance.emit_records
- end
-
- test 'check for chunk passed to #write' do
- d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('')
- data = []
- d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] }
-
- t = event_time("2016-05-23 00:22:13 -0800")
- d.run(default_tag: 'test', flush: true) do
- d.feed(t, {"message" => "null null null"})
- d.feed(t, {"message" => "null null"})
- d.feed(t, {"message" => "null"})
- end
-
- assert_equal 1, data.size
- _, tag, binary = data.first
- events = []
- Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj }
- assert_equal 'test', tag
- assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events
- end
-
- test 'check for chunk passed to #try_write' do
- d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('')
- data = []
- d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] }
- d.instance.delayed = true
-
- t = event_time("2016-05-23 00:22:13 -0800")
- d.run(default_tag: 'test', flush: true, shutdown: false) do
- d.feed(t, {"message" => "null null null"})
- d.feed(t, {"message" => "null null"})
- d.feed(t, {"message" => "null"})
- end
-
- assert_equal 1, data.size
- chunk_id, tag, binary = data.first
- events = []
- Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj }
- assert_equal 'test', tag
- assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events
-
- assert_equal [chunk_id], d.instance.buffer.dequeued.keys
-
- d.instance.commit_write(chunk_id)
-
- assert_equal [], d.instance.buffer.dequeued.keys
-
- d.instance_shutdown
- end
- end
-end
diff --git a/test/plugin/test_out_buffered_stdout.rb b/test/plugin/test_out_buffered_stdout.rb
deleted file mode 100644
index 6f36a4643e..0000000000
--- a/test/plugin/test_out_buffered_stdout.rb
+++ /dev/null
@@ -1,122 +0,0 @@
-require_relative '../helper'
-require 'fluent/test/driver/output'
-require 'fluent/plugin/out_buffered_stdout'
-
-class BufferedStdoutOutputTest < Test::Unit::TestCase
- def setup
- Fluent::Test.setup
- end
-
- CONFIG = %[
- ]
-
- def create_driver(conf = CONFIG)
- Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedStdoutOutput).configure(conf)
- end
-
- test 'default configure' do
- d = create_driver
- assert_equal [], d.instance.formatter_configs
- assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
- assert d.instance.buffer_config.flush_at_shutdown
- assert_equal ['tag'], d.instance.buffer_config.chunk_keys
- assert d.instance.chunk_key_tag
- assert !d.instance.chunk_key_time
- assert_equal [], d.instance.chunk_keys
- end
-
- test 'configure with output_type' do
- d = create_driver(CONFIG + "\noutput_type json")
- assert_equal 'json', d.instance.formatter_configs.first[:@type]
-
- d = create_driver(CONFIG + "\noutput_type hash")
- assert_equal 'hash', d.instance.formatter_configs.first[:@type]
-
- assert_raise(Fluent::ConfigError) do
- d = create_driver(CONFIG + "\noutput_type foo")
- end
- end
-
- sub_test_case "emit with default config" do
- test '#write(synchronous)' do
- d = create_driver
- time = event_time()
-
- out = capture_log do
- d.run(default_tag: 'test', flush: true) do
- d.feed(time, {'test' => 'test'})
- end
- end
- assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
- end
- end
-
- sub_test_case "emit json" do
- data('oj' => 'oj', 'yajl' => 'yajl')
- test '#write(synchronous)' do |data|
- d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
- time = event_time()
-
- out = capture_log do
- d.run(default_tag: 'test', flush: true) do
- d.feed(time, {'test' => 'test'})
- end
- end
- assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
- end
-
- data('oj' => 'oj', 'yajl' => 'yajl')
- test '#try_write(asynchronous)' do |data|
- d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
- time = event_time()
- d.instance.delayed = true
-
- out = capture_log do
- d.run(default_tag: 'test', flush: true, shutdown: false) do
- d.feed(time, {'test' => 'test'})
- end
- end
-
- assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
- end
- end
-
- sub_test_case 'emit hash' do
- test '#write(synchronous)' do
- d = create_driver(CONFIG + "\noutput_type hash")
- time = event_time()
-
- out = capture_log do
- d.run(default_tag: 'test', flush: true) do
- d.feed(time, {'test' => 'test'})
- end
- end
-
- assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
- end
-
- test '#try_write(asynchronous)' do
- d = create_driver(CONFIG + "\noutput_type hash")
- time = event_time()
- d.instance.delayed = true
-
- out = capture_log do
- d.run(default_tag: 'test', flush: true, shutdown: false) do
- d.feed(time, {'test' => 'test'})
- end
- end
-
- assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
- end
- end
-
- # Capture the log output of the block given
- def capture_log(&block)
- tmp = $log
- $log = StringIO.new
- yield
- return $log.string
- ensure
- $log = tmp
- end
-end
diff --git a/test/plugin/test_out_null.rb b/test/plugin/test_out_null.rb
index 87c8119621..dd1edd6ec5 100644
--- a/test/plugin/test_out_null.rb
+++ b/test/plugin/test_out_null.rb
@@ -11,19 +11,95 @@ def create_driver(conf = "")
Fluent::Test::Driver::Output.new(Fluent::Plugin::NullOutput).configure(conf)
end
- def test_configure
- assert_nothing_raised do
- create_driver
+ sub_test_case 'non-buffered' do
+ test 'configure' do
+ assert_nothing_raised do
+ create_driver
+ end
+ end
+
+ test 'process' do
+ d = create_driver
+ assert_nothing_raised do
+ d.run do
+ d.feed("test", Fluent::EventTime.now, {"test" => "null"})
+ end
+ end
+ assert_equal([], d.events(tag: "test"))
end
end
- def test_process
- d = create_driver
- assert_nothing_raised do
- d.run do
- d.feed("test", Fluent::EventTime.now, {"test" => "null"})
+ sub_test_case 'buffered' do
+ test 'default chunk limit size is 100' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
+ assert d.instance.buffer_config.flush_at_shutdown
+ assert_equal ['tag'], d.instance.buffer_config.chunk_keys
+ assert d.instance.chunk_key_tag
+ assert !d.instance.chunk_key_time
+ assert_equal [], d.instance.chunk_keys
+ end
+
+ test 'writes standard formattted chunks' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ t = event_time("2016-05-23 00:22:13 -0800")
+ d.run(default_tag: 'test', flush: true) do
+ d.feed(t, {"message" => "null null null"})
+ d.feed(t, {"message" => "null null"})
+ d.feed(t, {"message" => "null"})
end
+
+ assert_equal 3, d.instance.emit_count
+ assert_equal 3, d.instance.emit_records
+ end
+
+ test 'check for chunk passed to #write' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ data = []
+ d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] }
+
+ t = event_time("2016-05-23 00:22:13 -0800")
+ d.run(default_tag: 'test', flush: true) do
+ d.feed(t, {"message" => "null null null"})
+ d.feed(t, {"message" => "null null"})
+ d.feed(t, {"message" => "null"})
+ end
+
+ assert_equal 1, data.size
+ _, tag, binary = data.first
+ events = []
+ Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj }
+ assert_equal 'test', tag
+ assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events
+ end
+
+ test 'check for chunk passed to #try_write' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ data = []
+ d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] }
+ d.instance.delayed = true
+
+ t = event_time("2016-05-23 00:22:13 -0800")
+ d.run(default_tag: 'test', flush: true, shutdown: false) do
+ d.feed(t, {"message" => "null null null"})
+ d.feed(t, {"message" => "null null"})
+ d.feed(t, {"message" => "null"})
+ end
+
+ assert_equal 1, data.size
+ chunk_id, tag, binary = data.first
+ events = []
+ Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj }
+ assert_equal 'test', tag
+ assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events
+
+ assert_equal [chunk_id], d.instance.buffer.dequeued.keys
+
+ d.instance.commit_write(chunk_id)
+
+ assert_equal [], d.instance.buffer.dequeued.keys
+
+ d.instance_shutdown
end
- assert_equal([], d.events(tag: "test"))
end
end
diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb
index 4b4c20e4d5..8fd02cb06e 100644
--- a/test/plugin/test_out_stdout.rb
+++ b/test/plugin/test_out_stdout.rb
@@ -14,70 +14,168 @@ def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::StdoutOutput).configure(conf)
end
- def test_configure
- d = create_driver
- assert_equal [], d.instance.formatter_configs
- end
+ sub_test_case 'non-buffered' do
+ test 'configure' do
+ d = create_driver
+ assert_equal [], d.instance.formatter_configs
+ end
- def test_configure_output_type
- d = create_driver(CONFIG + "\noutput_type json")
- assert_equal 'json', d.instance.formatter_configs.first[:@type]
+ test 'configure output_type' do
+ d = create_driver(CONFIG + "\noutput_type json")
+ assert_equal 'json', d.instance.formatter_configs.first[:@type]
- d = create_driver(CONFIG + "\noutput_type hash")
- assert_equal 'hash', d.instance.formatter_configs.first[:@type]
+ d = create_driver(CONFIG + "\noutput_type hash")
+ assert_equal 'hash', d.instance.formatter_configs.first[:@type]
- assert_raise(Fluent::ConfigError) do
- d = create_driver(CONFIG + "\noutput_type foo")
+ assert_raise(Fluent::ConfigError) do
+ d = create_driver(CONFIG + "\noutput_type foo")
+ end
end
- end
- def test_emit_in_default
- d = create_driver
- time = event_time()
- out = capture_log do
- d.run(default_tag: 'test') do
- d.feed(time, {'test' => 'test1'})
+ test 'emit with default configuration' do
+ d = create_driver
+ time = event_time()
+ out = capture_log do
+ d.run(default_tag: 'test') do
+ d.feed(time, {'test' => 'test1'})
+ end
end
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out
end
- assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out
- end
- data('oj' => 'oj', 'yajl' => 'yajl')
- def test_emit_json(data)
- d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
- time = event_time()
- out = capture_log do
- d.run(default_tag: 'test') do
- d.feed(time, {'test' => 'test1'})
+ data('oj' => 'oj', 'yajl' => 'yajl')
+ test 'emit in json format' do |data|
+ d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
+ time = event_time()
+ out = capture_log do
+ d.run(default_tag: 'test') do
+ d.feed(time, {'test' => 'test1'})
+ end
+ end
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out
+
+ if data == 'yajl'
+ # NOTE: Float::NAN is not jsonable
+ assert_raise(Yajl::EncodeError) { d.feed('test', time, {'test' => Float::NAN}) }
+ else
+ out = capture_log { d.feed('test', time, {'test' => Float::NAN}) }
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":NaN}\n", out
end
end
- assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out
- if data == 'yajl'
- # NOTE: Float::NAN is not jsonable
- assert_raise(Yajl::EncodeError) { d.feed('test', time, {'test' => Float::NAN}) }
- else
+ test 'emit in hash format' do
+ d = create_driver(CONFIG + "\noutput_type hash")
+ time = event_time()
+ out = capture_log do
+ d.run(default_tag: 'test') do
+ d.feed(time, {'test' => 'test2'})
+ end
+ end
+ assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test2\"}\n", out
+
+ # NOTE: Float::NAN is not jsonable, but hash string can output it.
out = capture_log { d.feed('test', time, {'test' => Float::NAN}) }
- assert_equal "#{Time.at(time).localtime} test: {\"test\":NaN}\n", out
+ assert_equal "#{Time.at(time).localtime} test: {\"test\"=>NaN}\n", out
end
end
- def test_emit_hash
- d = create_driver(CONFIG + "\noutput_type hash")
- time = event_time()
- out = capture_log do
- d.run(default_tag: 'test') do
- d.feed(time, {'test' => 'test2'})
+ sub_test_case 'buffered' do
+ test 'configure' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ assert_equal [], d.instance.formatter_configs
+ assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
+ assert d.instance.buffer_config.flush_at_shutdown
+ assert_equal ['tag'], d.instance.buffer_config.chunk_keys
+ assert d.instance.chunk_key_tag
+ assert !d.instance.chunk_key_time
+ assert_equal [], d.instance.chunk_keys
+ end
+
+ test 'configure with output_type' do
+ d = create_driver(config_element("ROOT", "", {"output_type" => "json"}, [config_element("buffer")]))
+ assert_equal 'json', d.instance.formatter_configs.first[:@type]
+
+ d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
+ assert_equal 'hash', d.instance.formatter_configs.first[:@type]
+
+ assert_raise(Fluent::ConfigError) do
+ create_driver(config_element("ROOT", "", {"output_type" => "foo"}, [config_element("buffer")]))
end
end
- assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test2\"}\n", out
- # NOTE: Float::NAN is not jsonable, but hash string can output it.
- out = capture_log { d.feed('test', time, {'test' => Float::NAN}) }
- assert_equal "#{Time.at(time).localtime} test: {\"test\"=>NaN}\n", out
- end
+ sub_test_case "emit with default config" do
+ test '#write(synchronous)' do
+ d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")]))
+ time = event_time()
+
+ out = capture_log do
+ d.run(default_tag: 'test', flush: true) do
+ d.feed(time, {'test' => 'test'})
+ end
+ end
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
+ end
+ end
- private
+ sub_test_case "emit json" do
+ data('oj' => 'oj', 'yajl' => 'yajl')
+ test '#write(synchronous)' do |data|
+ d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")]))
+ time = event_time()
+
+ out = capture_log do
+ d.run(default_tag: 'test', flush: true) do
+ d.feed(time, {'test' => 'test'})
+ end
+ end
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
+ end
+
+ data('oj' => 'oj', 'yajl' => 'yajl')
+ test '#try_write(asynchronous)' do |data|
+ d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")]))
+ time = event_time()
+ d.instance.delayed = true
+
+ out = capture_log do
+ d.run(default_tag: 'test', flush: true, shutdown: false) do
+ d.feed(time, {'test' => 'test'})
+ end
+ end
+
+ assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
+ end
+ end
+
+ sub_test_case 'emit hash' do
+ test '#write(synchronous)' do
+ d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
+ time = event_time()
+
+ out = capture_log do
+ d.run(default_tag: 'test', flush: true) do
+ d.feed(time, {'test' => 'test'})
+ end
+ end
+
+ assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
+ end
+
+ test '#try_write(asynchronous)' do
+ d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
+ time = event_time()
+ d.instance.delayed = true
+
+ out = capture_log do
+ d.run(default_tag: 'test', flush: true, shutdown: false) do
+ d.feed(time, {'test' => 'test'})
+ end
+ end
+
+ assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
+ end
+ end
+ end
# Capture the log output of the block given
def capture_log(&block)