Skip to content

Commit

Permalink
Merge pull request #2992 from fluent/v1-api-in_unix
Browse files Browse the repository at this point in the history
in_unix: Use v1 plugin API
  • Loading branch information
repeatedly authored Jun 3, 2020
2 parents 60e4e8f + f0d38fc commit 1813f3b
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 149 deletions.
154 changes: 77 additions & 77 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,68 @@
# limitations under the License.
#

require 'fileutils'
require 'socket'
require 'fluent/env'
require 'fluent/plugin/input'
require 'fluent/msgpack_factory'

require 'cool.io'
require 'yajl'
require 'fileutils'
require 'socket'

module Fluent::Plugin
# TODO: This plugin will be 3rd party plugin
class UnixInput < Input
Fluent::Plugin.register_input('unix', self)

helpers :event_loop

def initialize
super

require 'fluent/input'
require 'fluent/event'
@lsock = nil
end

module Fluent
# obsolete
class StreamInput < Input
config_param :blocking_timeout, :time, default: 0.5
desc 'The path to your Unix Domain Socket.'
config_param :path, :string, default: Fluent::DEFAULT_SOCKET_PATH
desc 'The backlog of Unix Domain Socket.'
config_param :backlog, :integer, default: nil
desc "New tag instead of incoming tag"
config_param :tag, :string, default: nil

def configure(conf)
super
end

def start
super

@loop = Coolio::Loop.new
@lsock = listen
@loop.attach(@lsock)
@thread = Thread.new(&method(:run))
event_loop_attach(@lsock)
end

def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@lsock.close
@thread.join
if @lsock
event_loop_detach(@lsock)
@lsock.close
end

super
end

#def listen
#end
def listen
if File.exist?(@path)
log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin"
File.unlink(@path)
end
FileUtils.mkdir_p(File.dirname(@path))

def run
@loop.run(@blocking_timeout)
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
log.info "listening fluent socket on #{@path}"
s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end

private

# message Entry {
# 1: long time
# 2: object record
Expand All @@ -79,23 +97,27 @@ def run
# 3: object record
# }
def on_message(msg)
# TODO format error
tag = msg[0].to_s
unless msg.is_a?(Array)
log.warn "incoming data is broken:", msg: msg
return
end

tag = @tag || (msg[0].to_s)
entries = msg[1]

if entries.class == String
case entries
when String
# PackedForward
es = MessagePackEventStream.new(entries)
es = Fluent::MessagePackEventStream.new(entries)
router.emit_stream(tag, es)

elsif entries.class == Array
when Array
# Forward
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
entries.each {|e|
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= EventTime.now) if time.to_i == 0
time = convert_time(e[0])
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -105,39 +127,42 @@ def on_message(msg)
record = msg[2]
return if record.nil?

time = msg[1]
time = EventTime.now if time.to_i == 0
time = convert_time(msg[1])
router.emit(tag, time, record)
end
end

def convert_time(time)
case time
when nil, 0
Fluent::EventTime.now
when Fluent::EventTime
time
else
Fluent::EventTime.from_time(Time.at(time))
end
end

class Handler < Coolio::Socket
def initialize(io, log, on_message)
super(io)
if io.is_a?(TCPSocket)
opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end

@on_message = on_message
@log = log
@log.trace {
remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
"accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
}
end

def on_connect
end

def on_read(data)
first = data[0]
if first == '{' || first == '['
if first == '{'.freeze || first == '['.freeze
m = method(:on_read_json)
@y = Yajl::Parser.new
@y.on_parse_complete = @on_message
@parser = Yajl::Parser.new
@parser.on_parse_complete = @on_message
else
m = method(:on_read_msgpack)
@u = Fluent::MessagePackFactory.msgpack_unpacker
@parser = Fluent::MessagePackFactory.msgpack_unpacker
end

singleton_class.module_eval do
Expand All @@ -147,17 +172,17 @@ def on_read(data)
end

def on_read_json(data)
@y << data
rescue
@log.error "unexpected error", error: $!.to_s
@parser << data
rescue => e
@log.error "unexpected error in json payload", error: e.to_s
@log.error_backtrace
close
end

def on_read_msgpack(data)
@u.feed_each(data, &@on_message)
rescue
@log.error "unexpected error", error: $!.to_s
@parser.feed_each(data, &@on_message)
rescue => e
@log.error "unexpected error in msgpack payload", error: e.to_s
@log.error_backtrace
close
end
Expand All @@ -167,29 +192,4 @@ def on_close
end
end
end

class UnixInput < StreamInput
Plugin.register_input('unix', self)

desc 'The path to your Unix Domain Socket.'
config_param :path, :string, default: DEFAULT_SOCKET_PATH
desc 'The backlog of Unix Domain Socket.'
config_param :backlog, :integer, default: nil

def configure(conf)
super
#log.warn "'unix' input is obsoleted and will be removed. Use 'forward' instead."
end

def listen
if File.exist?(@path)
File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.info "listening fluent socket on #{@path}"
s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
end
end
Loading

0 comments on commit 1813f3b

Please sign in to comment.