Skip to content

Commit

Permalink
Merge pull request #3299 from fluent/tweak-service-discovery-plugin-h…
Browse files Browse the repository at this point in the history
…elper

Configure Service Discovery using plugin helper with automatically loaded configurations
  • Loading branch information
ashie authored May 7, 2021
2 parents cc3ea06 + f55d9d9 commit 5d9c9ac
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 20 deletions.
40 changes: 39 additions & 1 deletion lib/fluent/plugin_helper/service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ module PluginHelper
module ServiceDiscovery
include Fluent::PluginHelper::Timer

# For the compatibility with older versions without `param_name: :service_discovery_configs`
attr_reader :service_discovery

def self.included(mod)
mod.include ServiceDiscoveryParams
end

def configure(conf)
super
# For the compatibility with older versions without `param_name: :service_discovery_configs`
@service_discovery = @service_discovery_configs
end

def start
unless @discovery_manager
log.warn('There is no discovery_manager. skip start them')
Expand All @@ -52,6 +61,35 @@ def start

private

# @param title [Symbol] the thread name. this value should be unique.
# @param static_default_service_directive [String] the directive name of each service when "static" service discovery is enabled in default
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
# @param custom_build_method [Proc]
def service_discovery_configure(title, static_default_service_directive: nil, load_balancer: nil, custom_build_method: nil, interval: 3)
configs = @service_discovery_configs.map(&:corresponding_config_element)
if static_default_service_directive
configs << Fluent::Config::Element.new(
'service_discovery',
'',
{'@type' => 'static'},
@config.elements(name: static_default_service_directive.to_s).map{|e| Fluent::Config::Element.new('service', e.arg, e.dup, e.elements, e.unused) }
)
end
service_discovery_create_manager(title, configurations: configs, load_balancer: load_balancer, custom_build_method: custom_build_method, interval: interval)
end

def service_discovery_select_service(&block)
@discovery_manager.select_service(&block)
end

def service_discovery_services
@discovery_manager.services
end

def service_discovery_rebalance
@discovery_manager.rebalance
end

# @param title [Symbol] the thread name. this value should be unique.
# @param configurations [Hash] hash which must has discivery_service type and its configuration like `{ type: :static, conf: <Fluent::Config::Element> }`
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
Expand All @@ -78,7 +116,7 @@ def discovery_manager
module ServiceDiscoveryParams
include Fluent::Configurable

config_section :service_discovery do
config_section :service_discovery, multi: true, param_name: :service_discovery_configs do
config_param :@type, :string
end
end
Expand Down
16 changes: 11 additions & 5 deletions lib/fluent/plugin_helper/service_discovery/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,23 @@ def initialize(log:, load_balancer: nil, custom_build_method: nil)
@static_config = true
end

def configure(opts, parent: nil)
opts.each do |opt|
sd = Fluent::Plugin.new_sd(opt[:type], parent: parent)
sd.configure(opt[:conf])
def configure(configs, parent: nil)
configs.each do |config|
type, conf = if config.has_key?(:conf) # for compatibility with initial API
[config[:type], config[:conf]]
else
[config['@type'], config]
end

sd = Fluent::Plugin.new_sd(type, parent: parent)
sd.configure(conf)

sd.services.each do |s|
@services[s.discovery_id] = build_service(s)
end
@discoveries << sd

if @static_config && opt[:type] != :static
if @static_config && type.to_sym != :static
@static_config = false
end
end
Expand Down
88 changes: 74 additions & 14 deletions test/plugin_helper/test_service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ def discovery_manager
end
end

class DummyPlugin < Fluent::Plugin::TestBase
helpers :service_discovery

def configure(conf)
super
service_discovery_configure(:service_discovery_helper_test, static_default_service_directive: 'node')
end

def select_service(&block)
service_discovery_select_service(&block)
end

# Make these mehtod public
def discovery_manager
super
end
end

setup do
@sd_file_dir = File.expand_path('../plugin/data/sd_file', __dir__)

Expand All @@ -33,7 +51,7 @@ def discovery_manager
end
end

test 'start discovery manager' do
test 'support calling #service_discovery_create_manager and #discovery_manager from plugin' do
d = @d = Dummy.new

d.service_discovery_create_manager(
Expand All @@ -55,13 +73,30 @@ def discovery_manager
assert_equal 1234, services[0].port
end

test 'call timer_execute if dynamic configuration' do
d = @d = Dummy.new
test 'start discovery manager' do
d = @d = DummyPlugin.new

d.service_discovery_create_manager(
:service_discovery_helper_test,
configurations: [{ type: :file, conf: config_element('file_config', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) }],
)
services = [config_element('service', '', { 'host' => '127.0.0.1', 'port' => '1234' })]
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', {'@type' => 'static'}, services)]))

assert_true !!d.discovery_manager

mock.proxy(d.discovery_manager).start.once
mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never

d.start
d.event_loop_wait_until_start

assert_equal 1, d.discovery_manager.services.size
d.select_service do |serv|
assert_equal "127.0.0.1", serv.host
assert_equal 1234, serv.port
end
end

test 'call timer_execute if dynamic configuration' do
d = @d = DummyPlugin.new
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', { '@type' => 'file', 'path' => File.join(@sd_file_dir, 'config.yml' )})]))

assert_true !!d.discovery_manager
mock.proxy(d.discovery_manager).start.once
Expand All @@ -71,25 +106,22 @@ def discovery_manager
end

test 'exits service discovery instances without any errors' do
d = @d = Dummy.new
d = @d = DummyPlugin.new
mockv = flexmock('dns_resolver', getaddress: '127.0.0.1')
.should_receive(:getresources)
.and_return([Resolv::DNS::Resource::IN::SRV.new(1, 10, 8081, 'service1.example.com')])
.mock
mock(Resolv::DNS).new { mockv }

d.service_discovery_create_manager(
:service_discovery_helper_test2,
configurations: [{ type: :srv, conf: config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }) }],
)
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', { '@type' => 'srv', 'service' => 'service1', 'hostname' => 'example.com' })]))

assert_true !!d.discovery_manager
mock.proxy(d.discovery_manager).start.once
mock(d).timer_execute(:service_discovery_helper_test2, anything).once
mock(d).timer_execute(:service_discovery_helper_test, anything).once

# To avoid claring `@logs` during `terminate` step
# https://github.com/fluent/fluentd/blob/bc78d889f93dad8c2a4e0ad1ca802546185dacba/lib/fluent/test/log.rb#L33
mock(d.log).reset.twice
mock(d.log).reset.times(3)

d.start
d.event_loop_wait_until_start
Expand All @@ -102,4 +134,32 @@ def discovery_manager

assert_false(d.log.out.logs.any? { |e| e.match?(/thread doesn't exit correctly/) })
end

test 'static service discovery will be configured automatically when default service directive is specified' do
d = @d = DummyPlugin.new

nodes = [
config_element('node', '', { 'host' => '192.168.0.1', 'port' => '24224' }),
config_element('node', '', { 'host' => '192.168.0.2', 'port' => '24224' })
]
d.configure(config_element('root', '', {}, nodes))

assert_true !!d.discovery_manager

mock.proxy(d.discovery_manager).start.once
mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never

d.start
d.event_loop_wait_until_start

assert_equal 2, d.discovery_manager.services.size
d.select_service do |serv|
assert_equal "192.168.0.1", serv.host
assert_equal 24224, serv.port
end
d.select_service do |serv|
assert_equal "192.168.0.2", serv.host
assert_equal 24224, serv.port
end
end
end

0 comments on commit 5d9c9ac

Please sign in to comment.