Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move plugin util modules to compat #1091

Merged
merged 5 commits into from
Jul 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions lib/fluent/compat/exec_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'msgpack'
require 'yajl'

require 'fluent/engine'
require 'fluent/plugin'
require 'fluent/parser'

module Fluent
module Compat
module ExecUtil
SUPPORTED_FORMAT = {
'tsv' => :tsv,
'json' => :json,
'msgpack' => :msgpack,
}

class Parser
def initialize(on_message)
@on_message = on_message
end
end

class TextParserWrapperParser < Parser
def initialize(conf, on_message)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
@parser.parse(line) { |time, record|
@on_message.call(record, time)
}
end
end

class TSVParser < Parser
def initialize(keys, on_message)
@keys = keys
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
vals = line.split("\t")

record = Hash[@keys.zip(vals)]

@on_message.call(record)
end
end

class JSONParser < Parser
def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
end
end

class MessagePackParser < Parser
def call(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
end
end
end

class Formatter
end

class TSVFormatter < Formatter
def initialize(in_keys)
@in_keys = in_keys
super()
end

def call(record, out)
last = @in_keys.length-1
for i in 0..last
key = @in_keys[i]
out << record[key].to_s
out << "\t" if i != last
end
out << "\n"
end
end

class JSONFormatter < Formatter
def call(record, out)
out << Yajl.dump(record) << "\n"
end
end

class MessagePackFormatter < Formatter
def call(record, out)
record.to_msgpack(out)
end
end
end
end
end
54 changes: 54 additions & 0 deletions lib/fluent/compat/file_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Compat
module FileUtil
# Check file is writable if file exists
# Check directory is writable if file does not exist
#
# @param [String] path File path
# @return [Boolean] file is writable or not
def writable?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable?

# Check file is writable in conjunction wtih mkdir_p(dirname(path))
#
# @param [String] path File path
# @return [Boolean] file writable or not
def writable_p?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
until File.exist?(dirname)
dirname = File.dirname(dirname)
end

return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable_p?
end
end
end
165 changes: 165 additions & 0 deletions lib/fluent/compat/socket_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'ipaddr'

require 'cool.io'

require 'fluent/plugin'
require 'fluent/input'

module Fluent
module Compat
module SocketUtil
def create_udp_socket(host)
if IPAddr.new(IPSocket.getaddress(host)).ipv4?
UDPSocket.new
else
UDPSocket.new(Socket::AF_INET6)
end
end
module_function :create_udp_socket

class UdpHandler < Coolio::IO
def initialize(io, log, body_size_limit, callback)
super(io)
@io = io
@log = log
@body_size_limit = body_size_limit
@callback = callback
end

def on_readable
msg, addr = @io.recvfrom_nonblock(@body_size_limit)
msg.chomp!
@callback.call(msg, addr)
rescue => e
@log.error "unexpected error", error: e
end
end

class TcpHandler < Coolio::Socket
PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, log, delimiter, callback)
super(io)
@timeout = 0
if io.is_a?(TCPSocket)
@addr = (io.peeraddr rescue PEERADDR_FAILED)

opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end
@delimiter = delimiter
@callback = callback
@log = log
@log.trace { "accepted fluent socket object_id=#{self.object_id}" }
@buffer = "".force_encoding('ASCII-8BIT')
end

def on_connect
end

def on_read(data)
@buffer << data
pos = 0

while i = @buffer.index(@delimiter, pos)
msg = @buffer[pos...i]
@callback.call(msg, @addr)
pos = i + @delimiter.length
end
@buffer.slice!(0, pos) if pos > 0
rescue => e
@log.error "unexpected error", error: e
close
end

def on_close
@log.trace { "closed fluent socket object_id=#{self.object_id}" }
end
end

class BaseInput < Fluent::Input
def initialize
super
require 'fluent/parser'
end

desc 'Tag of output events.'
config_param :tag, :string
desc 'The format of the payload.'
config_param :format, :string
desc 'The port to listen to.'
config_param :port, :integer, default: 5150
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'
desc "The field name of the client's hostname."
config_param :source_host_key, :string, default: nil
config_param :blocking_timeout, :time, default: 0.5

def configure(conf)
super

@parser = Plugin.new_parser(@format)
@parser.configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@handler = listen(method(:on_message))
@loop.attach(@handler)
@thread = Thread.new(&method(:run))
end

def shutdown
@loop.watchers.each { |w| w.detach }
@loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join

super
end

def run
@loop.run(@blocking_timeout)
rescue => e
log.error "unexpected error", error: e
log.error_backtrace
end

private

def on_message(msg, addr)
@parser.parse(msg) { |time, record|
unless time && record
log.warn "pattern not match: #{msg.inspect}"
return
end

record[@source_host_key] = addr[3] if @source_host_key
router.emit(@tag, time, record)
}
rescue => e
log.error msg.dump, error: e, host: addr[3]
log.error_backtrace
end
end
end
end
end
34 changes: 34 additions & 0 deletions lib/fluent/compat/string_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Compat
module StringUtil
def match_regexp(regexp, string)
begin
return regexp.match(string)
rescue ArgumentError => e
raise e unless e.message.index("invalid byte sequence in".freeze).zero?
$log.info "invalid byte sequence is replaced in `#{string}`"
string = string.scrub('?')
retry
end
return true
end
module_function :match_regexp
end
end
end
Loading