Skip to content

Commit

Permalink
in_monitor_agent: Start one HTTP server per worker on sequential port…
Browse files Browse the repository at this point in the history
… numbers
  • Loading branch information
Yuki Ito committed Mar 9, 2017
1 parent 9618ca1 commit dcee14f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
7 changes: 6 additions & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,19 @@ def initialize
@first_warn = false
end

def configure(conf)
super
@port += fluentd_worker_id
end

def multi_workers_ready?
true
end

def start
super

log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}"
@srv = WEBrick::HTTPServer.new({
BindAddress: @bind,
Port: @port,
Expand Down
45 changes: 36 additions & 9 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ def get(uri, header = {})
unless header.has_key?('Content-Type')
header['Content-Type'] = 'application/octet-stream'
end
res = Net::HTTP.start(url.host, url.port) {|http|
Net::HTTP.start(url.host, url.port) {|http|
http.request(req)
}
res.body
end

sub_test_case "servlets" do
Expand Down Expand Up @@ -268,7 +267,7 @@ def get(uri, header = {})
expected_test_filter_response = "\
plugin_id:test_filter\tplugin_category:filter\ttype:test_filter\toutput_plugin:false\tretry_count:"

response = get("http://127.0.0.1:#{@port}/api/plugins")
response = get("http://127.0.0.1:#{@port}/api/plugins").body
test_in = response.split("\n")[0]
test_filter = response.split("\n")[3]
assert_equal(expected_test_in_response, test_in)
Expand Down Expand Up @@ -306,7 +305,7 @@ def get(uri, header = {})
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
Expand Down Expand Up @@ -343,7 +342,7 @@ def get(uri, header = {})
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}"))
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
Expand Down Expand Up @@ -376,7 +375,7 @@ def get(uri, header = {})
"type" => "null",
"instance_variables" => {"id" => "null", "num_errors" => 0}
}
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors"))
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
Expand All @@ -394,7 +393,7 @@ def get(uri, header = {})
expected_response_regex = /pid:\d+\tppid:\d+\tconfig_path:\/etc\/fluent\/fluent.conf\tpid_file:\tplugin_dirs:\[\"\/etc\/fluent\/plugin\"\]\tlog_path:/

assert_match(expected_response_regex,
get("http://127.0.0.1:#{@port}/api/config"))
get("http://127.0.0.1:#{@port}/api/config").body)
end

test "/api/config.json" do
Expand All @@ -405,7 +404,7 @@ def get(uri, header = {})
tag monitor
")
d.instance.start
res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json"))
res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json").body)
assert_equal("/etc/fluent/fluent.conf", res["config_path"])
assert_nil(res["pid_file"])
assert_equal(["/etc/fluent/plugin"], res["plugin_dirs"])
Expand Down Expand Up @@ -474,7 +473,7 @@ def write(chunk)
output.submit_flush_once
sleep 0.1 until output.buffer.queued?
end
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body)
test_out_fail_write_response = response["plugins"][1]
# remove dynamic keys
response_retry_count = test_out_fail_write_response.delete("retry_count")
Expand All @@ -486,4 +485,32 @@ def write(chunk)
assert{ response_retry_count == response_retry["steps"] + 1 }
end
end

sub_test_case "check the port number of http server" do
test "on single worker environment" do
port = unused_port
d = create_driver("
@type monitor_agent
bind '127.0.0.1'
port #{port}
")
d.instance.start
assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code)
end

test "worker_id = 2 on multi worker environment" do
port = unused_port
Fluent::SystemConfig.overwrite_system_config('workers' => 4) do
d = Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput)
d.instance.instance_eval{ @_fluentd_worker_id = 2 }
d.configure("
@type monitor_agent
bind '127.0.0.1'
port #{port - 2}
")
d.instance.start
end
assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code)
end
end
end

0 comments on commit dcee14f

Please sign in to comment.