Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate forward plugin to v0.14 api #1306

Merged
merged 10 commits into from
Nov 9, 2016
130 changes: 25 additions & 105 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@
# limitations under the License.
#

require 'fcntl'

require 'cool.io'
require 'fluent/plugin/input'
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'

require 'fluent/input'
require 'fluent/plugin/socket_util'
require 'fcntl'
require 'cool.io'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool.io is required by event_loop helper.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah wait. This commit doesn't include socket helpers. Ignore this comment.


module Fluent
module Fluent::Plugin
class ForwardInput < Input
Plugin.register_input('forward', self)
Fluent::Plugin.register_input('forward', self)

LISTEN_PORT = 24224
# See the wiki page below for protocol specification
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

def initialize
super
require 'fluent/plugin/socket_util'
end
helpers :event_loop

LISTEN_PORT = 24224

desc 'The port to listen to.'
config_param :port, :integer, default: LISTEN_PORT
Expand Down Expand Up @@ -125,46 +128,30 @@ def configure(conf)
})
end
end
@lsock = @usock = nil
end

def start
super

@loop = Coolio::Loop.new

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)

@lsock = listen(client)
@loop.attach(@lsock)
event_loop_attach(@lsock)

@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)

@thread = Thread.new(&method(:run))
event_loop_attach(@hbr)
end

def shutdown
# In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached.
# In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached
# and therfore RuntimeError occurs saying that it is not attached to a loop.
# It occurs only when testing for sending responses to ForwardOutput.
# Sending responses needs to write the socket that is previously used only to read
# and a handler has 2 watchers that is used to read and to write.
# This problem occurs possibly because those watchers are thought to be related to each other
# and when detaching one of them the other is also detached for some reasons.
# As a workaround, check if watchers are attached before detaching them.
@loop.watchers.each {|w| w.detach if w.attached? }
@loop.stop
@usock.close
@thread.join
@lsock.close

def stop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop is called before shutdown, right?
What happens when close socket before shutdown/close event loop?
Is it safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's mistake and it should be done in #close. I'll fix it.

@lsock.close if @lsock
@usock.close if @usock
super
end

Expand All @@ -176,23 +163,6 @@ def listen(client)
s
end

#config_param :path, :string, :default => DEFAULT_SOCKET_PATH
#def listen
# if File.exist?(@path)
# File.unlink(@path)
# end
# FileUtils.mkdir_p File.dirname(@path)
# log.debug "listening fluent socket on #{@path}"
# Coolio::UNIXServer.new(@path, Handler, method(:on_message))
#end

def run
@loop.run(@blocking_timeout)
rescue => e
log.error "unexpected error", error: e
log.error_backtrace
end

private

def handle_connection(conn)
Expand Down Expand Up @@ -288,29 +258,6 @@ def response(option)
nil
end

# message Entry {
# 1: long time
# 2: object record
# }
#
# message Forward {
# 1: string tag
# 2: list<Entry> entries
# 3: object option (optional)
# }
#
# message PackedForward {
# 1: string tag
# 2: raw entries # msgpack stream of Entry
# 3: object option (optional)
# }
#
# message Message {
# 1: string tag
# 2: long? time
# 3: object record
# 4: object option (optional)
# }
def on_message(msg, chunk_size, peeraddr)
if msg.nil?
# for future TCP heartbeat_request
Expand Down Expand Up @@ -338,7 +285,7 @@ def on_message(msg, chunk_size, peeraddr)
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? CompressedMessagePackEventStream : MessagePackEventStream
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
Expand All @@ -349,12 +296,12 @@ def on_message(msg, chunk_size, peeraddr)
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, peeraddr)
else
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
entries.each { |e|
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
es.add(time, record)
}
es
Expand All @@ -372,7 +319,7 @@ def on_message(msg, chunk_size, peeraddr)
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Engine.now if time.to_i == 0
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
router.emit(tag, time, record)
option = msg[3]
Expand All @@ -387,7 +334,7 @@ def invalid_event?(tag, time, record)
end

def check_and_skip_invalid_event(tag, es, peeraddr)
new_es = MultiEventStream.new
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
Expand All @@ -399,7 +346,7 @@ def check_and_skip_invalid_event(tag, es, peeraddr)
end

def add_source_host(es, host)
new_es = MultiEventStream.new
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
Expand Down Expand Up @@ -430,33 +377,6 @@ def generate_helo(nonce, user_auth_salt)
['HELO', {'nonce' => nonce, 'auth' => (@security ? user_auth_salt : ''), 'keepalive' => !@deny_keepalive}]
end

##### Authentication Handshake
#
# 1. (client) connect to server
# * Socket handshake, checks certificate and its significate (in client, if using SSL)
# 2. (server)
# * check network/domain acl (if enabled)
# * disconnect when failed
# 3. (server) send HELO
# * ['HELO', options(hash)]
# * options:
# * nonce: string (required)
# * auth: string or blank_string (string: authentication required, and its salt is this value)
# 4. (client) send PING
# * ['PING', selfhostname, sharedkey_salt, sha512_hex(sharedkey_salt + selfhostname + nonce + sharedkey), username || '', sha512_hex(auth_salt + username + password) || '']
# 5. (server) check PING
# * check sharedkey
# * check username / password (if required)
# * send PONG FAILURE if failed
# * ['PONG', false, 'reason of authentication failure', '', '']
# 6. (server) send PONG
# * ['PONG', bool(authentication result), 'reason if authentication failed', selfhostname, sha512_hex(salt + selfhostname + nonce + sharedkey)]
# 7. (client) check PONG
# * check sharedkey
# * disconnect when failed
# 8. connection established
# * send data from client

def check_ping(message, remote_addr, user_auth_salt, nonce)
log.debug "checking ping"
# ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
Expand Down
Loading