Skip to content

Commit

Permalink
Merge pull request #1387 from CSharpRU/get-real-retry-info-from-buffe…
Browse files Browse the repository at this point in the history
…red-output

Ability to get real retry state of buffered output
  • Loading branch information
repeatedly authored Dec 25, 2016
2 parents cbcdc5a + f65b24f commit 948e26b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 9 deletions.
29 changes: 27 additions & 2 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class MonitorAgentInput < Input
config_param :tag, :string, default: nil
config_param :emit_interval, :time, default: 60
config_param :include_config, :bool, default: true
config_param :include_retry, :bool, default: true

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -78,7 +79,7 @@ def build_object(req, res)

# if ?debug=1 is set, set :with_debug_info for get_monitor_info
# and :pretty_json for render_json_error
opts = {with_config: @agent.include_config}
opts = {with_config: @agent.include_config, with_retry: @agent.include_retry}
if s = qs['debug'] and s[0]
opts[:with_debug_info] = true
opts[:pretty_json] = true
Expand All @@ -88,6 +89,10 @@ def build_object(req, res)
opts[:with_config] = Fluent::Config.bool_value(with_config)
end

if with_retry = get_search_parameter(qs, 'with_retry'.freeze)
opts[:with_retry] = Fluent::Config.bool_value(with_retry)
end

if tag = get_search_parameter(qs, 'tag'.freeze)
# ?tag= to search an output plugin by match pattern
if obj = @agent.plugin_info_by_tag(tag, opts)
Expand Down Expand Up @@ -231,7 +236,7 @@ def start
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

opts = {with_config: false}
opts = {with_config: false, with_retry: false}
timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) {
es = Fluent::MultiEventStream.new
now = Fluent::Engine.now
Expand Down Expand Up @@ -346,6 +351,8 @@ def get_monitor_info(pe, opts={})
end
}

obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)

# include all instance variables if :with_debug_info is set
if opts[:with_debug_info]
iv = {}
Expand All @@ -362,6 +369,24 @@ def get_monitor_info(pe, opts={})
obj
end

RETRY_INFO = {
'start' => '@start',
'steps' => '@steps',
'next_time' => '@next_time',
}

def get_retry_info(pe_retry)
retry_variables = {}

if pe_retry
RETRY_INFO.each_pair { |key, param|
retry_variables[key] = pe_retry.instance_variable_get(param)
}
end

retry_variables
end

def plugin_category(pe)
case pe
when Fluent::Plugin::Input
Expand Down
85 changes: 78 additions & 7 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,16 @@ def get(uri, header = {})
assert_equal(expected_test_filter_response, test_filter)
end

data(:include_config_yes => [true, "include_config yes"],
:include_config_no => [false, "include_config no"])
test "/api/plugins.json" do |(with_config, include_conf)|

data(:include_config_and_retry_yes => [true, true, "include_config yes", "include_retry yes"],
:include_config_and_retry_no => [false, false, "include_config no", "include_retry no"],)
test "/api/plugins.json" do |(with_config, with_retry, include_conf, retry_conf)|
d = create_driver("
@type monitor_agent
bind '127.0.0.1'
port #{@port}
tag monitor
#{include_conf}
#{retry_conf}
")
d.instance.start
expected_test_in_response = {
Expand All @@ -305,16 +305,17 @@ def get(uri, header = {})
"type" => "null"
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
assert_equal(expected_null_response, null_response)
end

data(:with_config_yes => [true, "?with_config=yes"],
:with_config_no => [false, "?with_config=no"])
test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, query_param)|
data(:with_config_and_retry_yes => [true, true, "?with_config=yes&with_retry"],
:with_config_and_retry_no => [false, false, "?with_config=no&with_retry=no"])
test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, with_retry, query_param)|

d = create_driver("
@type monitor_agent
Expand All @@ -341,6 +342,7 @@ def get(uri, header = {})
"type" => "null"
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}"))
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
Expand Down Expand Up @@ -377,4 +379,73 @@ def get(uri, header = {})
assert_nil(res["log_path"])
end
end

sub_test_case "check retry of buffered plugins" do
class FluentTestFailWriteOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_fail_write', self)

def write(chunk)
raise "chunk error!"
end
end

setup do
@port = unused_port
# check @type and type in one configuration
conf = <<-EOC
<source>
@type monitor_agent
@id monitor_agent
bind "127.0.0.1"
port #{@port}
</source>
<match **>
@type test_out_fail_write
@id test_out_fail_write
<buffer>
flush_mode immediate
</buffer>
</match>
EOC
@ra = Fluent::RootAgent.new(log: $log)
stub(Fluent::Engine).root_agent { @ra }
@ra = configure_ra(@ra, conf)
end

test "/api/plugins.json retry object should be filled if flush was failed" do

d = create_driver("
@type monitor_agent
bind '127.0.0.1'
port #{@port}
include_config no
")
d.instance.start
expected_test_out_fail_write_response = {
"buffer_queue_length" => 1,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "test_out_fail_write",
"retry_count" => 2,
"type" => "test_out_fail_write",
"retry" => {
"steps" => 1
}
}
output = @ra.outputs[0]
output.start
output.after_start
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
2.times do
output.force_flush
end
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
test_out_fail_write_response = response["plugins"][1]
# remove dynamic keys
["start", "next_time"].each { |key| test_out_fail_write_response["retry"].delete(key) }
assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response)
end
end
end

0 comments on commit 948e26b

Please sign in to comment.