diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index a387370fda..1ef34f131d 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -14,28 +14,29 @@ # limitations under the License. # +require 'fluent/plugin/input' +require 'fluent/plugin/parser' +require 'fluent/event' + +require 'http/parser' +require 'webrick/httputils' require 'uri' require 'socket' require 'json' -require 'cool.io' - -require 'fluent/input' -require 'fluent/event' -require 'fluent/process' +module Fluent::Plugin + class InHttpParser < Parser + Fluent::Plugin.register_parser('in_http', self) + def parse(text) + # this plugin is dummy implementation not to raise error + yield nil, nil + end + end -module Fluent class HttpInput < Input - Plugin.register_input('http', self) - - include DetachMultiProcessMixin - - require 'http/parser' + Fluent::Plugin.register_input('http', self) - def initialize - require 'webrick/httputils' - super - end + helpers :parser, :compat_parameters, :event_loop EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8") @@ -52,25 +53,36 @@ def initialize config_param :add_http_headers, :bool, default: false desc 'Add REMOTE_ADDR header to the record.' config_param :add_remote_addr, :bool, default: false - desc 'The format of the HTTP body.' - config_param :format, :string, default: 'default' config_param :blocking_timeout, :time, default: 0.5 desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)' config_param :cors_allow_origins, :array, default: nil desc 'Respond with empty gif image of 1x1 pixel.' config_param :respond_with_empty_img, :bool, default: false + config_section :parse do + config_set_default :@type, 'in_http' + end + + EVENT_RECORD_PARAMETER = '_event_record' + def configure(conf) + compat_parameters_convert(conf, :parser) + super - m = if @format == 'default' + m = if @parser_configs.first['@type'] == 'in_http' + @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') + @parser_msgpack.estimate_current_event = false + @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json') + @parser_json.estimate_current_event = false + @format_name = 'default' method(:parse_params_default) else - @parser = Plugin.new_parser(@format) - @parser.configure(conf) + @parser = parser_create + @format_name = @parser_configs.first['@type'] method(:parse_params_with_parser) end - (class << self; self; end).module_eval do + self.singleton_class.module_eval do define_method(:parse_params, m) end end @@ -100,7 +112,11 @@ def on_timer end def start - log.debug "listening http on #{@bind}:#{@port}" + @_event_loop_run_timeout = @blocking_timeout + + super + + log.debug "listening http", bind: @bind, port: @port socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? @@ -109,38 +125,24 @@ def start client = ServerEngine::SocketManager::Client.new(socket_manager_path) lsock = client.listen_tcp(@bind, @port) - detach_multi_process do - super - @km = KeepaliveManager.new(@keepalive_timeout) - @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), - @body_size_limit, @format, log, - @cors_allow_origins) - @lsock.listen(@backlog) unless @backlog.nil? - - @loop = Coolio::Loop.new - @loop.attach(@km) - @loop.attach(@lsock) - - @thread = Thread.new(&method(:run)) - end + @km = KeepaliveManager.new(@keepalive_timeout) + @lsock = Coolio::TCPServer.new( + lsock, nil, Handler, @km, method(:on_request), + @body_size_limit, @format_name, log, + @cors_allow_origins + ) + @lsock.listen(@backlog) unless @backlog.nil? + event_loop_attach(@km) + event_loop_attach(@lsock) + + @float_time_parser = Fluent::NumericTimeParser.new(:float) end - def shutdown - @loop.watchers.each {|w| w.detach } - @loop.stop + def close @lsock.close - @thread.join - super end - def run - @loop.run(@blocking_timeout) - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace - end - def on_request(path_info, params) begin path = path_info[1..-1] # remove / @@ -170,9 +172,9 @@ def on_request(path_info, params) end time = if param_time = params['time'] param_time = param_time.to_f - param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) + param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time) else - record_time.nil? ? Engine.now : record_time + record_time.nil? ? Fluent::Engine.now : record_time end rescue return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"] @@ -182,7 +184,7 @@ def on_request(path_info, params) begin # Support batched requests if record.is_a?(Array) - mes = MultiEventStream.new + mes = Fluent::MultiEventStream.new record.each do |single_record| if @add_http_headers params.each_pair { |k,v| @@ -215,22 +217,23 @@ def on_request(path_info, params) private def parse_params_default(params) - record = if msgpack = params['msgpack'] - Engine.msgpack_factory.unpacker.feed(msgpack).read - elsif js = params['json'] - JSON.parse(js) - else - raise "'json' or 'msgpack' parameter is required" - end - return nil, record + if msgpack = params['msgpack'] + @parser_msgpack.parse(msgpack) do |_time, record| + return nil, record + end + elsif js = params['json'] + @parser_json.parse(js) do |_time, record| + return nil, record + end + else + raise "'json' or 'msgpack' parameter is required" + end end - EVENT_RECORD_PARAMETER = '_event_record' - def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] @parser.parse(content) { |time, record| - raise "Received event is not #{@format}: #{content}" if record.nil? + raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } else @@ -241,13 +244,13 @@ def parse_params_with_parser(params) class Handler < Coolio::Socket attr_reader :content_type - def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins) + def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins) super(io) @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false - @format = format + @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @idle = 0 @@ -355,7 +358,7 @@ def on_message_complete uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) - if @format != 'default' + if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif @content_type =~ /^application\/x-www-form-urlencoded/ params.update WEBrick::HTTPUtils.parse_query(@body) diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index e03fcd2daa..66109fa215 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -96,7 +96,7 @@ def parse_partial_data(data, &block) end def parse_time(record) - if @time_key && record.has_key?(@time_key) + if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key) src = if @keep_time_key record[@time_key] else diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 1c9de44854..5daa4a7190 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -48,7 +48,7 @@ module CompatParameters } PARSER_PARAMS = { - "format" => "@type", + "format" => nil, "types" => nil, "types_delimiter" => nil, "types_label_delimiter" => nil, @@ -217,7 +217,7 @@ def compat_parameters_inject(conf) def compat_parameters_extract(conf) return unless conf.elements('extract').empty? - return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } + return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } && !conf.has_key?('format') # TODO: warn obsolete parameters if these are deprecated hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS) @@ -225,6 +225,9 @@ def compat_parameters_extract(conf) if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) hash['time_key'] ||= 'time' hash['time_type'] = 'unixtime' + elsif conf.has_key?('format') && conf["format"].start_with?("/") && conf["format"].end_with?("/") # old-style regexp parser + hash['time_key'] ||= 'time' + hash['time_type'] ||= 'string' end if conf.has_key?('localtime') || conf.has_key?('utc') if conf.has_key?('localtime') && conf.has_key?('utc') @@ -253,6 +256,15 @@ def compat_parameters_parser(conf) # TODO: warn obsolete parameters if these are deprecated hash = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS) + if conf["format"] + if conf["format"].start_with?("/") && conf["format"].end_with?("/") + hash["@type"] = "regexp" + hash["expression"] = conf["format"][1..-2] + else + hash["@type"] = conf["format"] + end + end + if conf["types"] delimiter = conf["types_delimiter"] || ',' label_delimiter = conf["types_label_delimiter"] || ':' diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index 08c0e3ded8..1c22eaf361 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -50,6 +50,10 @@ def to_int @sec end + def to_f + @sec + @nsec / 1_000_000_000.0 + end + # for Time.at def to_r Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000) diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 7d95714a54..b4536b71c4 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -1,7 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_http' require 'net/http' +require 'timecop' class HttpInputTest < Test::Unit::TestCase class << self @@ -20,6 +21,10 @@ def setup Fluent::Test.setup end + def teardown + Timecop.return + end + PORT = unused_port CONFIG = %[ port #{PORT} @@ -30,7 +35,7 @@ def setup ] def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::HttpInput).configure(conf, true) + Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpInput).configure(conf) end def test_configure @@ -44,228 +49,314 @@ def test_configure def test_time d = create_driver + time = event_time("2011-01-02 13:14:15.123 UTC") + Timecop.freeze(Time.at(time)) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - Fluent::Engine.now = time - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + events = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] + res_codes = [] - d.run do - d.expected_emits.each {|tag,_time,record| + d.run(expect_records: 2) do + events.each do |tag, _time, record| res = post("/#{tag}", {"json"=>record.to_json}) - assert_equal "200", res.code - } + res_codes << res.code + end end + + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_time_as_float d = create_driver + time = event_time("2011-01-02 13:14:15.123 UTC") + float_time = time.to_f - float_time = Time.parse("2011-01-02 13:14:15.123 UTC").to_f - time = Fluent::EventTime.from_time(Time.at(float_time)) - - d.expect_emit "tag1", time, {"a"=>1} + events = [ + ["tag1", time, {"a"=>1}], + ] + res_codes = [] - d.run do - d.expected_emits.each {|tag,_time,record| + d.run(expect_records: 1) do + events.each do |tag, t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) - assert_equal "200", res.code - } + res_codes << res.code + end end + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] end def test_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) + res_codes << res.code + end end - - d.emit_streams.each { |tag, es| - assert !include_http_header?(es.first[1]) - } + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - events = [{"a"=>1},{"a"=>2}] + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time_i, records[0]], + ["tag1", time_i, records[1]], + ] tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + res_codes = [] + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1}], + ["tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2}], + ] + res_codes = [] + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end end - + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + records = [{"a"=>1},{"a"=>2}] tag = "tag1" + res_codes = [] - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end + assert_equal ["200"], res_codes + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + assert{ d.events[0][2].has_key?("REMOTE_ADDR") && d.events[0][2]["REMOTE_ADDR"] =~ /^\d{1,4}(\.\d{1,4}){3}$/ } + + assert_equal "tag1", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] end def test_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}, d.events[0][2]) + + assert_equal "tag2", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}, d.events[1][2]) end def test_multi_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}], + ] tag = "tag1" + res_codes = [] - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] tag = "tag1" + res_codes = [] - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end + assert_equal ["200"], res_codes + + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] - d.emit_streams.each { |_tag, es| - assert include_http_header?(es.first[1]) - } + assert_equal "tag1", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] + + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end + end + assert_equal ["200", "200"], res_codes - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - records = [["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}]] + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] - d.run do - records.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - - } - end + assert_equal "tag2", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] - d.emit_streams.each { |tag, es| - assert include_http_header?(es.first[1]) - } + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_application_json d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}?time=#{_time.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_msgpack d = create_driver - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>_time.to_s}) - assert_equal "200", res.code - } + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_msgpack d = create_driver - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - events = [{"a"=>1},{"a"=>2}] + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, records[0]], + ["tag1", time, records[1]], + ] tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"msgpack"=>events.to_msgpack, "time"=>time.to_s}) - assert_equal "200", res.code + res_codes = [] + d.run(expect_records: 2) do + res = post("/#{tag}", {"msgpack"=>records.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_regexp @@ -274,20 +365,27 @@ def test_with_regexp types field_1:integer ]) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"field_1" => 1, "field_2" => 'str'}], + ["tag2", time, {"field_1" => 2, "field_2" => 'str'}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"field_1" => 1, "field_2" => 'str'} - d.expect_emit "tag2", time, {"field_1" => 2, "field_2" => 'str'} - - d.run do - d.expected_emits.each { |tag, _time, record| + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v.to_s }.join(':') - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'application/octet-stream'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'application/octet-stream'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_csv @@ -297,72 +395,98 @@ def test_with_csv format csv keys foo,bar ]) - - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"foo" => "1", "bar" => 'st"r'} - d.expect_emit "tag2", time, {"foo" => "2", "bar" => 'str'} - - d.run do - d.expected_emits.each { |tag, _time, record| + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"foo" => "1", "bar" => 'st"r'}], + ["tag2", time, {"foo" => "2", "bar" => 'str'}], + ] + res_codes = [] + + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v }.to_csv - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_resonse_with_empty_img d = create_driver(CONFIG + "respond_with_empty_img true") assert_equal true, d.instance.respond_with_empty_img - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_bodies = [] d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code # Ruby returns ASCII-8 encoded string for GIF. - assert_equal Fluent::HttpInput::EMPTY_GIF_IMAGE, res.body.force_encoding("UTF-8") - } + res_bodies << res.body.force_encoding("UTF-8") + end end + assert_equal ["200", "200"], res_codes + assert_equal [Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE, Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE], res_bodies + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_allowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>1} + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_headers = [] d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"Origin"=>"http://foo.com"}) - assert_equal "200", res.code - assert_equal "http://foo.com", res["Access-Control-Allow-Origin"] - } + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://foo.com"}) + res_codes << res.code + res_headers << res["Access-Control-Allow-Origin"] + end end + assert_equal ["200", "200"], res_codes + assert_equal ["http://foo.com", "http://foo.com"], res_headers + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_disallowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + res_codes = [] - d.expected_emits_length = 0 + d.end_if{ res_codes.size == 2 } d.run do - res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code - - res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code + res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code + res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code end + assert_equal ["403", "403"], res_codes end $test_in_http_connection_object_ids = [] @@ -388,7 +512,7 @@ def on_message_begin end end - class Fluent::HttpInput::Handler + class Fluent::Plugin::HttpInput::Handler prepend ContentTypeHook end