diff --git a/lib/fluent/plugin/in_unix.rb b/lib/fluent/plugin/in_unix.rb index f2998f91da..7f342871c2 100644 --- a/lib/fluent/plugin/in_unix.rb +++ b/lib/fluent/plugin/in_unix.rb @@ -14,50 +14,68 @@ # limitations under the License. # -require 'fileutils' -require 'socket' +require 'fluent/env' +require 'fluent/plugin/input' +require 'fluent/msgpack_factory' require 'cool.io' require 'yajl' +require 'fileutils' +require 'socket' + +module Fluent::Plugin + # TODO: This plugin will be 3rd party plugin + class UnixInput < Input + Fluent::Plugin.register_input('unix', self) + + helpers :event_loop + + def initialize + super -require 'fluent/input' -require 'fluent/event' + @lsock = nil + end -module Fluent - # obsolete - class StreamInput < Input - config_param :blocking_timeout, :time, default: 0.5 + desc 'The path to your Unix Domain Socket.' + config_param :path, :string, default: Fluent::DEFAULT_SOCKET_PATH + desc 'The backlog of Unix Domain Socket.' + config_param :backlog, :integer, default: nil + desc "New tag instead of incoming tag" + config_param :tag, :string, default: nil + + def configure(conf) + super + end def start super - @loop = Coolio::Loop.new @lsock = listen - @loop.attach(@lsock) - @thread = Thread.new(&method(:run)) + event_loop_attach(@lsock) end def shutdown - @loop.watchers.each {|w| w.detach } - @loop.stop - @lsock.close - @thread.join + if @lsock + event_loop_detach(@lsock) + @lsock.close + end super end - #def listen - #end + def listen + if File.exist?(@path) + log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin" + File.unlink(@path) + end + FileUtils.mkdir_p(File.dirname(@path)) - def run - @loop.run(@blocking_timeout) - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace + log.info "listening fluent socket on #{@path}" + s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message)) + s.listen(@backlog) unless @backlog.nil? + s end - private - # message Entry { # 1: long time # 2: object record @@ -79,23 +97,27 @@ def run # 3: object record # } def on_message(msg) - # TODO format error - tag = msg[0].to_s + unless msg.is_a?(Array) + log.warn "incoming data is broken:", msg: msg + return + end + + tag = @tag || (msg[0].to_s) entries = msg[1] - if entries.class == String + case entries + when String # PackedForward - es = MessagePackEventStream.new(entries) + es = Fluent::MessagePackEventStream.new(entries) router.emit_stream(tag, es) - elsif entries.class == Array + when Array # Forward - es = MultiEventStream.new + es = Fluent::MultiEventStream.new entries.each {|e| record = e[1] next if record.nil? - time = e[0] - time = (now ||= EventTime.now) if time.to_i == 0 + time = convert_time(e[0]) es.add(time, record) } router.emit_stream(tag, es) @@ -105,25 +127,28 @@ def on_message(msg) record = msg[2] return if record.nil? - time = msg[1] - time = EventTime.now if time.to_i == 0 + time = convert_time(msg[1]) router.emit(tag, time, record) end end + def convert_time(time) + case time + when nil, 0 + Fluent::EventTime.now + when Fluent::EventTime + time + else + Fluent::EventTime.from_time(Time.at(time)) + end + end + class Handler < Coolio::Socket def initialize(io, log, on_message) super(io) - if io.is_a?(TCPSocket) - opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } - io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - end + @on_message = on_message @log = log - @log.trace { - remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil - "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}" - } end def on_connect @@ -131,13 +156,13 @@ def on_connect def on_read(data) first = data[0] - if first == '{' || first == '[' + if first == '{'.freeze || first == '['.freeze m = method(:on_read_json) - @y = Yajl::Parser.new - @y.on_parse_complete = @on_message + @parser = Yajl::Parser.new + @parser.on_parse_complete = @on_message else m = method(:on_read_msgpack) - @u = Fluent::MessagePackFactory.msgpack_unpacker + @parser = Fluent::MessagePackFactory.msgpack_unpacker end singleton_class.module_eval do @@ -147,17 +172,17 @@ def on_read(data) end def on_read_json(data) - @y << data - rescue - @log.error "unexpected error", error: $!.to_s + @parser << data + rescue => e + @log.error "unexpected error in json payload", error: e.to_s @log.error_backtrace close end def on_read_msgpack(data) - @u.feed_each(data, &@on_message) - rescue - @log.error "unexpected error", error: $!.to_s + @parser.feed_each(data, &@on_message) + rescue => e + @log.error "unexpected error in msgpack payload", error: e.to_s @log.error_backtrace close end @@ -167,29 +192,4 @@ def on_close end end end - - class UnixInput < StreamInput - Plugin.register_input('unix', self) - - desc 'The path to your Unix Domain Socket.' - config_param :path, :string, default: DEFAULT_SOCKET_PATH - desc 'The backlog of Unix Domain Socket.' - config_param :backlog, :integer, default: nil - - def configure(conf) - super - #log.warn "'unix' input is obsoleted and will be removed. Use 'forward' instead." - end - - def listen - if File.exist?(@path) - File.unlink(@path) - end - FileUtils.mkdir_p File.dirname(@path) - log.info "listening fluent socket on #{@path}" - s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message)) - s.listen(@backlog) unless @backlog.nil? - s - end - end end diff --git a/test/plugin/test_in_unix.rb b/test/plugin/test_in_unix.rb index d922203d8f..25fa046764 100644 --- a/test/plugin/test_in_unix.rb +++ b/test/plugin/test_in_unix.rb @@ -1,125 +1,181 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_unix' -module StreamInputTest +class UnixInputTest < Test::Unit::TestCase def setup Fluent::Test.setup + @d = nil end - def test_time - d = create_driver + def teardown + @d.instance_shutdown if @d + end - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + TMP_DIR = File.dirname(__FILE__) + "/../tmp/in_unix#{ENV['TEST_ENV_NUMBER']}" + CONFIG = %[ + path #{TMP_DIR}/unix + backlog 1000 + ] + + def create_driver(conf = CONFIG) + Fluent::Test::Driver::Input.new(Fluent::Plugin::UnixInput).configure(conf) + end - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + def packer(*args) + Fluent::MessagePackFactory.msgpack_packer(*args) + end - d.run do - d.expected_emits.each {|tag,_time,record| - send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s + def unpacker + Fluent::MessagePackFactory.msgpack_unpacker + end + + def send_data(data) + io = UNIXSocket.new("#{TMP_DIR}/unix") + begin + io.write data + ensure + io.close + end + end + + def test_configure + @d = create_driver + assert_equal "#{TMP_DIR}/unix", @d.instance.path + assert_equal 1000, @d.instance.backlog + end + + def test_time + @d = create_driver + + time = Fluent::EventTime.now + records = [ + ["tag1", 0, {"a" => 1}], + ["tag2", nil, {"a" => 2}], + ] + + @d.run(expect_records: records.length, timeout: 5) do + records.each {|tag, _time, record| + send_data packer.write([tag, _time, record]).to_s } end + + @d.events.each_with_index { |e, i| + orig = records[i] + assert_equal(orig[0], e[0]) + assert_true(time <= e[1]) + assert_equal(orig[2], e[2]) + } end def test_message - d = create_driver + @d = create_driver - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + time = Fluent::EventTime.now + records = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] - d.run do - d.expected_emits.each {|tag,_time,record| - send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s + @d.run(expect_records: records.length, timeout: 5) do + records.each {|tag, _time, record| + send_data packer.write([tag, _time, record]).to_s } end + + assert_equal(records, @d.events) end def test_forward - d = create_driver + @d = create_driver time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + records = [ + ["tag1", time, {"a" => 1}], + ["tag1", time, {"a" => 2}] + ] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag1", time, {"a"=>2} - - d.run do + @d.run(expect_records: records.length, timeout: 20) do entries = [] - d.expected_emits.each {|tag,_time,record| + records.each {|tag, _time, record| entries << [_time, record] } - send_data Fluent::MessagePackFactory.msgpack_packer.write(["tag1", entries]).to_s + send_data packer.write(["tag1", entries]).to_s end + assert_equal(records, @d.events) end def test_packed_forward - d = create_driver + @d = create_driver - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag1", time, {"a"=>2} + time = Fluent::EventTime.now + records = [ + ["tag1", time, {"a" => 1}], + ["tag1", time, {"a" => 2}], + ] - d.run do + @d.run(expect_records: records.length, timeout: 20) do entries = '' - d.expected_emits.each {|tag,_time,record| - Fluent::MessagePackFactory.msgpack_packer(entries).write([_time, record]).flush + records.each {|_tag, _time, record| + packer(entries).write([_time, record]).flush } - send_data Fluent::MessagePackFactory.msgpack_packer.write(["tag1", entries]).to_s + send_data packer.write(["tag1", entries]).to_s end + assert_equal(records, @d.events) end def test_message_json - d = create_driver + @d = create_driver + + time = Fluent::EventTime.now + records = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] + + @d.run(expect_records: records.length, timeout: 5) do + tag, _time, record = records[0] + send_data [tag, _time.to_i, record].to_json + tag, _time, record = records[1] + send_data [tag, _time.to_f, record].to_json + end - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + assert_equal(records, @d.events) + end + + def test_message_with_tag + @d = create_driver(CONFIG + "tag new_tag") - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + time = Fluent::EventTime.now + records = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] - d.run do - d.expected_emits.each {|tag,_time,record| - send_data [tag, time, record].to_json + @d.run(expect_records: records.length, timeout: 5) do + records.each {|tag, _time, record| + send_data packer.write([tag, _time, record]).to_s } end - end - def create_driver(klass, conf) - Fluent::Test::InputTestDriver.new(klass).configure(conf) + @d.events.each { |event| + assert_equal("new_tag", event[0]) + } end - def send_data(data) - io = connect - begin - io.write data - ensure - io.close + data('string chunk' => 'broken string', + 'integer chunk' => 10) + def test_broken_message(data) + @d = create_driver + @d.run(shutdown: false, timeout: 5) do + @d.instance.__send__(:on_message, data) end - end -end - -class UnixInputTest < Test::Unit::TestCase - include StreamInputTest - - TMP_DIR = File.dirname(__FILE__) + "/../tmp/in_unix#{ENV['TEST_ENV_NUMBER']}" - CONFIG = %[ - path #{TMP_DIR}/unix - backlog 1000 - ] - def create_driver(conf=CONFIG) - super(Fluent::UnixInput, conf) - end - - def test_configure - d = create_driver - assert_equal "#{TMP_DIR}/unix", d.instance.path - assert_equal 1000, d.instance.backlog - end + assert_equal 0, @d.events.size - def connect - UNIXSocket.new("#{TMP_DIR}/unix") + logs = @d.instance.log.logs + assert_equal 1, logs.select { |line| + line =~ / \[warn\]: incoming data is broken: msg=#{data.inspect}/ + }.size, "should not accept broken chunk" end end unless Fluent.windows?