Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event router for secondary output #1283

Merged
merged 2 commits into from
Oct 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def add_match(type, pattern, conf)
log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.router = @event_router if output.respond_to?(:router=)
output.context_router = @event_router
output.configure(conf)
@outputs << output
if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
Expand All @@ -144,7 +144,7 @@ def add_filter(type, pattern, conf)
log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

filter = Plugin.new_filter(type)
filter.router = @event_router
filter.context_router = @event_router
filter.configure(conf)
@filters << filter
@event_router.add_rule(pattern, filter)
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Base
def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false, false)
@_context_router = nil
@under_plugin_development = false
end

Expand All @@ -45,6 +46,14 @@ def configure(conf)
self
end

def context_router=(router)
@_context_router = router
end

def context_router
@_context_router
end

def start
@_state.start = true
self
Expand Down
4 changes: 1 addition & 3 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ def configure(conf)
log.debug "adding store", type: type

output = Fluent::Plugin.new_output(type)
if output.has_router?
output.router = router
end
output.context_router = self.context_router
output.configure(store_conf)
@outputs << output
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def acts_as_secondary(primary)
@timekey_zone = @primary_instance.timekey_zone
@output_time_formatter_cache = {}
end
self.context_router = primary.context_router

(class << self; self; end).module_eval do
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
Expand Down Expand Up @@ -317,7 +318,6 @@ def configure(conf)
@secondary = Plugin.new_output(secondary_type)
@secondary.acts_as_secondary(self)
@secondary.configure(secondary_conf)
@secondary.router = router if @secondary.has_router?
if (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format))
log.warn "secondary type should be same with primary one", primary: self.class.to_s, secondary: @secondary.class.to_s
end
Expand Down
15 changes: 14 additions & 1 deletion lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ module EventEmitter

def router
@_event_emitter_used_actually = true
if @_event_emitter_lazy_init
@router = @primary_instance.router
end
@router
end

def router=(r)
# not recommended now...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about logging warn message here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it at first, but didn't it finally. Because we can't remove #router= method between v0.x and v1.x for compatibility reason. So displaying warning doesn't help anything.
IMO we should show warning here at the same time with deprecating v0.12 plugin APIs (and remove this method at v2).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I understood the situation.

@router = r
end

Expand All @@ -44,14 +48,23 @@ def event_emitter_used_actually?
def event_emitter_router(label_name)
if label_name
Engine.root_agent.find_label(label_name).event_router
elsif self.respond_to?(:as_secondary) && self.as_secondary
if @primary_instance.has_router?
@_event_emitter_lazy_init = true
nil # primary plugin's event router is not initialized yet, here.
else
@primary_instance.context_router
end
else
Engine.root_agent.event_router
# `Engine.root_agent.event_router` is for testing
self.context_router || Engine.root_agent.event_router
end
end

def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@router = nil
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def add_source(type, conf)
# <source> emits events to the top-level event router (RootAgent#event_router).
# Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
# See also 'fluentd/plugin/input.rb'
input.router = @event_router
input.context_router = @event_router
input.configure(conf)
@inputs << input

Expand Down
20 changes: 20 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ def process(tag, es)
end
end

class FluentTestBufferedOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_buffered', self)
def write(chunk)
# drop everything
end
end

class FluentTestEmitOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_emit', self)
helpers :event_emitter
def write(chunk)
tag = chunk.metadata.tag || 'test'
array = []
chunk.each do |time, record|
array << [time, record]
end
router.emit_array(tag, array)
end
end

class FluentTestErrorOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_error', self)

Expand Down
139 changes: 139 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,145 @@ def configure_ra(conf_str)
end
end

sub_test_case 'configured with label and secondary plugin' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
<source>
@type test_in
@label @route_a
</source>
<label @route_a>
<match a.**>
@type test_out_buffered
<secondary>
@type test_out_emit
</secondary>
</match>
</label>
<label @route_b>
<match b.**>
@type test_out
</match>
</label>
EOC
end

test 'secondary plugin has an event router for the label which the plugin is in' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context

c1 = @ra.labels['@route_a']

assert_equal 1, c1.outputs.size
assert !c1.outputs.first.has_router?

assert c1.outputs.first.secondary
assert c1.outputs.first.secondary.has_router?
assert_equal c1.event_router, c1.outputs.first.secondary.router
end
end

sub_test_case 'configured with label and secondary plugin with @label specifier' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
<source>
@type test_in
@label @route_a
</source>
<label @route_a>
<match a.**>
@type test_out_buffered
<secondary>
@type test_out_emit
@label @route_b
</secondary>
</match>
</label>
<label @route_b>
<match b.**>
@type test_out
</match>
</label>
EOC
end

test 'secondary plugin has an event router for the label specified in secondary section' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context

c1 = @ra.labels['@route_a']
c2 = @ra.labels['@route_b']

assert_equal 1, c1.outputs.size
assert !c1.outputs.first.has_router?

assert c1.outputs.first.secondary
assert c1.outputs.first.secondary.has_router?
assert_equal c2.event_router, c1.outputs.first.secondary.router
end
end

sub_test_case 'configured with label and secondary plugin with @label specifier in primary output' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent{ @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
<source>
@type test_in
@label @route_a
</source>
<label @route_a>
<match a.**>
@type test_out_emit
@label @route_b
<secondary>
@type test_out_emit
</secondary>
</match>
</label>
<label @route_b>
<match b.**>
@type test_out
</match>
</label>
EOC
end

test 'secondary plugin has an event router for the label specified in secondary section' do
assert_equal 1, @ra.inputs.size
assert_equal 2, @ra.labels.size
assert_equal ['@route_a', '@route_b'], @ra.labels.keys
assert_equal '@route_a', @ra.labels['@route_a'].context
assert_equal '@route_b', @ra.labels['@route_b'].context

c1 = @ra.labels['@route_a']
c2 = @ra.labels['@route_b']

assert_equal 1, c1.outputs.size
assert c1.outputs.first.secondary

p1 = c1.outputs.first
assert p1.has_router?
assert_equal c1.event_router, p1.context_router
assert_equal c2.event_router, p1.router

s1 = p1.secondary
assert s1.has_router?
assert_equal c1.event_router, s1.context_router
assert_equal c2.event_router, s1.router
end
end

sub_test_case 'configured with MultiOutput plugins' do
setup do
@ra = RootAgent.new(log: $log)
Expand Down