Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate plugins with server plugin helper #1382

Merged
merged 7 commits into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class ForwardInput < Input
config_param :chunk_size_limit, :size, default: nil
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: false

desc "The field name of the client's source address."
config_param :source_address_key, :string, default: nil
desc "The field name of the client's hostname."
config_param :source_hostname_key, :string, default: nil

Expand Down Expand Up @@ -98,6 +101,7 @@ def configure(conf)
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end
@enable_field_injection = @source_address_key || @source_hostname_key

if @security
if @security.user_auth && @security.users.empty?
Expand Down Expand Up @@ -136,7 +140,6 @@ def configure(conf)
})
end
end
@lsock = @usock = nil
end

HEARTBEAT_UDP_PAYLOAD = "\0"
Expand Down Expand Up @@ -197,7 +200,7 @@ def handle_connection(conn)
log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
state = :established
when :established
options = on_message(msg, chunk_size, conn.remote_host)
options = on_message(msg, chunk_size, conn)
if options && r = response(options)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
conn.on_write_complete{ conn.close } if @deny_keepalive
Expand Down Expand Up @@ -253,26 +256,26 @@ def response(option)
nil
end

def on_message(msg, chunk_size, remote_host)
def on_message(msg, chunk_size, conn)
if msg.nil?
# for future TCP heartbeat_request
return
end

# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", host: remote_host, msg: msg
log.warn "incoming chunk is broken:", host: conn.remote_host, msg: msg
return
end

tag = msg[0]
entries = msg[1]

if @chunk_size_limit && (chunk_size > @chunk_size_limit)
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: conn.remote_host, limit: @chunk_size_limit, size: chunk_size
return
elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: conn.remote_host, limit: @chunk_size_warn_limit, size: chunk_size
end

case entries
Expand All @@ -282,14 +285,16 @@ def on_message(msg, chunk_size, remote_host)
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event
es = add_source_host(es, remote_host) if @source_hostname_key
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_host(es, conn)
end
router.emit_stream(tag, es)

when Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, remote_host)
check_and_skip_invalid_event(tag, entries, conn.remote_host)
else
es = Fluent::MultiEventStream.new
entries.each { |e|
Expand All @@ -301,7 +306,9 @@ def on_message(msg, chunk_size, remote_host)
}
es
end
es = add_source_host(es, remote_host) if @source_hostname_key
if @enable_field_injection
es = add_source_host(es, conn)
end
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -310,12 +317,15 @@ def on_message(msg, chunk_size, remote_host)
time = msg[1]
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record
log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = remote_host if @source_hostname_key
if @enable_field_injection
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
end
router.emit(tag, time, record)
option = msg[3]
end
Expand All @@ -340,12 +350,31 @@ def check_and_skip_invalid_event(tag, es, remote_host)
new_es
end

def add_source_host(es, host)
def add_source_host(es, conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_source_info or something is better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right.

new_es = Fluent::MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
}
if @source_address_key && @source_hostname_key
address = conn.remote_addr
hostname = conn.remote_host
es.each { |time, record|
record[@source_address_key] = address
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
elsif @source_address_key
address = conn.remote_addr
es.each { |time, record|
record[@source_address_key] = address
new_es.add(time, record)
}
elsif @source_hostname_key
hostname = conn.remote_host
es.each { |time, record|
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
else
raise "BUG: don't call this method in this case"
end
new_es
end

Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def parse(text)
class HttpInput < Input
Fluent::Plugin.register_input('http', self)

# TODO: update this plugin implementation to use server plugin helper, after adding keepalive feature on it

helpers :parser, :compat_parameters, :event_loop

EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")
Expand Down
88 changes: 52 additions & 36 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
# limitations under the License.
#

require 'cool.io'
require 'yajl'

require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/plugin/parser'

require 'yajl'

module Fluent::Plugin
class SyslogInput < Input
Fluent::Plugin.register_input('syslog', self)

helpers :parser, :compat_parameters, :event_loop
helpers :parser, :compat_parameters, :server

DEFAULT_PARSER = 'syslog'
SYSLOG_REGEXP = /^\<([0-9]+)\>(.*)/
Expand Down Expand Up @@ -68,11 +67,6 @@ class SyslogInput < Input
7 => 'debug'
}

def initialize
super
require 'fluent/plugin/socket_util'
end

desc 'The port to listen to.'
config_param :port, :integer, default: 5140
desc 'The bind address to listen to.'
Expand All @@ -81,14 +75,22 @@ def initialize
config_param :tag, :string
desc 'The transport protocol used to receive logs.(udp, tcp)'
config_param :protocol_type, :enum, list: [:tcp, :udp], default: :udp

desc 'If true, add source host to event record.'
config_param :include_source_host, :bool, default: false
config_param :include_source_host, :bool, default: false, deprecated: 'use "source_hostname_key" or "source_address_key" instead.'
desc 'Specify key of source host when include_source_host is true.'
config_param :source_host_key, :string, default: 'source_host'.freeze

desc 'The field name of hostname of sender.'
config_param :source_hostname_key, :string, default: nil
desc 'The field name of source address of sender.'
config_param :source_address_key, :string, default: nil

desc 'The field name of the priority.'
config_param :priority_key, :string, default: nil
desc 'The field name of the facility.'
config_param :facility_key, :string, default: nil

config_param :blocking_timeout, :time, default: 0.5
config_param :message_length_limit, :size, default: 2048

Expand All @@ -107,25 +109,57 @@ def configure(conf)
@parser = parser_create
@parser_parse_priority = @parser.respond_to?(:with_priority) && @parser.with_priority

if @include_source_host
if @source_address_key
raise Fluent::ConfigError, "specify either source_address_key or include_source_host"
end
@source_address_key = @source_host_key
end
@resolve_name = !!@source_hostname_key

@_event_loop_run_timeout = @blocking_timeout
end

def start
super

@handler = listen(method(:message_handler))
event_loop_attach(@handler)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
case @protocol_type
when :udp then start_udp_server
when :tcp then start_tcp_server
else
raise "BUG: invalid protocol_type value:#{@protocol_type}"
end
end

def shutdown
@handler.close
def start_udp_server
server_create_udp(:in_syslog_udp_server, @port, bind: @bind, max_bytes: @message_length_limit, resolve_name: @resolve_name) do |data, sock|
message_handler(data.chomp, sock)
end
end

super
def start_tcp_server
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
delimiter = "\n"
delimiter_size = delimiter.size
server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_name) do |conn|
buffer = ""
conn.data do |data|
buffer << data
pos = 0
while idx = buffer.index(delimiter, pos)
msg = buffer[pos...idx]
pos = idx + delimiter_size
message_handler(msg, conn)
end
buffer.slice!(0, pos) if pos > 0
end
end
end

private

def message_handler(data, addr)
def message_handler(data, sock)
pri = nil
text = data
unless @parser_parse_priority
Expand All @@ -150,7 +184,8 @@ def message_handler(data, addr)

record[@priority_key] = priority if @priority_key
record[@facility_key] = facility if @facility_key
record[@source_host_key] = addr[2] if @include_source_host
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key

tag = "#{@tag}.#{facility}.#{priority}"
emit(tag, time, record)
Expand All @@ -160,25 +195,6 @@ def message_handler(data, addr)
log.error_backtrace
end

private

def listen(callback)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
if @protocol_type == :udp
@usock = client.listen_udp(@bind, @port)
Fluent::SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback)
else
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, Fluent::SocketUtil::TcpHandler, log, "\n", callback)
end
end

def emit(tag, time, record)
router.emit(tag, time, record)
rescue => e
Expand Down
Loading