Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce server plugin helpers #1312

Merged
merged 29 commits into from
Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a01883a
make sure to mark as loop not running
tagomoris Nov 15, 2016
6874665
add server plugin helper
tagomoris Nov 16, 2016
1e7b78f
migrate plugins to v0.14 APIs with server plugin helper
tagomoris Nov 16, 2016
5c6ffce
update implementation using serever plugin helper
tagomoris Nov 16, 2016
9fb9c3d
fix buggy test code
tagomoris Nov 16, 2016
cca6e9a
On windows, @socket_manager_path is a port number
tagomoris Nov 16, 2016
faa3137
close-on-exec is unavailable on windows
tagomoris Nov 16, 2016
ebc8087
fix not to set close-on-exec, because it's set by ruby
tagomoris Nov 16, 2016
6dca048
remove mixin not required anymore
tagomoris Nov 16, 2016
45ac6a6
set do_not_reverse_lookup to sockets to listen and after accepted
tagomoris Nov 17, 2016
d4d5f35
return watcher when attached, and detach watchers attached by plugin …
tagomoris Nov 22, 2016
d766ecd
break #shutdown by timeout if any event watcher blocks infinitly
tagomoris Nov 22, 2016
faa51a8
add argument checks
tagomoris Nov 22, 2016
48fe5e6
show protocol of created servers in dump
tagomoris Nov 22, 2016
84d82b7
remove all callbacks for udp server, add established connection handling
tagomoris Nov 22, 2016
93b5e00
add tests of server plugin helper
tagomoris Nov 22, 2016
93e5c1e
show tests which be stuck
tagomoris Nov 22, 2016
3493bb9
update serverengine dependency to fix IPv6 support on Windows
tagomoris Nov 22, 2016
c12758e
fix not to use power_assert to avoid SystemStackError on this assertion
tagomoris Nov 22, 2016
f29de3e
fix to assert hostname directly for environments which does not have …
tagomoris Nov 22, 2016
5a97f75
update ServerEngine not to crash for IPv6 & Windows
tagomoris Nov 24, 2016
f1f5dc5
refer both of source_hostname_key and source_host_key (as optional pa…
tagomoris Nov 24, 2016
89ebb46
simplify code and inspect objects for errors
tagomoris Nov 24, 2016
6497319
add helper method to get unused UDP ports
tagomoris Nov 24, 2016
742b79f
revert verbose testing option for CI services
tagomoris Nov 24, 2016
cf763b9
removed debugging message
tagomoris Nov 24, 2016
a29481f
separate setting socket options to share it with socket plugin helper
tagomoris Nov 25, 2016
ecc0959
rescue Errno::ECONNRESET for cases ICMP has trouble (for getting host…
tagomoris Nov 25, 2016
4e63ef4
require top level ServerEngine
tagomoris Nov 25, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: ruby
cache: bundler

# script: bundle exec rake test TESTOPTS=-v

# http://rubies.travis-ci.org/
# See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c
matrix:
Expand Down
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ install:
build: off
test_script:
- bundle exec rake test
# - bundle exec rake test TESTOPTS=-v

branches:
only:
Expand Down
2 changes: 1 addition & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("msgpack", [">= 0.7.0", "< 2.0.0"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", ["~> 1.4.5"])
gem.add_runtime_dependency("serverengine", ["~> 2.0"])
gem.add_runtime_dependency("serverengine", [">= 2.0.4", "< 3.0.0"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"])
gem.add_runtime_dependency("sigdump", ["~> 0.2.2"])
gem.add_runtime_dependency("tzinfo", ["~> 1.0"])
Expand Down
219 changes: 50 additions & 169 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,30 @@
require 'yajl'
require 'digest'

require 'fluent/plugin/socket_util'
require 'fcntl'
require 'cool.io'

module Fluent::Plugin
class ForwardInput < Input
Fluent::Plugin.register_input('forward', self)

# See the wiki page below for protocol specification
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

helpers :event_loop
helpers :server

LISTEN_PORT = 24224

desc 'The port to listen to.'
config_param :port, :integer, default: LISTEN_PORT
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'

config_param :backlog, :integer, default: nil
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
desc 'The timeout time used to set linger option.'
config_param :linger_timeout, :integer, default: 0
# This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value.
config_param :blocking_timeout, :time, default: 0.5
desc 'Try to resolve hostname from IP addresses or not.'
config_param :resolve_hostname, :bool, default: nil
desc 'Connections will be disconnected right after receiving first message if this value is true.'
config_param :deny_keepalive, :bool, default: false

Expand Down Expand Up @@ -91,6 +90,15 @@ class ForwardInput < Input
def configure(conf)
super

if @source_hostname_key
# TODO: add test
if @resolve_hostname.nil?
@resolve_hostname = true
elsif !@resolve_hostname # user specifies "false" in config
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end

if @security
if @security.user_auth && @security.users.empty?
raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
Expand Down Expand Up @@ -131,44 +139,35 @@ def configure(conf)
@lsock = @usock = nil
end

HEARTBEAT_UDP_PAYLOAD = "\0"

def start
super

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
server_create_connection(
:in_forward_server, @port,
bind: @bind,
shared: false,
resolve_name: @resolve_hostname,
linger_timeout: @linger_timeout,
backlog: @backlog,
&method(:handle_connection)
)

server_create(:in_forward_server_udp_heartbeat, @port, shared: false, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock|
log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data
begin
sock.write HEARTBEAT_UDP_PAYLOAD
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
log.trace "error while heartbeat response", host: sock.remote_host, error: e
end
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)

@lsock = listen(client)
event_loop_attach(@lsock)

@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
event_loop_attach(@hbr)
end

def close
@lsock.close if @lsock
@usock.close if @usock
super
end

def listen(client)
log.info "listening fluent socket on #{@bind}:#{@port}"
sock = client.listen_tcp(@bind, @port)
s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection))
s.listen(@backlog) unless @backlog.nil?
s
end

private

def handle_connection(conn)
send_data = ->(serializer, data){ conn.write serializer.call(data) }

log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port
log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port
state = :established
nonce = nil
user_auth_salt = nil
Expand All @@ -182,7 +181,7 @@ def handle_connection(conn)
state = :pingpong
end

log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port
log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port

read_messages(conn) do |msg, chunk_size, serializer|
case state
Expand All @@ -198,15 +197,11 @@ def handle_connection(conn)
log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
state = :established
when :established
options = on_message(msg, chunk_size, conn.remote_addr)
options = on_message(msg, chunk_size, conn.remote_host)
if options && r = response(options)
send_data.call(serializer, r)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
if @deny_keepalive
conn.on_write_complete do
conn.close
end
end
conn.on_write_complete{ conn.close } if @deny_keepalive
send_data.call(serializer, r)
else
if @deny_keepalive
conn.close
Expand All @@ -222,7 +217,7 @@ def read_messages(conn, &block)
feeder = nil
serializer = nil
bytes = 0
conn.on_data do |data|
conn.data do |data|
# only for first call of callback
unless feeder
first = data[0]
Expand Down Expand Up @@ -258,26 +253,26 @@ def response(option)
nil
end

def on_message(msg, chunk_size, peeraddr)
def on_message(msg, chunk_size, remote_host)
if msg.nil?
# for future TCP heartbeat_request
return
end

# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg
log.warn "incoming chunk is broken:", host: remote_host, msg: msg
return
end

tag = msg[0]
entries = msg[1]

if @chunk_size_limit && (chunk_size > @chunk_size_limit)
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size
return
elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size
end

case entries
Expand All @@ -287,14 +282,14 @@ def on_message(msg, chunk_size, peeraddr)
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event
es = add_source_host(es, remote_host) if @source_hostname_key
router.emit_stream(tag, es)

when Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, peeraddr)
check_and_skip_invalid_event(tag, entries, remote_host)
else
es = Fluent::MultiEventStream.new
entries.each { |e|
Expand All @@ -306,7 +301,7 @@ def on_message(msg, chunk_size, peeraddr)
}
es
end
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = add_source_host(es, remote_host) if @source_hostname_key
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -315,12 +310,12 @@ def on_message(msg, chunk_size, peeraddr)
time = msg[1]
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record
log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
record[@source_hostname_key] = remote_host if @source_hostname_key
router.emit(tag, time, record)
option = msg[3]
end
Expand All @@ -333,11 +328,11 @@ def invalid_event?(tag, time, record)
!((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
end

def check_and_skip_invalid_event(tag, es, peeraddr)
def check_and_skip_invalid_event(tag, es, remote_host)
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
next
end
new_es.add(time, record)
Expand All @@ -354,11 +349,6 @@ def add_source_host(es, host)
new_es
end

def source_message(peeraddr)
_, port, host, addr = peeraddr
"host: #{host}, addr: #{addr}, port: #{port}"
end

def select_authenticate_users(node, username)
if node.nil? || node[:users].empty?
@security.users.select{|u| u.username == username}
Expand Down Expand Up @@ -424,114 +414,5 @@ def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest
['PONG', true, '', @security.self_hostname, shared_key_digest_hex]
end

class Handler < Coolio::Socket
attr_reader :protocol, :remote_port, :remote_addr, :remote_host

PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, linger_timeout, log, on_connect_callback)
super(io)

@peeraddr = nil
if io.is_a?(TCPSocket) # for unix domain socket support in the future
@peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end

### TODO: disabling name rev resolv
proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED )
if addr == '?'
port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil
end
@protocol = proto
@remote_port = port
@remote_addr = addr
@remote_host = host
@writing = false
@closing = false
@mutex = Mutex.new

@chunk_counter = 0
@on_connect_callback = on_connect_callback
@log = log
@log.trace {
begin
remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
rescue
remote_port = nil
remote_addr = nil
end
[ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ]
}
end

def on_connect
@on_connect_callback.call(self)
end

# API to register callback for data arrival
def on_data(&callback)
@on_read_callback = callback
end

def on_read(data)
@on_read_callback.call(data)
rescue => e
@log.error "unexpected error on reading data from client", address: @remote_addr, error: e
@log.error_backtrace
close
end

def on_write_complete
closing = @mutex.synchronize {
@writing = false
@closing
}
if closing
close
end
end

def close
writing = @mutex.synchronize {
@closing = true
@writing
}
unless writing
super
end
end
end

class HeartbeatRequestHandler < Coolio::IO
def initialize(io, callback)
super(io)
@io = io
@callback = callback
end

def on_readable
begin
msg, addr = @io.recvfrom(1024)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
return
end
host = addr[3]
port = addr[1]
@callback.call(host, port, msg)
rescue
# TODO log?
end
end

def on_heartbeat_request(host, port, msg)
#log.trace "heartbeat request from #{host}:#{port}"
begin
@usock.send "\0", 0, host, port
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
end
end
end
end
Loading