Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate filter_stdout plugin to v0.14 API #1058

Merged
merged 18 commits into from
Aug 1, 2016
Merged
30 changes: 19 additions & 11 deletions lib/fluent/plugin/filter_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,39 @@
# limitations under the License.
#

require 'fluent/filter'
require 'fluent/plugin'
require 'fluent/plugin/filter'

module Fluent
module Fluent::Plugin
class StdoutFilter < Filter
Plugin.register_filter('stdout', self)
Fluent::Plugin.register_filter('stdout', self)

helpers :formatter, :compat_parameters, :inject

DEFAULT_FORMAT_TYPE = 'stdout'

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end

# for tests
attr_reader :formatter

desc 'The format of the output.'
config_param :format, :string, default: 'stdout'
# config_param :output_type, :string, :default => 'json' (StdoutFormatter defines this)

def configure(conf)
conf['format'] ||= DEFAULT_FORMAT_TYPE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't do this operations.
Please use default_type keyword argument of formatter_create method instead.

compat_parameters_convert(conf, :inject, :formatter)
super
end

@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
def start
@formatter = formatter_create(conf: @config.elements('format').first)
super
end

def filter_stream(tag, es)
es.each { |time, record|
begin
log.write @formatter.format(tag, time, record)
r = inject_values_to_record(tag, time, record)
log.write @formatter.format(tag, time, r)
rescue => e
router.emit_error_event(tag, time, record, e)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/test/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def msgpack(type)
raise ArgumentError, "unknown msgpack object type '#{type}'"
end
end

def capture_log(driver)
tmp = driver.instance.log.out
driver.instance.log.out = StringIO.new
yield
return driver.instance.log.out.string
ensure
driver.instance.log.out = tmp
end
end
end
end
256 changes: 186 additions & 70 deletions test/plugin/test_filter_stdout.rb
Original file line number Diff line number Diff line change
@@ -1,114 +1,230 @@
require_relative '../helper'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_stdout'
require 'timecop'
require 'flexmock/test_unit'

class StdoutFilterTest < Test::Unit::TestCase
include Fluent
include FlexMock::TestCase

def setup
Fluent::Test.setup
@old_tz = ENV["TZ"]
ENV["TZ"] = "UTC"
Timecop.freeze
end

def teardown
super # FlexMock::TestCase requires this
# http://flexmock.rubyforge.org/FlexMock/TestCase.html
Timecop.return
ENV["TZ"] = @old_tz
end

CONFIG = %[
]

def create_driver(conf = CONFIG)
Test::FilterTestDriver.new(StdoutFilter, 'filter.test').configure(conf)
Fluent::Test::Driver::Filter.new(Fluent::Plugin::StdoutFilter).configure(conf)
end

def emit(d, msg, time)
def filter(d, time, record)
d.run {
d.emit(msg, time)
}.filtered_as_array[0][2]
d.feed("filter.test", time, record)
}
d.filtered_records
end

def test_through_record
d = create_driver
time = Time.now
filtered = emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time))
assert_equal({'test' => 'test'}, filtered)
filtered = filter(d, event_time, {'test' => 'test'})
assert_equal([{'test' => 'test'}], filtered)
end

def test_configure_default
d = create_driver
assert_equal 'json', d.instance.formatter.output_type
end

def test_configure_output_type
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.formatter.output_type

d = create_driver(CONFIG + "\noutput_type ltsv")
assert_equal 'ltsv', d.instance.formatter.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
sub_test_case "flat style parameters" do
sub_test_case "configure" do
def test_configure_default
d = create_driver
d.run {}
assert_equal 'json', d.instance.formatter.output_type
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
d = create_driver(CONFIG + "\noutput_type #{data}")
d.run {}
assert_equal data, d.instance.formatter.output_type
end

def test_invalid_output_type
assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
d.run {}
end
end
end
end

def test_output_type_json
d = create_driver(CONFIG + "\noutput_type json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
def test_output_type_json
d = create_driver(CONFIG + "\noutput_type json")
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + "\noutput_type json")
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

# NOTE: Float::NAN is not jsonable
d = create_driver(CONFIG + "\noutput_type json")
flexmock(d.instance.router).should_receive(:emit_error_event)
emit(d, {'test' => Float::NAN}, time)
end
def test_output_type_hash
d = create_driver(CONFIG + "\noutput_type hash")
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + "\noutput_type hash")
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

def test_output_type_hash
d = create_driver(CONFIG + "\noutput_type hash")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nlocaltime false")
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end

# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(CONFIG + "\noutput_type hash")
out = capture_log(d) { emit(d, {'test' => Float::NAN}, Fluent::EventTime.from_time(time)) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + "\nformat json")
out = capture_log(d) { filter(d, event_time, {'test' => 'test'}) }
assert_equal "{\"test\":\"test\"}\n", out
end
end

# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc")
time = Time.now
message_time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC")
out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
sub_test_case "with <format> sub section" do
sub_test_case "configure" do
def test_default
conf = format_config(%[
@type stdout
])
d = create_driver(conf)
d.run {}
assert_equal("json", d.instance.formatter.output_type)
end

data(json: "json",
hash: "hash",
ltsv: "ltsv")
def test_output_type(data)
conf = format_config(%[
@type stdout
output_type #{data}
])
d = create_driver(conf)
d.run {}
assert_equal(data, d.instance.formatter.output_type)
end

def test_invalid_output_type
assert_raise(Fluent::ConfigError) do
conf = format_config(%[
@type stdout
output_type foo
])
d = create_driver(conf)
d.run {}
end
end
end

# out_stdout formatter itself can also be replaced
def test_format_json
d = create_driver(CONFIG + "\nformat json")
time = Time.now
out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) }
assert_equal "{\"test\":\"test\"}\n", out
end
sub_test_case "output_type" do
def test_json
d = create_driver(format_config(%[
@type stdout
output_type json
]))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out
end

def test_json_nan
# NOTE: Float::NAN is not jsonable
d = create_driver(format_config(%[
@type stdout
output_type json
]))
etime = event_time
flexmock(d.instance.router).should_receive(:emit_error_event)
filter(d, etime, {'test' => Float::NAN})
end

def test_hash
d = create_driver(format_config(%[
@type stdout
output_type hash
]))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out
end

def test_hash_nan
# NOTE: Float::NAN is not jsonable, but hash string can output it.
d = create_driver(format_config(%[
@type stdout
output_type hash
]))
etime = event_time
time = Time.at(etime.sec)
out = capture_log(d) { filter(d, etime, {'test' => Float::NAN}) }
assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out
end

# Use include_time_key to output the message's time
def test_include_time_key
d = create_driver(format_config(%[
@type stdout
output_type json
]) +
inject_config(%[
time_key time
time_type string
localtime false
]))
etime = event_time
time = Time.at(etime.sec)
message_time = event_time("2011-01-02 13:14:15 UTC")
out = capture_log(d) { filter(d, message_time, {'test' => 'test'}) }
assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out
end
end

private
def format_config(config)
%[
<format>
#{config}
</format>
]
end

# Capture the log output of the block given
def capture_log(d, &block)
tmp = d.instance.log.out
d.instance.log.out = StringIO.new
yield
return d.instance.log.out.string
ensure
d.instance.log.out = tmp
def inject_config(config)
%[
<inject>
#{config}
</inject>
]
end
end
end