Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure Service Discovery using plugin helper with automatically loaded configurations #3299

Merged
merged 4 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion lib/fluent/plugin_helper/service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ module PluginHelper
module ServiceDiscovery
include Fluent::PluginHelper::Timer

# For the compatibility with older versions without `param_name: :service_discovery_configs`
attr_reader :service_discovery

def self.included(mod)
mod.include ServiceDiscoveryParams
end

def configure(conf)
super
# For the compatibility with older versions without `param_name: :service_discovery_configs`
@service_discovery = @service_discovery_configs
end

def start
unless @discovery_manager
log.warn('There is no discovery_manager. skip start them')
Expand All @@ -52,6 +61,35 @@ def start

private

# @param title [Symbol] the thread name. this value should be unique.
# @param static_default_service_directive [String] the directive name of each service when "static" service discovery is enabled in default
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
# @param custom_build_method [Proc]
def service_discovery_configure(title, static_default_service_directive: nil, load_balancer: nil, custom_build_method: nil, interval: 3)
configs = @service_discovery_configs.map(&:corresponding_config_element)
if static_default_service_directive
configs << Fluent::Config::Element.new(
'service_discovery',
'',
{'@type' => 'static'},
@config.elements(name: static_default_service_directive.to_s).map{|e| Fluent::Config::Element.new('service', e.arg, e.dup, e.elements, e.unused) }
)
end
service_discovery_create_manager(title, configurations: configs, load_balancer: load_balancer, custom_build_method: custom_build_method, interval: interval)
end

def service_discovery_select_service(&block)
@discovery_manager.select_service(&block)
end

def service_discovery_services
@discovery_manager.services
end

def service_discovery_rebalance
@discovery_manager.rebalance
end

# @param title [Symbol] the thread name. this value should be unique.
# @param configurations [Hash] hash which must has discivery_service type and its configuration like `{ type: :static, conf: <Fluent::Config::Element> }`
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
Expand All @@ -78,7 +116,7 @@ def discovery_manager
module ServiceDiscoveryParams
include Fluent::Configurable

config_section :service_discovery do
config_section :service_discovery, multi: true, param_name: :service_discovery_configs do
config_param :@type, :string
end
end
Expand Down
16 changes: 11 additions & 5 deletions lib/fluent/plugin_helper/service_discovery/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,23 @@ def initialize(log:, load_balancer: nil, custom_build_method: nil)
@static_config = true
end

def configure(opts, parent: nil)
opts.each do |opt|
sd = Fluent::Plugin.new_sd(opt[:type], parent: parent)
sd.configure(opt[:conf])
def configure(configs, parent: nil)
configs.each do |config|
type, conf = if config.has_key?(:conf) # for compatibility with initial API
[config[:type], config[:conf]]
else
[config['@type'], config]
end

sd = Fluent::Plugin.new_sd(type, parent: parent)
sd.configure(conf)

sd.services.each do |s|
@services[s.discovery_id] = build_service(s)
end
@discoveries << sd

if @static_config && opt[:type] != :static
if @static_config && type.to_sym != :static
@static_config = false
end
end
Expand Down
88 changes: 74 additions & 14 deletions test/plugin_helper/test_service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ def discovery_manager
end
end

class DummyPlugin < Fluent::Plugin::TestBase
helpers :service_discovery

def configure(conf)
super
service_discovery_configure(:service_discovery_helper_test, static_default_service_directive: 'node')
end

def select_service(&block)
service_discovery_select_service(&block)
end

# Make these mehtod public
def discovery_manager
super
end
end

setup do
@sd_file_dir = File.expand_path('../plugin/data/sd_file', __dir__)

Expand All @@ -33,7 +51,7 @@ def discovery_manager
end
end

test 'start discovery manager' do
test 'support calling #service_discovery_create_manager and #discovery_manager from plugin' do
d = @d = Dummy.new

d.service_discovery_create_manager(
Expand All @@ -55,13 +73,30 @@ def discovery_manager
assert_equal 1234, services[0].port
end

test 'call timer_execute if dynamic configuration' do
d = @d = Dummy.new
test 'start discovery manager' do
d = @d = DummyPlugin.new

d.service_discovery_create_manager(
:service_discovery_helper_test,
configurations: [{ type: :file, conf: config_element('file_config', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) }],
)
services = [config_element('service', '', { 'host' => '127.0.0.1', 'port' => '1234' })]
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', {'@type' => 'static'}, services)]))

assert_true !!d.discovery_manager

mock.proxy(d.discovery_manager).start.once
mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never

d.start
d.event_loop_wait_until_start

assert_equal 1, d.discovery_manager.services.size
d.select_service do |serv|
assert_equal "127.0.0.1", serv.host
assert_equal 1234, serv.port
end
end

test 'call timer_execute if dynamic configuration' do
d = @d = DummyPlugin.new
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', { '@type' => 'file', 'path' => File.join(@sd_file_dir, 'config.yml' )})]))

assert_true !!d.discovery_manager
mock.proxy(d.discovery_manager).start.once
Expand All @@ -71,25 +106,22 @@ def discovery_manager
end

test 'exits service discovery instances without any errors' do
d = @d = Dummy.new
d = @d = DummyPlugin.new
mockv = flexmock('dns_resolver', getaddress: '127.0.0.1')
.should_receive(:getresources)
.and_return([Resolv::DNS::Resource::IN::SRV.new(1, 10, 8081, 'service1.example.com')])
.mock
mock(Resolv::DNS).new { mockv }

d.service_discovery_create_manager(
:service_discovery_helper_test2,
configurations: [{ type: :srv, conf: config_element('service_discovery', '', { 'service' => 'service1', 'hostname' => 'example.com' }) }],
)
d.configure(config_element('root', '', {}, [config_element('service_discovery', '', { '@type' => 'srv', 'service' => 'service1', 'hostname' => 'example.com' })]))

assert_true !!d.discovery_manager
mock.proxy(d.discovery_manager).start.once
mock(d).timer_execute(:service_discovery_helper_test2, anything).once
mock(d).timer_execute(:service_discovery_helper_test, anything).once

# To avoid claring `@logs` during `terminate` step
# https://github.com/fluent/fluentd/blob/bc78d889f93dad8c2a4e0ad1ca802546185dacba/lib/fluent/test/log.rb#L33
mock(d.log).reset.twice
mock(d.log).reset.times(3)

d.start
d.event_loop_wait_until_start
Expand All @@ -102,4 +134,32 @@ def discovery_manager

assert_false(d.log.out.logs.any? { |e| e.match?(/thread doesn't exit correctly/) })
end

test 'static service discovery will be configured automatically when default service directive is specified' do
d = @d = DummyPlugin.new

nodes = [
config_element('node', '', { 'host' => '192.168.0.1', 'port' => '24224' }),
config_element('node', '', { 'host' => '192.168.0.2', 'port' => '24224' })
]
d.configure(config_element('root', '', {}, nodes))

assert_true !!d.discovery_manager

mock.proxy(d.discovery_manager).start.once
mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never

d.start
d.event_loop_wait_until_start

assert_equal 2, d.discovery_manager.services.size
d.select_service do |serv|
assert_equal "192.168.0.1", serv.host
assert_equal 24224, serv.port
end
d.select_service do |serv|
assert_equal "192.168.0.2", serv.host
assert_equal 24224, serv.port
end
end
end