Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support service discovery with SRV record #2876

Merged
merged 5 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions lib/fluent/plugin/sd_srv.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'resolv'

require 'fluent/plugin_helper'
require 'fluent/plugin/service_discovery'

module Fluent
  module Plugin
    # Service discovery plugin backed by DNS SRV records (RFC 2782).
    #
    # The record "_<service>._<proto>.<hostname>" is fetched once in
    # #configure and fetched again from a timer registered in #start. Each
    # difference between the previous and the latest answer is pushed to the
    # queue given to #start as a service_in / service_out message.
    class SrvServiceDiscovery < ServiceDiscovery
      include PluginHelper::Mixin

      # Registered under the name 'srv'.
      Plugin.register_sd('srv', self)

      helpers :timer

      desc 'Service without underscore in RFC2782'
      config_param :service, :string
      desc 'Proto without underscore in RFC2782'
      config_param :proto, :string, default: 'tcp'
      desc 'Name without underscore in RFC2782'
      config_param :hostname, :string
      desc 'hostname of DNS server to request the SRV record'
      config_param :dns_server_host, :string, default: nil
      desc 'interval of requesting to DNS server'
      config_param :interval, :integer, default: 60
      desc "resolve hostname to IP addr of SRV's Target"
      config_param :dns_lookup, :bool, default: true
      desc 'The shared key per server'
      config_param :shared_key, :string, default: nil, secret: true
      desc 'The username for authentication'
      config_param :username, :string, default: ''
      desc 'The password for authentication'
      config_param :password, :string, default: '', secret: true

      def initialize
        super
        @target = nil
        @dns_resolve = nil
      end

      # Builds the SRV query name and the resolver, then performs the first
      # lookup. An error raised by that first lookup is not rescued here.
      def configure(conf)
        super

        @target = "_#{@service}._#{@proto}.#{@hostname}"
        @dns_resolve = build_resolver(@dns_server_host)
        @services = fetch_srv_record
      end

      # Registers the timer that re-fetches the SRV record every @interval and
      # pushes the resulting changes to +queue+.
      def start(queue)
        timer_execute(:"sd_srv_record_#{@target}", @interval) do
          refresh_srv_records(queue)
        end

        # Explicit empty parens: the parent #start is called without +queue+.
        super()
      end

      private

      # Returns a Resolv::DNS for +server+.
      #
      #   nil               -> resolver with default settings
      #   "127.0.0.1"       -> nameserver: "127.0.0.1"
      #   "127.0.0.1:8600"  -> nameserver_port: [["127.0.0.1", 8600]]
      #   "::1"             -> nameserver: "::1"
      #   "[::1]:8600"      -> nameserver_port: [["::1", 8600]]
      def build_resolver(server)
        return Resolv::DNS.new if server.nil?

        host, port = split_host_port(server)
        if port
          Resolv::DNS.new(nameserver_port: [[host, port]])
        else
          Resolv::DNS.new(nameserver: host)
        end
      end

      # Splits +server+ into [host, port]; port is nil when none was given.
      #
      # A value with exactly one colon is treated as "host:port" (the port is
      # converted with to_i, as before). A value with several colons is taken
      # to be a bare IPv6 address; an IPv6 address that needs a port has to be
      # written in brackets.
      def split_host_port(server)
        bracketed = /\A\[(?<host>[^\]]+)\](?::(?<port>\d+))?\z/.match(server)
        if bracketed
          port = bracketed[:port]
          [bracketed[:host], port && port.to_i]
        elsif server.count(':') == 1 # e.g. 127.0.0.1:8600
          host, port = server.split(':', 2)
          [host, port.to_i]
        else
          [server, nil]
        end
      end

      # Fetches the SRV record again and pushes the difference from the
      # current service list to +queue+.
      def refresh_srv_records(queue)
        latest = begin
          fetch_srv_record
        rescue => e
          # A failed lookup is logged and the current list is kept.
          @log.error("sd_srv: #{e}")
          return
        end

        # An empty answer is ignored, so the current list is kept as well.
        return if latest.nil? || latest.empty?

        # service_in messages are queued before service_out ones so that at
        # least one server stays available at all times.
        joined = (latest - @services).map { |svc| ServiceDiscovery.service_in_msg(svc) }
        drained = (@services - latest).map { |svc| ServiceDiscovery.service_out_msg(svc) }

        @services = latest

        (joined + drained).each { |msg| queue.push(msg) }
      end

      # Queries the SRV record for @target and returns one Service per answer,
      # ordered by ascending SRV priority.
      def fetch_srv_record
        records = @dns_resolve.getresources(@target, Resolv::DNS::Resource::IN::SRV)

        records.sort_by(&:priority).map do |record|
          host = @dns_lookup ? dns_lookup!(record.target) : record.target
          # NOTE(review): the literal false is presumably the standby flag;
          # confirm against the definition of ServiceDiscovery::Service.
          Service.new(:srv, host.to_s, record.port.to_i, record.target.to_s, record.weight, false, @username, @password, @shared_key)
        end
      end

      # Resolves +host+ to an address with the configured resolver.
      def dns_lookup!(host)
        # may need to cache the result
        @dns_resolve.getaddress(host) # get first result for now
      end
    end
  end
end
230 changes: 230 additions & 0 deletions test/plugin/test_sd_srv.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
require_relative '../helper'
require 'fluent/plugin/sd_srv'
require 'fileutils'
require 'flexmock/test_unit'
require 'json'

class SrvServiceDiscoveryTest < ::Test::Unit::TestCase
# SRV fixtures shared by every test. The positional arguments are
# (priority, weight, port, target), so SRV_RECORD1 (priority 1) is expected
# to be listed before SRV_RECORD2 (priority 2).
SRV_RECORD1 = Resolv::DNS::Resource::IN::SRV.new(1, 10, 8081, 'service1.example.com')
SRV_RECORD2 = Resolv::DNS::Resource::IN::SRV.new(2, 20, 8082, 'service2.example.com')

sub_test_case 'configure' do
  # Creates the plugin and configures it with the two mandatory params plus
  # +extra+. Resolv::DNS.new has to be stubbed before calling this, because
  # #configure already performs the first SRV lookup.
  def configure_sd(extra = {})
    params = { 'service' => 'service1', 'hostname' => 'example.com' }.merge(extra)
    Fluent::Plugin::SrvServiceDiscovery.new.tap do |sd|
      sd.configure(config_element('service_discovery', '', params))
    end
  end

  # Resolver double that answers every SRV query with +records+ and every
  # address lookup with 127.0.0.1.
  def resolver_returning(records)
    flexmock('dns_resolver', getresources: records, getaddress: '127.0.0.1')
  end

  # Resolver double that only accepts an SRV query for +target+.
  def resolver_expecting(target)
    flexmock('dns_resolver', getaddress: '127.0.0.1')
      .should_receive(:getresources).with(target, Resolv::DNS::Resource::IN::SRV)
      .and_return([SRV_RECORD1, SRV_RECORD2])
      .mock
  end

  # Service the plugin is expected to build from an SRV record.
  def srv_service(host, port, name, weight, username: '', password: '', shared_key: nil)
    Fluent::Plugin::ServiceDiscovery::Service.new(:srv, host, port, name, weight, false, username, password, shared_key)
  end

  # Asserts that +sd+ holds the services for SRV_RECORD1 and SRV_RECORD2, in
  # that order. +resolved+ tells whether hosts are expected to be the looked-up
  # address (127.0.0.1) or the raw SRV target.
  def assert_both_services(sd, resolved: true, **auth)
    host1 = resolved ? '127.0.0.1' : 'service1.example.com'
    host2 = resolved ? '127.0.0.1' : 'service2.example.com'

    assert_equal srv_service(host1, 8081, 'service1.example.com', 10, **auth), sd.services[0]
    assert_equal srv_service(host2, 8082, 'service2.example.com', 20, **auth), sd.services[1]
  end

  test 'set services ordered by priority' do
    # Records are handed over in reverse priority order on purpose.
    mock(Resolv::DNS).new { resolver_returning([SRV_RECORD2, SRV_RECORD1]) }

    assert_both_services(configure_sd)
  end

  test 'return host name without resolving name when dns_lookup is false' do
    mock(Resolv::DNS).new { resolver_returning([SRV_RECORD1, SRV_RECORD2]) }

    assert_both_services(configure_sd('dns_lookup' => false), resolved: false)
  end

  test 'pass a value as :nameserver to Resolv::DNS when dns_server_host is given' do
    mock(Resolv::DNS).new(nameserver: '8.8.8.8') { resolver_returning([SRV_RECORD1, SRV_RECORD2]) }

    assert_both_services(configure_sd('dns_server_host' => '8.8.8.8'))
  end

  test 'pass a value as :nameserver_port to Resolv::DNS when dns_server_host has port' do
    mock(Resolv::DNS).new(nameserver_port: [['8.8.8.8', 8080]]) { resolver_returning([SRV_RECORD1, SRV_RECORD2]) }

    assert_both_services(configure_sd('dns_server_host' => '8.8.8.8:8080'))
  end

  test 'target follows RFC2782' do
    resolver = resolver_expecting("_service1._tcp.example.com")
    mock(Resolv::DNS).new { resolver }

    assert_both_services(configure_sd)
  end

  test 'can change protocol' do
    resolver = resolver_expecting("_service1._udp.example.com")
    mock(Resolv::DNS).new { resolver }

    assert_both_services(configure_sd('proto' => 'udp'))
  end

  test 'can set shared_key, username, password' do
    mock(Resolv::DNS).new { resolver_returning([SRV_RECORD2, SRV_RECORD1]) }

    sd = configure_sd('shared_key' => 'key', 'username' => 'user', 'password' => 'pass')
    assert_both_services(sd, username: 'user', password: 'pass', shared_key: 'key')
  end
end

sub_test_case '#start' do
  # Stand-in for the timer helper: the timer block runs inside a Fiber, so the
  # test decides when each tick happens.
  module TestTimerEventHelperWrapper
    # Runs the timer block once immediately; each later #resume runs it once more.
    def timer_execute(_name, _interval, &block)
      @test_timer_event_helper_wrapper_context = Fiber.new do
        loop do
          block.call
          break if Fiber.yield == :finish
        end
      end

      resume
    end

    # Triggers the next timer tick.
    def resume
      @test_timer_event_helper_wrapper_context.resume(:resume)
    end

    # Lets the Fiber leave its loop once the plugin is shut down.
    def shutdown
      super

      @test_timer_event_helper_wrapper_context.resume(:finish) if @test_timer_event_helper_wrapper_context
    end
  end

  # Stubs Resolv::DNS.new to return +resolver+, configures the plugin with the
  # mandatory params plus +extra+, starts it and returns the queue handed to
  # #start. Starting already runs the first timer tick.
  def start_with(resolver, extra = {})
    mock(Resolv::DNS).new { resolver }
    params = { 'service' => 'service1', 'hostname' => 'example.com' }.merge(extra)
    @sd_srv.configure(config_element('service_discovery', '', params))

    events = []
    @sd_srv.start(events)
    events
  end

  setup do
    @sd_srv = Fluent::Plugin::SrvServiceDiscovery.new
    @sd_srv.extend(TestTimerEventHelperWrapper)
  end

  teardown do
    if @sd_srv
      # Walk through the remaining lifecycle steps, skipping those already done.
      [
        [:stop, :stopped?],
        [:before_shutdown, :before_shutdown?],
        [:shutdown, :shutdown?],
        [:after_shutdown, :after_shutdown?],
        [:close, :closed?],
        [:terminate, :terminated?],
      ].each do |step, done|
        @sd_srv.public_send(step) unless @sd_srv.public_send(done)
      end
    end
  end

  test 'Skip if srv record is not updated' do
    resolver = flexmock('dns_resolver', getresources: [SRV_RECORD2, SRV_RECORD1], getaddress: '127.0.0.1')
    events = start_with(resolver)
    assert_empty events

    @sd_srv.resume
    assert_empty events
  end

  test 'Skip if DNS resolver raise an error' do
    resolver = flexmock('dns_resolver', getaddress: '127.0.0.1')
      .should_receive(:getresources)
      .and_return([SRV_RECORD1, SRV_RECORD2])
      .and_return { raise 'some error' } # tick run by #start
      .and_return { raise 'some error' } # tick run by #resume
      .mock

    events = start_with(resolver)
    assert_empty events

    @sd_srv.resume
    assert_empty events
  end

  test 'if service is updated, service_in and service_out event happen' do
    resolver = flexmock('dns_resolver', getaddress: '127.0.0.1')
      .should_receive(:getresources)
      .and_return([SRV_RECORD1])
      .and_return([SRV_RECORD2])
      .mock

    events = start_with(resolver, 'dns_lookup' => false)
    joined, drained = events.shift(2)

    assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, joined.type
    assert_equal 8082, joined.service.port
    assert_equal 'service2.example.com', joined.service.host

    assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drained.type
    assert_equal 8081, drained.service.port
    assert_equal 'service1.example.com', drained.service.host
  end

  test 'if service is deleted, service_out event happens' do
    resolver = flexmock('dns_resolver', getaddress: '127.0.0.1')
      .should_receive(:getresources)
      .and_return([SRV_RECORD1, SRV_RECORD2])
      .and_return([SRV_RECORD2])
      .mock

    events = start_with(resolver, 'dns_lookup' => false)

    assert_equal 1, events.size
    drained = events.shift
    assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drained.type
    assert_equal 8081, drained.service.port
    assert_equal 'service1.example.com', drained.service.host
  end

  test 'if new service is added, service_in event happens' do
    resolver = flexmock('dns_resolver', getaddress: '127.0.0.1')
      .should_receive(:getresources)
      .and_return([SRV_RECORD2])
      .and_return([SRV_RECORD1, SRV_RECORD2])
      .mock

    events = start_with(resolver, 'dns_lookup' => false)

    assert_equal 1, events.size
    joined = events.shift
    assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, joined.type
    assert_equal 8081, joined.service.port
    assert_equal 'service1.example.com', joined.service.host
  end
end
end