Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to get real retry state of buffered output #1387

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class MonitorAgentInput < Input
config_param :tag, :string, default: nil
config_param :emit_interval, :time, default: 60
config_param :include_config, :bool, default: true
config_param :include_retry, :bool, default: true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be not necessary


class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -78,7 +79,7 @@ def build_object(req, res)

# if ?debug=1 is set, set :with_debug_info for get_monitor_info
# and :pretty_json for render_json_error
opts = {with_config: @agent.include_config}
opts = {with_config: @agent.include_config, with_retry: @agent.include_retry}
if s = qs['debug'] and s[0]
opts[:with_debug_info] = true
opts[:pretty_json] = true
Expand All @@ -88,6 +89,10 @@ def build_object(req, res)
opts[:with_config] = Fluent::Config.bool_value(with_config)
end

if with_retry = get_search_parameter(qs, 'with_retry'.freeze)
opts[:with_retry] = Fluent::Config.bool_value(with_retry)
end

if tag = get_search_parameter(qs, 'tag'.freeze)
# ?tag= to search an output plugin by match pattern
if obj = @agent.plugin_info_by_tag(tag, opts)
Expand Down Expand Up @@ -231,7 +236,7 @@ def start
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

opts = {with_config: false}
opts = {with_config: false, with_retry: false}
timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
es = Fluent::MultiEventStream.new
now = Fluent::Engine.now
Expand Down Expand Up @@ -346,6 +351,8 @@ def get_monitor_info(pe, opts={})
end
}

obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)

# include all instance variables if :with_debug_info is set
if opts[:with_debug_info]
iv = {}
Expand All @@ -362,6 +369,24 @@ def get_monitor_info(pe, opts={})
obj
end

RETRY_INFO = {
'start' => '@start',
'steps' => '@steps',
'next_time' => '@next_time',
}

def get_retry_info(pe_retry)
retry_variables = {}

if pe_retry
RETRY_INFO.each_pair { |key, param|
retry_variables[key] = pe_retry.instance_variable_get(param)
}
end

retry_variables
end

def plugin_category(pe)
case pe
when Fluent::Plugin::Input
Expand Down
85 changes: 78 additions & 7 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,16 @@ def get(uri, header = {})
assert_equal(expected_test_filter_response, test_filter)
end

data(:include_config_yes => [true, "include_config yes"],
:include_config_no => [false, "include_config no"])
test "/api/plugins.json" do |(with_config, include_conf)|

data(:include_config_and_retry_yes => [true, true, "include_config yes", "include_retry yes"],
:include_config_and_retry_no => [false, false, "include_config no", "include_retry no"],)
test "/api/plugins.json" do |(with_config, with_retry, include_conf, retry_conf)|
d = create_driver("
@type monitor_agent
bind '127.0.0.1'
port #{@port}
tag monitor
#{include_conf}
#{retry_conf}
")
d.instance.start
expected_test_in_response = {
Expand All @@ -305,16 +305,17 @@ def get(uri, header = {})
"type" => "null"
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
assert_equal(expected_null_response, null_response)
end

data(:with_config_yes => [true, "?with_config=yes"],
:with_config_no => [false, "?with_config=no"])
test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, query_param)|
data(:with_config_and_retry_yes => [true, true, "?with_config=yes&with_retry"],
:with_config_and_retry_no => [false, false, "?with_config=no&with_retry=no"])
test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, with_retry, query_param)|

d = create_driver("
@type monitor_agent
Expand All @@ -341,6 +342,7 @@ def get(uri, header = {})
"type" => "null"
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}"))
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
Expand Down Expand Up @@ -377,4 +379,73 @@ def get(uri, header = {})
assert_nil(res["log_path"])
end
end

sub_test_case "check retry of buffered plugins" do
class FluentTestFailWriteOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_fail_write', self)

def write(chunk)
raise "chunk error!"
end
end

setup do
@port = unused_port
# check @type and type in one configuration
conf = <<-EOC
<source>
@type monitor_agent
@id monitor_agent
bind "127.0.0.1"
port #{@port}
</source>
<match **>
@type test_out_fail_write
@id test_out_fail_write
<buffer>
flush_mode immediate
</buffer>
</match>
EOC
@ra = Fluent::RootAgent.new(log: $log)
stub(Fluent::Engine).root_agent { @ra }
@ra = configure_ra(@ra, conf)
end

test "/api/plugins.json retry object should be filled if flush was failed" do

d = create_driver("
@type monitor_agent
bind '127.0.0.1'
port #{@port}
include_config no
")
d.instance.start
expected_test_out_fail_write_response = {
"buffer_queue_length" => 1,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "test_out_fail_write",
"retry_count" => 2,
"type" => "test_out_fail_write",
"retry" => {
"steps" => 1
}
}
output = @ra.outputs[0]
output.start
output.after_start
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
2.times do
output.force_flush
end
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
test_out_fail_write_response = response["plugins"][1]
# remove dynamic keys
["start", "next_time"].each { |key| test_out_fail_write_response["retry"].delete(key) }
assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response)
end
end
end