diff --git a/lib/fluent/plugin/.in_monitor_agent.rb.swo b/lib/fluent/plugin/.in_monitor_agent.rb.swo new file mode 100644 index 0000000000..77d59ce0cb Binary files /dev/null and b/lib/fluent/plugin/.in_monitor_agent.rb.swo differ diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 6d607c913f..9459a66702 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -226,6 +226,11 @@ def initialize @first_warn = false end + def configure(conf) + super + @port += fluentd_worker_id + end + def multi_workers_ready? true end @@ -233,7 +238,7 @@ def multi_workers_ready? def start super - log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" + log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}" @srv = WEBrick::HTTPServer.new({ BindAddress: @bind, Port: @port, diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index 0b5a98c6d0..717854b6be 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -204,10 +204,9 @@ def get(uri, header = {}) unless header.has_key?('Content-Type') header['Content-Type'] = 'application/octet-stream' end - res = Net::HTTP.start(url.host, url.port) {|http| + Net::HTTP.start(url.host, url.port) {|http| http.request(req) } - res.body end sub_test_case "servlets" do @@ -268,7 +267,7 @@ def get(uri, header = {}) expected_test_filter_response = "\ plugin_id:test_filter\tplugin_category:filter\ttype:test_filter\toutput_plugin:false\tretry_count:" - response = get("http://127.0.0.1:#{@port}/api/plugins") + response = get("http://127.0.0.1:#{@port}/api/plugins").body test_in = response.split("\n")[0] test_filter = response.split("\n")[3] assert_equal(expected_test_in_response, test_in) @@ -306,7 +305,7 @@ def get(uri, header = {}) } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry - response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json")) + response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) @@ -343,7 +342,7 @@ def get(uri, header = {}) } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry - response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}")) + response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) @@ -376,7 +375,7 @@ def get(uri, header = {}) "type" => "null", "instance_variables" => {"id" => "null", "num_errors" => 0} } - response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors")) + response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) @@ -394,7 +393,7 @@ def get(uri, header = {}) expected_response_regex = /pid:\d+\tppid:\d+\tconfig_path:\/etc\/fluent\/fluent.conf\tpid_file:\tplugin_dirs:\[\"\/etc\/fluent\/plugin\"\]\tlog_path:/ assert_match(expected_response_regex, - get("http://127.0.0.1:#{@port}/api/config")) + get("http://127.0.0.1:#{@port}/api/config").body) end test "/api/config.json" do @@ -405,7 +404,7 @@ def get(uri, header = {}) tag monitor ") d.instance.start - res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json")) + res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json").body) assert_equal("/etc/fluent/fluent.conf", res["config_path"]) assert_nil(res["pid_file"]) assert_equal(["/etc/fluent/plugin"], res["plugin_dirs"]) @@ -474,7 +473,7 @@ def write(chunk) output.submit_flush_once sleep 0.1 until output.buffer.queued? end - response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json")) + response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body) test_out_fail_write_response = response["plugins"][1] # remove dynamic keys response_retry_count = test_out_fail_write_response.delete("retry_count") @@ -486,4 +485,32 @@ def write(chunk) assert{ response_retry_count == response_retry["steps"] + 1 } end end + + sub_test_case "check the port number of http server" do + test "on single worker environment" do + port = unused_port + d = create_driver(" + @type monitor_agent + bind '127.0.0.1' + port #{port} +") + d.instance.start + assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) + end + + test "worker_id = 2 on multi worker environment" do + port = unused_port + Fluent::SystemConfig.overwrite_system_config('workers' => 4) do + d = Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput) + d.instance.instance_eval{ @_fluentd_worker_id = 2 } + d.configure(" + @type monitor_agent + bind '127.0.0.1' + port #{port - 2} +") + d.instance.start + end + assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) + end + end end