Skip to content

Commit

Permalink
migrating in_http to v0.14 API
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Nov 9, 2016
1 parent a3a8444 commit fdb4aa8
Show file tree
Hide file tree
Showing 2 changed files with 387 additions and 268 deletions.
148 changes: 83 additions & 65 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,31 @@
# limitations under the License.
#

require 'fluent/plugin/input'
require 'fluent/plugin/parser'
require 'fluent/event'
require 'fluent/process'

require 'http/parser'
require 'webrick/httputils'
require 'uri'
require 'socket'
require 'json'

require 'cool.io'

require 'fluent/input'
require 'fluent/event'
require 'fluent/process'
module Fluent::Plugin
class InHttpParser < Parser
Fluent::Plugin.register_parser('in_http', self)
def parse(text)
# this plugin is dummy implementation not to raise error
yield nil, nil
end
end

module Fluent
class HttpInput < Input
Plugin.register_input('http', self)

include DetachMultiProcessMixin
Fluent::Plugin.register_input('http', self)

require 'http/parser'

def initialize
require 'webrick/httputils'
super
end
# include DetachMultiProcessMixin
helpers :parser, :compat_parameters, :event_loop

EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")

Expand All @@ -52,25 +55,36 @@ def initialize
config_param :add_http_headers, :bool, default: false
desc 'Add REMOTE_ADDR header to the record.'
config_param :add_remote_addr, :bool, default: false
desc 'The format of the HTTP body.'
config_param :format, :string, default: 'default'
config_param :blocking_timeout, :time, default: 0.5
desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)'
config_param :cors_allow_origins, :array, default: nil
desc 'Respond with empty gif image of 1x1 pixel.'
config_param :respond_with_empty_img, :bool, default: false

config_section :parse do
config_set_default :@type, 'in_http'
end

EVENT_RECORD_PARAMETER = '_event_record'

def configure(conf)
compat_parameters_convert(conf, :parser)

super

m = if @format == 'default'
m = if @parser_configs.first['@type'] == 'in_http'
@parser_msgpack = parser_create(type: 'msgpack')
@parser_msgpack.estimate_current_event = false
@parser_json = parser_create(type: 'json')
@parser_json.estimate_current_event = false
@format_name = 'default'
method(:parse_params_default)
else
@parser = Plugin.new_parser(@format)
@parser.configure(conf)
@parser = parser_create
@format_name = @parser_configs.first['@type']
method(:parse_params_with_parser)
end
(class << self; self; end).module_eval do
self.singleton_class.module_eval do
define_method(:parse_params, m)
end
end
Expand Down Expand Up @@ -100,7 +114,11 @@ def on_timer
end

def start
log.debug "listening http on #{@bind}:#{@port}"
@_event_loop_run_timeout = @blocking_timeout

super

log.debug "listening http", bind: @bind, port: @port

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
Expand All @@ -109,38 +127,37 @@ def start
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)

detach_multi_process do
super
@km = KeepaliveManager.new(@keepalive_timeout)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format, log,
@cors_allow_origins)
@lsock.listen(@backlog) unless @backlog.nil?

@loop = Coolio::Loop.new
@loop.attach(@km)
@loop.attach(@lsock)

@thread = Thread.new(&method(:run))
end
@km = KeepaliveManager.new(@keepalive_timeout)
@lsock = Coolio::TCPServer.new(
lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format_name, log,
@cors_allow_origins
)
@lsock.listen(@backlog) unless @backlog.nil?
event_loop_attach(@km)
event_loop_attach(@lsock)

# detach_multi_process do
# super
# @km = KeepaliveManager.new(@keepalive_timeout)
# @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
# @body_size_limit, @format, log,
# @cors_allow_origins)
# @lsock.listen(@backlog) unless @backlog.nil?

# @loop = Coolio::Loop.new
# @loop.attach(@km)
# @loop.attach(@lsock)

# @thread = Thread.new(&method(:run))
# end
end

def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
def close
@lsock.close
@thread.join

super
end

def run
@loop.run(@blocking_timeout)
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
end

def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
Expand Down Expand Up @@ -170,9 +187,9 @@ def on_request(path_info, params)
end
time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
param_time.zero? ? Fluent::Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
else
record_time.nil? ? Engine.now : record_time
record_time.nil? ? Fluent::Engine.now : record_time
end
rescue
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
Expand All @@ -182,7 +199,7 @@ def on_request(path_info, params)
begin
# Support batched requests
if record.is_a?(Array)
mes = MultiEventStream.new
mes = Fluent::MultiEventStream.new
record.each do |single_record|
if @add_http_headers
params.each_pair { |k,v|
Expand Down Expand Up @@ -215,22 +232,23 @@ def on_request(path_info, params)
private

def parse_params_default(params)
record = if msgpack = params['msgpack']
Engine.msgpack_factory.unpacker.feed(msgpack).read
elsif js = params['json']
JSON.parse(js)
else
raise "'json' or 'msgpack' parameter is required"
end
return nil, record
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
end
else
raise "'json' or 'msgpack' parameter is required"
end
end

EVENT_RECORD_PARAMETER = '_event_record'

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@parser.parse(content) { |time, record|
raise "Received event is not #{@format}: #{content}" if record.nil?
raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
else
Expand All @@ -241,13 +259,13 @@ def parse_params_with_parser(params)
class Handler < Coolio::Socket
attr_reader :content_type

def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins)
def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins)
super(io)
@km = km
@callback = callback
@body_size_limit = body_size_limit
@next_close = false
@format = format
@format_name = format_name
@log = log
@cors_allow_origins = cors_allow_origins
@idle = 0
Expand Down Expand Up @@ -355,7 +373,7 @@ def on_message_complete
uri = URI.parse(@parser.request_url)
params = WEBrick::HTTPUtils.parse_query(uri.query)

if @format != 'default'
if @format_name != 'default'
params[EVENT_RECORD_PARAMETER] = @body
elsif @content_type =~ /^application\/x-www-form-urlencoded/
params.update WEBrick::HTTPUtils.parse_query(@body)
Expand Down
Loading

0 comments on commit fdb4aa8

Please sign in to comment.