From 1473619735e0f32a16a6eec294ae466404e62f22 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 9 Nov 2016 12:15:55 +0900 Subject: [PATCH 01/11] migrating in_http to v0.14 API --- lib/fluent/plugin/in_http.rb | 148 +++++----- test/plugin/test_in_http.rb | 507 +++++++++++++++++++++-------------- 2 files changed, 387 insertions(+), 268 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index a387370fda..e8a442a386 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -14,28 +14,31 @@ # limitations under the License. # +require 'fluent/plugin/input' +require 'fluent/plugin/parser' +require 'fluent/event' +require 'fluent/process' + +require 'http/parser' +require 'webrick/httputils' require 'uri' require 'socket' require 'json' -require 'cool.io' - -require 'fluent/input' -require 'fluent/event' -require 'fluent/process' +module Fluent::Plugin + class InHttpParser < Parser + Fluent::Plugin.register_parser('in_http', self) + def parse(text) + # this plugin is dummy implementation not to raise error + yield nil, nil + end + end -module Fluent class HttpInput < Input - Plugin.register_input('http', self) - - include DetachMultiProcessMixin + Fluent::Plugin.register_input('http', self) - require 'http/parser' - - def initialize - require 'webrick/httputils' - super - end + # include DetachMultiProcessMixin + helpers :parser, :compat_parameters, :event_loop EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8") @@ -52,25 +55,36 @@ def initialize config_param :add_http_headers, :bool, default: false desc 'Add REMOTE_ADDR header to the record.' config_param :add_remote_addr, :bool, default: false - desc 'The format of the HTTP body.' - config_param :format, :string, default: 'default' config_param :blocking_timeout, :time, default: 0.5 desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)' config_param :cors_allow_origins, :array, default: nil desc 'Respond with empty gif image of 1x1 pixel.' config_param :respond_with_empty_img, :bool, default: false + config_section :parse do + config_set_default :@type, 'in_http' + end + + EVENT_RECORD_PARAMETER = '_event_record' + def configure(conf) + compat_parameters_convert(conf, :parser) + super - m = if @format == 'default' + m = if @parser_configs.first['@type'] == 'in_http' + @parser_msgpack = parser_create(type: 'msgpack') + @parser_msgpack.estimate_current_event = false + @parser_json = parser_create(type: 'json') + @parser_json.estimate_current_event = false + @format_name = 'default' method(:parse_params_default) else - @parser = Plugin.new_parser(@format) - @parser.configure(conf) + @parser = parser_create + @format_name = @parser_configs.first['@type'] method(:parse_params_with_parser) end - (class << self; self; end).module_eval do + self.singleton_class.module_eval do define_method(:parse_params, m) end end @@ -100,7 +114,11 @@ def on_timer end def start - log.debug "listening http on #{@bind}:#{@port}" + @_event_loop_run_timeout = @blocking_timeout + + super + + log.debug "listening http", bind: @bind, port: @port socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? @@ -109,38 +127,37 @@ def start client = ServerEngine::SocketManager::Client.new(socket_manager_path) lsock = client.listen_tcp(@bind, @port) - detach_multi_process do - super - @km = KeepaliveManager.new(@keepalive_timeout) - @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), - @body_size_limit, @format, log, - @cors_allow_origins) - @lsock.listen(@backlog) unless @backlog.nil? - - @loop = Coolio::Loop.new - @loop.attach(@km) - @loop.attach(@lsock) - - @thread = Thread.new(&method(:run)) - end + @km = KeepaliveManager.new(@keepalive_timeout) + @lsock = Coolio::TCPServer.new( + lsock, nil, Handler, @km, method(:on_request), + @body_size_limit, @format_name, log, + @cors_allow_origins + ) + @lsock.listen(@backlog) unless @backlog.nil? + event_loop_attach(@km) + event_loop_attach(@lsock) + + # detach_multi_process do + # super + # @km = KeepaliveManager.new(@keepalive_timeout) + # @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), + # @body_size_limit, @format, log, + # @cors_allow_origins) + # @lsock.listen(@backlog) unless @backlog.nil? + + # @loop = Coolio::Loop.new + # @loop.attach(@km) + # @loop.attach(@lsock) + + # @thread = Thread.new(&method(:run)) + # end end - def shutdown - @loop.watchers.each {|w| w.detach } - @loop.stop + def close @lsock.close - @thread.join - super end - def run - @loop.run(@blocking_timeout) - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace - end - def on_request(path_info, params) begin path = path_info[1..-1] # remove / @@ -170,9 +187,9 @@ def on_request(path_info, params) end time = if param_time = params['time'] param_time = param_time.to_f - param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) + param_time.zero? ? Fluent::Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) else - record_time.nil? ? Engine.now : record_time + record_time.nil? ? Fluent::Engine.now : record_time end rescue return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"] @@ -182,7 +199,7 @@ def on_request(path_info, params) begin # Support batched requests if record.is_a?(Array) - mes = MultiEventStream.new + mes = Fluent::MultiEventStream.new record.each do |single_record| if @add_http_headers params.each_pair { |k,v| @@ -215,22 +232,23 @@ def on_request(path_info, params) private def parse_params_default(params) - record = if msgpack = params['msgpack'] - Engine.msgpack_factory.unpacker.feed(msgpack).read - elsif js = params['json'] - JSON.parse(js) - else - raise "'json' or 'msgpack' parameter is required" - end - return nil, record + if msgpack = params['msgpack'] + @parser_msgpack.parse(msgpack) do |_time, record| + return nil, record + end + elsif js = params['json'] + @parser_json.parse(js) do |_time, record| + return nil, record + end + else + raise "'json' or 'msgpack' parameter is required" + end end - EVENT_RECORD_PARAMETER = '_event_record' - def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] @parser.parse(content) { |time, record| - raise "Received event is not #{@format}: #{content}" if record.nil? + raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } else @@ -241,13 +259,13 @@ def parse_params_with_parser(params) class Handler < Coolio::Socket attr_reader :content_type - def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins) + def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins) super(io) @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false - @format = format + @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @idle = 0 @@ -355,7 +373,7 @@ def on_message_complete uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) - if @format != 'default' + if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif @content_type =~ /^application\/x-www-form-urlencoded/ params.update WEBrick::HTTPUtils.parse_query(@body) diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 7d95714a54..d79ed2fceb 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -1,7 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_http' require 'net/http' +require 'timecop' class HttpInputTest < Test::Unit::TestCase class << self @@ -20,6 +21,10 @@ def setup Fluent::Test.setup end + def teardown + Timecop.return + end + PORT = unused_port CONFIG = %[ port #{PORT} @@ -30,7 +35,7 @@ def setup ] def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::HttpInput).configure(conf, true) + Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpInput).configure(conf) end def test_configure @@ -44,228 +49,293 @@ def test_configure def test_time d = create_driver + time = event_time("2011-01-02 13:14:15.123 UTC") + Timecop.freeze(Time.at(time)) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - Fluent::Engine.now = time - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + events = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] + res_codes = [] - d.run do - d.expected_emits.each {|tag,_time,record| + d.run(expect_records: 2) do + events.each do |tag, _time, record| res = post("/#{tag}", {"json"=>record.to_json}) - assert_equal "200", res.code - } + res_codes << res.conde + end end + + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_time_as_float d = create_driver - - float_time = Time.parse("2011-01-02 13:14:15.123 UTC").to_f - time = Fluent::EventTime.from_time(Time.at(float_time)) - - d.expect_emit "tag1", time, {"a"=>1} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15.123 UTC") + float_time = time.to_f + + events = [ + ["tag1", float_time, {"a"=>1}], + ] + res_codes = [] + + d.run(expect_records: 1) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) + res_codes << res.code + end end + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] end def test_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) + res_codes << res.code + end end - - d.emit_streams.each { |tag, es| - assert !include_http_header?(es.first[1]) - } + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - events = [{"a"=>1},{"a"=>2}] - tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time_i, records[0]], + ["tag1", time_i, records[1]], + ] + res_codes = [] + d.run(expect_records: 2) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1}], + ["tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2}], + ] + res_codes = [] + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end end - + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] - tag = "tag1" - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, records[0]], + ["tag1", time, records[1]], + ] + res_codes = [] + + d.run(expect_records: 2) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ["tag2", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code + end end - + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] - tag = "tag1" - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code end - + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] - tag = "tag1" - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, records[0]], + ["tag1", time, records[1]], + ] + res_codes = [] + + d.run(expect_records: 2) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end - - d.emit_streams.each { |_tag, es| - assert include_http_header?(es.first[1]) - } + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - records = [["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}]] - - d.run do - records.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] - d.emit_streams.each { |tag, es| - assert include_http_header?(es.first[1]) - } + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_application_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}?time=#{_time.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_msgpack d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_msgpack d = create_driver - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - events = [{"a"=>1},{"a"=>2}] - tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"msgpack"=>events.to_msgpack, "time"=>time.to_s}) - assert_equal "200", res.code + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, records[0]], + ["tag1", time, records[1]], + ] + res_codes = [] + d.run(expect_records: 2) do + res = post("/#{tag}", {"msgpack"=>events.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_regexp @@ -274,20 +344,27 @@ def test_with_regexp types field_1:integer ]) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"field_1" => 1, "field_2" => 'str'} - d.expect_emit "tag2", time, {"field_1" => 2, "field_2" => 'str'} + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"field_1" => 1, "field_2" => 'str'}], + ["tag2", time, {"field_1" => 2, "field_2" => 'str'}], + ] + res_codes = [] - d.run do - d.expected_emits.each { |tag, _time, record| + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v.to_s }.join(':') - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'application/octet-stream'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'application/octet-stream'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_csv @@ -297,72 +374,96 @@ def test_with_csv format csv keys foo,bar ]) - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"foo" => "1", "bar" => 'st"r'} - d.expect_emit "tag2", time, {"foo" => "2", "bar" => 'str'} - - d.run do - d.expected_emits.each { |tag, _time, record| + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"foo" => "1", "bar" => 'st"r'}], + ["tag2", time, {"foo" => "2", "bar" => 'str'}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v }.to_csv - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_resonse_with_empty_img d = create_driver(CONFIG + "respond_with_empty_img true") assert_equal true, d.instance.respond_with_empty_img - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_bodies = [] d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code # Ruby returns ASCII-8 encoded string for GIF. - assert_equal Fluent::HttpInput::EMPTY_GIF_IMAGE, res.body.force_encoding("UTF-8") - } + res_bodies << res.body.force_encoding("UTF-8") + end end + assert_equal ["200", "200"], res_codes + assert_equal [Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE, Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE], res_bodies + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_allowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>1} + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_headers = [] d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"Origin"=>"http://foo.com"}) - assert_equal "200", res.code - assert_equal "http://foo.com", res["Access-Control-Allow-Origin"] - } + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://foo.com"}) + res_codes << res.code + res_headers << res["Access-Control-Allow-Origin"] + end end + assert_equal ["200", "200"], res_codes + assert_equal ["http://foo.com", "http://foo.com"], res_headers + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_disallowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + res_codes = [] - d.expected_emits_length = 0 + d.end_if{ res_codes.size == 2 } d.run do - res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code - - res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code + res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) end + assert_equal ["403", "403"], res_codes end $test_in_http_connection_object_ids = [] @@ -388,7 +489,7 @@ def on_message_begin end end - class Fluent::HttpInput::Handler + class Fluent::Plugin::HttpInput::Handler prepend ContentTypeHook end From 31a559d8aa3a7a88e109e79457cef9401eefd1b6 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 16:49:47 +0900 Subject: [PATCH 02/11] add #to_f: without this def, time.to_f is equal to time.to_int.to_f --- lib/fluent/time.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index 08c0e3ded8..1c22eaf361 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -50,6 +50,10 @@ def to_int @sec end + def to_f + @sec + @nsec / 1_000_000_000.0 + end + # for Time.at def to_r Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000) From 74be7bdff3fef8ddcaf9e9b54700e13edbaed9cd Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 16:53:45 +0900 Subject: [PATCH 03/11] convert regexp "format" into regexp parser explicitly Without this change, Plugin.new_parser will instantiate Fluent::TextParser instance. It is a kind of Fluent::Compat::Parser, and will convert configuration for it (twice). This double conversion breaks value modification of "types" parameter. --- lib/fluent/plugin_helper/compat_parameters.rb | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 1c9de44854..ba84a931c3 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -48,7 +48,7 @@ module CompatParameters } PARSER_PARAMS = { - "format" => "@type", + "format" => nil, "types" => nil, "types_delimiter" => nil, "types_label_delimiter" => nil, @@ -253,6 +253,15 @@ def compat_parameters_parser(conf) # TODO: warn obsolete parameters if these are deprecated hash = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS) + if conf["format"] + if conf["format"].start_with?("/") && conf["format"].end_with?("/") + hash["@type"] = "regexp" + hash["expression"] = conf["format"] + else + hash["@type"] = conf["format"] + end + end + if conf["types"] delimiter = conf["types_delimiter"] || ',' label_delimiter = conf["types_label_delimiter"] || ':' From 100e5999c619ae4b79b1c1e694cd4a603db7469b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 16:59:54 +0900 Subject: [PATCH 04/11] check record is a Hash or not (nil? array?) --- lib/fluent/plugin/parser.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index e03fcd2daa..66109fa215 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -96,7 +96,7 @@ def parse_partial_data(data, &block) end def parse_time(record) - if @time_key && record.has_key?(@time_key) + if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key) src = if @keep_time_key record[@time_key] else From 6a5490cc3776f955ec21c3f8be03e7842d053ce5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 17:00:47 +0900 Subject: [PATCH 05/11] fix to use NumericTimeParser to parse float time (without error of floating point value) --- lib/fluent/plugin/in_http.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index e8a442a386..7ee1cd8a73 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -137,6 +137,8 @@ def start event_loop_attach(@km) event_loop_attach(@lsock) + @float_time_parser = Fluent::NumericTimeParser.new(:float) + # detach_multi_process do # super # @km = KeepaliveManager.new(@keepalive_timeout) @@ -187,7 +189,7 @@ def on_request(path_info, params) end time = if param_time = params['time'] param_time = param_time.to_f - param_time.zero? ? Fluent::Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) + param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time) else record_time.nil? ? Fluent::Engine.now : record_time end From 27be8bce0c735c2cb7c83fd4be18ca27adc2f6b7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 17:01:18 +0900 Subject: [PATCH 06/11] fix wrong test codes --- test/plugin/test_in_http.rb | 58 ++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index d79ed2fceb..8372947d8b 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -61,7 +61,7 @@ def test_time d.run(expect_records: 2) do events.each do |tag, _time, record| res = post("/#{tag}", {"json"=>record.to_json}) - res_codes << res.conde + res_codes << res.code end end @@ -77,13 +77,13 @@ def test_time_as_float float_time = time.to_f events = [ - ["tag1", float_time, {"a"=>1}], + ["tag1", time, {"a"=>1}], ] res_codes = [] d.run(expect_records: 1) do events.each do |tag, t, record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) + res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) res_codes << res.code end end @@ -125,8 +125,9 @@ def test_multi_json ["tag1", time_i, records[0]], ["tag1", time_i, records[1]], ] + tag = "tag1" res_codes = [] - d.run(expect_records: 2) do + d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end @@ -164,20 +165,23 @@ def test_multi_json_with_add_remote_addr time_i = time.to_i records = [{"a"=>1},{"a"=>2}] - events = [ - ["tag1", time, records[0]], - ["tag1", time, records[1]], - ] + tag = "tag1" res_codes = [] - d.run(expect_records: 2) do + d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes - assert_equal events, d.events + + assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + assert{ d.events[0][2].has_key?("REMOTE_ADDR") && d.events[0][2]["REMOTE_ADDR"] =~ /^\d{1,4}(\.\d{1,4}){3}$/ } + + assert_equal "tag1", d.events[1][0] assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] end def test_json_with_add_remote_addr_given_x_forwarded_for @@ -213,13 +217,14 @@ def test_multi_json_with_add_remote_addr_given_x_forwarded_for ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}], ] + tag = "tag1" res_codes = [] - d.run(expect_records: 2) do + d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) res_codes << res.code end - assert_equal ["200", "200"], res_codes + assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] @@ -230,20 +235,25 @@ def test_multi_json_with_add_http_headers time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] - events = [ - ["tag1", time, records[0]], - ["tag1", time, records[1]], - ] + tag = "tag1" res_codes = [] - d.run(expect_records: 2) do + d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes - assert_equal events, d.events + + assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + + assert_equal "tag1", d.events[1][0] assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] + + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_json_with_add_http_headers @@ -263,9 +273,14 @@ def test_json_with_add_http_headers end end assert_equal ["200", "200"], res_codes - assert_equal events, d.events + + assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + + assert_equal "tag2", d.events[1][0] assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] assert include_http_header?(d.events[0][2]) assert include_http_header?(d.events[1][2]) @@ -327,9 +342,10 @@ def test_multi_msgpack ["tag1", time, records[0]], ["tag1", time, records[1]], ] + tag = "tag1" res_codes = [] d.run(expect_records: 2) do - res = post("/#{tag}", {"msgpack"=>events.to_msgpack, "time"=>time_i.to_s}) + res = post("/#{tag}", {"msgpack"=>records.to_msgpack, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes @@ -461,7 +477,9 @@ def test_cors_disallowed d.end_if{ res_codes.size == 2 } d.run do res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code end assert_equal ["403", "403"], res_codes end From f82ee53819762acdb3c9262bde976a622213b180 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 17:21:46 +0900 Subject: [PATCH 07/11] remove fluent/process from code --- lib/fluent/plugin/in_http.rb | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 7ee1cd8a73..b6913c93af 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -17,7 +17,6 @@ require 'fluent/plugin/input' require 'fluent/plugin/parser' require 'fluent/event' -require 'fluent/process' require 'http/parser' require 'webrick/httputils' @@ -37,7 +36,6 @@ def parse(text) class HttpInput < Input Fluent::Plugin.register_input('http', self) - # include DetachMultiProcessMixin helpers :parser, :compat_parameters, :event_loop EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8") @@ -138,21 +136,6 @@ def start event_loop_attach(@lsock) @float_time_parser = Fluent::NumericTimeParser.new(:float) - - # detach_multi_process do - # super - # @km = KeepaliveManager.new(@keepalive_timeout) - # @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), - # @body_size_limit, @format, log, - # @cors_allow_origins) - # @lsock.listen(@backlog) unless @backlog.nil? - - # @loop = Coolio::Loop.new - # @loop.attach(@km) - # @loop.attach(@lsock) - - # @thread = Thread.new(&method(:run)) - # end end def close From f87305cf8c644efec9e19a94eac174116646338f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 19:03:16 +0900 Subject: [PATCH 08/11] Fix to set time_type=string if "format" specifier is used for regular expression directly. It's because old TextParser estimates to parse "time" field without any time_format (using Ruby built-in formats). --- lib/fluent/plugin_helper/compat_parameters.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index ba84a931c3..81864b4457 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -217,7 +217,7 @@ def compat_parameters_inject(conf) def compat_parameters_extract(conf) return unless conf.elements('extract').empty? - return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } + return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } && !conf.has_key?('format') # TODO: warn obsolete parameters if these are deprecated hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS) @@ -225,6 +225,9 @@ def compat_parameters_extract(conf) if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) hash['time_key'] ||= 'time' hash['time_type'] = 'unixtime' + elsif conf.has_key?('format') && conf["format"].start_with?("/") && conf["format"].end_with?("/") # old-style regexp parser + hash['time_key'] ||= 'time' + hash['time_type'] ||= 'string' end if conf.has_key?('localtime') || conf.has_key?('utc') if conf.has_key?('localtime') && conf.has_key?('utc') From 6f5a2bf78b6157ee808ca8eb88fdb4f69b1a3be8 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 11 Nov 2016 19:05:07 +0900 Subject: [PATCH 09/11] fix to set just expression as expression --- lib/fluent/plugin_helper/compat_parameters.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 81864b4457..5daa4a7190 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -259,7 +259,7 @@ def compat_parameters_parser(conf) if conf["format"] if conf["format"].start_with?("/") && conf["format"].end_with?("/") hash["@type"] = "regexp" - hash["expression"] = conf["format"] + hash["expression"] = conf["format"][1..-2] else hash["@type"] = conf["format"] end From 9d0e80aa637365706e8d513b5433d8db0f8aacee Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 14 Nov 2016 13:05:11 +0900 Subject: [PATCH 10/11] fix not to overwrite default usage "" --- lib/fluent/plugin/in_http.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index b6913c93af..1ef34f131d 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -71,9 +71,9 @@ def configure(conf) super m = if @parser_configs.first['@type'] == 'in_http' - @parser_msgpack = parser_create(type: 'msgpack') + @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') @parser_msgpack.estimate_current_event = false - @parser_json = parser_create(type: 'json') + @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json') @parser_json.estimate_current_event = false @format_name = 'default' method(:parse_params_default) From 2b9bb14e7a86127ae8cc73fc752882b0be74e286 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 14 Nov 2016 13:05:34 +0900 Subject: [PATCH 11/11] remove expected REMOTE_ADDR field from records to be posted --- test/plugin/test_in_http.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 8372947d8b..b4536b71c4 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -190,8 +190,8 @@ def test_json_with_add_remote_addr_given_x_forwarded_for time_i = time.to_i events = [ - ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], - ["tag2", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], ] res_codes = [] @@ -202,9 +202,14 @@ def test_json_with_add_remote_addr_given_x_forwarded_for end end assert_equal ["200", "200"], res_codes - assert_equal events, d.events + + assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}, d.events[0][2]) + + assert_equal "tag2", d.events[1][0] assert_equal_event_time time, d.events[1][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}, d.events[1][2]) end def test_multi_json_with_add_remote_addr_given_x_forwarded_for