Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate filter_stdout plugin to v0.14 API #1058

Merged
merged 18 commits into from
Aug 1, 2016
Merged
29 changes: 18 additions & 11 deletions lib/fluent/plugin/filter_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,38 @@
# limitations under the License.
#

require 'fluent/filter'
require 'fluent/plugin'
require 'fluent/plugin/filter'

module Fluent
module Fluent::Plugin
class StdoutFilter < Filter
Plugin.register_filter('stdout', self)
Fluent::Plugin.register_filter('stdout', self)

helpers :formatter, :compat_parameters, :inject

DEFAULT_FORMAT_TYPE = 'stdout'

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end

# for tests
attr_reader :formatter

desc 'The format of the output.'
config_param :format, :string, default: 'stdout'
# config_param :output_type, :string, :default => 'json' (StdoutFormatter defines this)

def configure(conf)
compat_parameters_convert(conf, :inject, :formatter)
super
end

@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
def start
@formatter = formatter_create(conf: @config.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
super
end

def filter_stream(tag, es)
es.each { |time, record|
begin
log.write @formatter.format(tag, time, record)
r = inject_values_to_record(tag, time, record)
log.write @formatter.format(tag, time, r)
rescue => e
router.emit_error_event(tag, time, record, e)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/test/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def msgpack(type)
raise ArgumentError, "unknown msgpack object type '#{type}'"
end
end

def capture_log(driver)
tmp = driver.instance.log.out
driver.instance.log.out = StringIO.new
yield
return driver.instance.log.out.string
ensure
driver.instance.log.out = tmp
end
end
end
end
245 changes: 171 additions & 74 deletions test/plugin/test_filter_stdout.rb
Original file line number Diff line number Diff line change
@@ -1,114 +1,211 @@
require_relative '../helper'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_stdout'
require 'timecop'
require 'flexmock/test_unit'

class StdoutFilterTest < Test::Unit::TestCase
include Fluent
include FlexMock::TestCase

def setup
Fluent::Test.setup
@old_tz = ENV["TZ"]
ENV["TZ"] = "UTC"
Timecop.freeze
end

def teardown
super # FlexMock::TestCase requires this
# http://flexmock.rubyforge.org/FlexMock/TestCase.html
Timecop.return
ENV["TZ"] = @old_tz
end

CONFIG = %[
]
CONFIG = config_element('ROOT')

def create_driver(conf = CONFIG)
Test::FilterTestDriver.new(StdoutFilter, 'filter.test').configure(conf)
Fluent::Test::Driver::Filter.new(Fluent::Plugin::StdoutFilter).configure(conf)
end

def emit(d, msg, time)
def filter(d, time, record)
d.run {
d.emit(msg, time)
}.filtered_as_array[0][2]
d.feed("filter.test", time, record)
}
d.filtered_records
end

def test_through_record
d = create_driver
time = Time.now
filtered = emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time))
assert_equal({'test' => 'test'}, filtered)
filtered = filter(d, event_time, {'test' => 'test'})
assert_equal([{'test' => 'test'}], filtered)
end

def test_configure_default
d = create_driver
assert_equal 'json', d.instance.formatter.output_type
end

def test_configure_output_type
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type ltsv")
assert_equal 'ltsv', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
sub_test_case "flat style parameters" do
sub_test_case "configure" do
def test_configure_default
d = create_driver
d.run {}
assert_equal 'json', d.instance.formatter.output_type
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
d = create_driver(CONFIG + config_element("", "", { "output_type" => data }))
d.run {}
assert_equal data, d.instance.formatter.output_type
end

def test_invalid_output_type
assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + config_element("", "", { "output_type" => "foo" }))
d.run {}
end
end
end
end

def test_output_type_json
d = create_driver(CONFIG + "\noutput_type json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + "\noutput_type json")
flexmock(d.instance.router).should_receive(:emit_error_event)
emit(d, {'test' => Float::NAN}, time)
end

def test_output_type_hash
d = create_driver(CONFIG + "\noutput_type hash")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
def test_output_type_json
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + config_element("", "", { "output_type" => "json" }))
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + "\noutput_type hash")
out = capture_log(d) { emit(d, {'test' => Float::NAN}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end
def test_output_type_hash
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + config_element("", "", { "output_type" => "hash" }))
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc")
time = Time.now
message_time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
# Use include_time_key to output the message's time
def test_include_time_key
config = config_element("", "", {
"output_type" => "json",
"include_time_key" => true,
"localtime" => false
})
d = create_driver(config)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end

# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + "\nformat json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "{\"test\":\"test\"}\n", out
# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + config_element("", "", { "format" => "json" }))
out = capture_log(d) { filter(d, event_time, {'test' => 'test'}) }
assert_equal "{\"test\":\"test\"}\n", out
end
end

private
sub_test_case "with <format> sub section" do
sub_test_case "configure" do
def test_default
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout"})
d = create_driver(conf)
d.run {}
assert_equal("json", d.instance.formatter.output_type)
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => data })
d = create_driver(conf)
d.run {}
assert_equal(data, d.instance.formatter.output_type)
end

def test_invalid_output_type
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "foo" })
assert_raise(Fluent::ConfigError) do
d = create_driver(conf)
d.run {}
end
end
end

# Capture the log output of the block given
def capture_log(d, &block)
tmp = d.instance.log.out
d.instance.log.out = StringIO.new
yield
return d.instance.log.out.string
ensure
d.instance.log.out = tmp
sub_test_case "output_type" do
def test_json
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
end

def test_json_nan
# NOTE: Float::NAN is not jsonable
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "json" })
d = create_driver(conf)
etime = event_time
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

def test_hash
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
end

def test_hash_nan
# NOTE: Float::NAN is not jsonable, but hash string can output it.
conf = config_element
conf.elements << config_element("format", "", { "@type" => "stdout", "output_type" => "hash" })
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
conf = config_element
conf.elements << config_element("format", "", {
"@type" => "stdout",
"output_type" => "json"
})
conf.elements << config_element("inject", "", {
"time_key" => "time",
"time_type" => "string",
"localtime" => false
})
d = create_driver(conf)
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
end
end
end