Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_unix: Use v1 plugin API #2992

Merged
merged 5 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 77 additions & 77 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,68 @@
# limitations under the License.
#

require 'fileutils'
require 'socket'
require 'fluent/env'
require 'fluent/plugin/input'
require 'fluent/msgpack_factory'

require 'cool.io'
require 'yajl'
require 'fileutils'
require 'socket'

module Fluent::Plugin
# TODO: This plugin will be 3rd party plugin
class UnixInput < Input
Fluent::Plugin.register_input('unix', self)

helpers :event_loop

def initialize
super

require 'fluent/input'
require 'fluent/event'
@lsock = nil
end

module Fluent
# obsolete
class StreamInput < Input
config_param :blocking_timeout, :time, default: 0.5
desc 'The path to your Unix Domain Socket.'
config_param :path, :string, default: Fluent::DEFAULT_SOCKET_PATH
desc 'The backlog of Unix Domain Socket.'
config_param :backlog, :integer, default: nil
desc "New tag instead of incoming tag"
config_param :tag, :string, default: nil

def configure(conf)
super
end

def start
super

@loop = Coolio::Loop.new
@lsock = listen
@loop.attach(@lsock)
@thread = Thread.new(&method(:run))
event_loop_attach(@lsock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

end

def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@lsock.close
@thread.join
if @lsock
event_loop_detach(@lsock)
@lsock.close
end

super
end

#def listen
#end
def listen
if File.exist?(@path)
log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin"
File.unlink(@path)
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
end
FileUtils.mkdir_p(File.dirname(@path))

def run
@loop.run(@blocking_timeout)
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
log.info "listening fluent socket on #{@path}"
s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end

private

# message Entry {
# 1: long time
# 2: object record
Expand All @@ -79,23 +97,27 @@ def run
# 3: object record
# }
def on_message(msg)
# TODO format error
tag = msg[0].to_s
unless msg.is_a?(Array)
log.warn "incoming data is broken:", msg: msg
return
end

tag = @tag || (msg[0].to_s)
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
entries = msg[1]

if entries.class == String
case entries
when String
# PackedForward
es = MessagePackEventStream.new(entries)
es = Fluent::MessagePackEventStream.new(entries)
router.emit_stream(tag, es)

elsif entries.class == Array
when Array
# Forward
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
entries.each {|e|
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= EventTime.now) if time.to_i == 0
time = convert_time(e[0])
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -105,39 +127,42 @@ def on_message(msg)
record = msg[2]
return if record.nil?

time = msg[1]
time = EventTime.now if time.to_i == 0
time = convert_time(msg[1])
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
router.emit(tag, time, record)
end
end

def convert_time(time)
case
when time.nil? || (time == 0)
Fluent::EventTime.now
when time === Fluent::EventTime
time
else
Fluent::EventTime.from_time(Time.at(time))
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nits]

Suggested change
case
when time.nil? || (time == 0)
Fluent::EventTime.now
when time === Fluent::EventTime
time
else
Fluent::EventTime.from_time(Time.at(time))
end
case time
when nil, 0
Fluent::EventTime.now
when Fluent::EventTime
time
else
Fluent::EventTime.from_time(Time.at(time))
end

end

class Handler < Coolio::Socket
def initialize(io, log, on_message)
super(io)
if io.is_a?(TCPSocket)
opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end

@on_message = on_message
@log = log
@log.trace {
remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
"accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
}
end

def on_connect
end

def on_read(data)
first = data[0]
if first == '{' || first == '['
if first == '{'.freeze || first == '['.freeze
m = method(:on_read_json)
@y = Yajl::Parser.new
@y.on_parse_complete = @on_message
@parser = Yajl::Parser.new
@parser.on_parse_complete = @on_message
else
m = method(:on_read_msgpack)
@u = Fluent::MessagePackFactory.msgpack_unpacker
@parser = Fluent::MessagePackFactory.msgpack_unpacker
end

singleton_class.module_eval do
Expand All @@ -147,17 +172,17 @@ def on_read(data)
end

def on_read_json(data)
@y << data
rescue
@log.error "unexpected error", error: $!.to_s
@parser << data
rescue => e
@log.error "unexpected error in json payload", error: e.to_s
@log.error_backtrace
close
end

def on_read_msgpack(data)
@u.feed_each(data, &@on_message)
rescue
@log.error "unexpected error", error: $!.to_s
@parser.feed_each(data, &@on_message)
rescue => e
@log.error "unexpected error in msgpack payload", error: e.to_s
@log.error_backtrace
close
end
Expand All @@ -167,29 +192,4 @@ def on_close
end
end
end

class UnixInput < StreamInput
Plugin.register_input('unix', self)

desc 'The path to your Unix Domain Socket.'
config_param :path, :string, default: DEFAULT_SOCKET_PATH
desc 'The backlog of Unix Domain Socket.'
config_param :backlog, :integer, default: nil

def configure(conf)
super
#log.warn "'unix' input is obsoleted and will be removed. Use 'forward' instead."
end

def listen
if File.exist?(@path)
File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.info "listening fluent socket on #{@path}"
s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
end
end
Loading