Skip to content

Commit

Permalink
Add StaticConfigAnalysis
Browse files Browse the repository at this point in the history
to check if config is valid before invoking configure

Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Dec 18, 2019
1 parent 3d59cc2 commit fb88222
Show file tree
Hide file tree
Showing 2 changed files with 339 additions and 0 deletions.
182 changes: 182 additions & 0 deletions lib/fluent/static_config_analysis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/config'
require 'fluent/plugin'

module Fluent
# Static Analysis means anlyis All plugins and Fluent::Element without invokeing Plugin#configure
class StaticConfigAnalysis
module Elem
Input = Struct.new(:plugin, :config)
Output = Struct.new(:plugin, :config)
Format = Struct.new(:plugin, :config)
Label = Struct.new(:name, :config, :nodes)
Worker = Struct.new(:ids, :config, :nodes)
end

Result = Struct.new(:tree, :outputs, :inputs, :filters, :labels) do
def all_plugins
(outputs + inputs + filters).map(&:plugin)
end
end

# @param workers [Integer] Number of workers
def self.call(conf, workers: 1)
new(workers).call(conf)
end

def initialize(workers)
@workers = workers

reset
end

def call(config)
reset

tree = [
static_worker_analyse(config),
static_label_analyse(config),
static_filter_and_output_analyse(config),
static_input_analyse(config),
].flatten

Result.new(tree, @outputs, @inputs, @filters, @labels.values)
end

private

def reset
@outputs = []
@inputs = []
@filters = []
@labels = {}
end

def static_worker_analyse(conf)
ret = []
conf.elements(name: 'worker').each do |config|
ids = parse_worker_id(config)

config.elements.each do |elem|
unless %w[source match filter label].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
end

nodes = [
static_label_analyse(config),
static_filter_and_output_analyse(config),
static_input_analyse(config),
].flatten
ret << Elem::Worker.new(ids, config, nodes)
end

ret
end

def parse_worker_id(conf)
worker_id_str = conf.arg

if worker_id_str.empty?
raise Fluent::ConfigError, 'Missing worker id on <worker> directive'
end

l, r =
begin
worker_id_str.split('-', 2).map { |v| Integer(v) }
rescue TypeError, ArgumentError
raise Fluent::ConfigError, "worker id should be integer: #{worker_id_str}"
end

# Negative nubmer can not be passed here becuase split by '-'.
if l < 0 || l >= @workers
raise Fluent::ConfigError, "worker id #{l} specified by <worker> directive is not allowed. Available worker id is between 0 and #{@workers}"
end

# e.g. specifed one worker id like `<worker 0>`
if r.nil?
return [l]
end

if r < 0 || r >= @workers
raise Fluent::ConfigError, "worker id #{r} specified by <worker> directive is not allowed. Available worker id is between 0 and #{@workers-1}"
end

if l > r
raise Fluent::ConfigError, "greater first_worker_id<#{l}> than last_worker_id<#{r}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
end

[l, r]
end

def static_label_analyse(conf)
ret = []
conf.elements(name: 'label').each do |e|
name = e.arg

if @labels[name]
raise ConfigError, "Section <label #{name}> appears twice"
end

l = Elem::Label.new(name, e, static_filter_and_output_analyse(e))
ret << l
@labels[name] = l
end

ret
end

def static_filter_and_output_analyse(conf)
ret = []
conf.elements('filter', 'match').each do |e|
type = e['@type']
if type.nil? || type.empty?
raise Fluent::ConfigError, "Missing '@type' parameter on <#{e.name}> directive"
end

if e.name == 'filter'
f = Elem::Format.new(Fluent::Plugin.new_filter(type), e)
ret << f
@filters << f
else
o = Elem::Output.new(Fluent::Plugin.new_output(type), e)
ret << o
@outputs << o
end
end

ret
end

def static_input_analyse(conf)
ret = []
conf.elements(name: 'source').each do |e|
type = e['@type']
if type.nil? || type.empty?
raise Fluent::ConfigError, "Missing '@type' parameter on <#{e.name}> directive"
end

i = Elem::Input.new(Fluent::Plugin.new_input(type), e)
@inputs << i
ret << i
end

ret
end
end
end
157 changes: 157 additions & 0 deletions test/test_static_config_analysis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
require_relative 'helper'

require 'fluent/config'
require 'fluent/static_config_analysis'
# require 'fluent/plugin'
require 'fluent/plugin/out_forward'
require 'fluent/plugin/out_stdout'
require 'fluent/plugin/out_exec'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/in_dummy'
require 'fluent/plugin/filter_grep'
require 'fluent/plugin/filter_stdout'
require 'fluent/plugin/filter_parser'

class StaticConfigAnalysisTest < ::Test::Unit::TestCase
sub_test_case '.call' do
test 'returns outputs, inputs and filters' do
conf_data = <<-CONF
<source>
@type forward
</source>
<filter>
@type grep
</filter>
<match>
@type forward
</match>
CONF

c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
ret = Fluent::StaticConfigAnalysis.call(c)
assert_equal 1, ret.outputs.size
assert_kind_of Fluent::Plugin::ForwardOutput, ret.outputs[0].plugin
assert_equal 1, ret.inputs.size
assert_kind_of Fluent::Plugin::ForwardInput, ret.inputs[0].plugin
assert_equal 1, ret.filters.size
assert_kind_of Fluent::Plugin::GrepFilter, ret.filters[0].plugin
assert_empty ret.labels

assert_equal [Fluent::Plugin::ForwardOutput, Fluent::Plugin::ForwardInput, Fluent::Plugin::GrepFilter], ret.all_plugins.map(&:class)
end

test 'returns wrapped element with worker and label section' do
conf_data = <<-CONF
<source>
@type forward
</source>
<filter>
@type grep
</filter>
<match>
@type forward
</match>
<worker 0>
<source>
@type dummy
</source>
<filter>
@type parser
</filter>
<match>
@type exec
</match>
</worker>
<label @test>
<filter>
@type stdout
</filter>
<match>
@type stdout
</match>
</label>
CONF

c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
ret = Fluent::StaticConfigAnalysis.call(c)
assert_equal [Fluent::Plugin::ExecOutput, Fluent::Plugin::StdoutOutput, Fluent::Plugin::ForwardOutput], ret.outputs.map(&:plugin).map(&:class)
assert_equal [Fluent::Plugin::DummyInput, Fluent::Plugin::ForwardInput], ret.inputs.map(&:plugin).map(&:class)
assert_equal [Fluent::Plugin::ParserFilter, Fluent::Plugin::StdoutFilter, Fluent::Plugin::GrepFilter], ret.filters.map(&:plugin).map(&:class)
assert_equal 1, ret.labels.size
assert_equal '@test', ret.labels[0].name
end

sub_test_case 'raises config error' do
data(
'empty' => ['', 'Missing worker id on <worker> directive'],
'invalid number' => ['a', 'worker id should be integer: a'],
'worker id is negative' => ['-1', 'worker id should be integer: -1'],
'min worker id is less than 0' => ['-1-1', 'worker id should be integer: -1-1'],
'max worker id is less than 0' => ['1--1', 'worker id -1 specified by <worker> directive is not allowed. Available worker id is between 0 and 1'],
'min worker id is greater than workers' => ['0-2', 'worker id 2 specified by <worker> directive is not allowed. Available worker id is between 0 and 1'],
'max worker is less than min worker' => ['1-0', "greater first_worker_id<1> than last_worker_id<0> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"],
)
test 'when worker number is invalid' do |v|
val, msg = v
conf_data = <<-CONF
<worker #{val}>
</worker>
CONF

c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
assert_raise(Fluent::ConfigError.new(msg)) do
Fluent::StaticConfigAnalysis.call(c, workers: 2)
end
end

test 'duplicated label exits' do
conf_data = <<-CONF
<label @dup>
</label>
<label @dup>
</label>
CONF

c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
assert_raise(Fluent::ConfigError.new('Section <label @dup> appears twice')) do
Fluent::StaticConfigAnalysis.call(c, workers: 2)
end
end

data(
'in filter' => 'filter',
'in source' => 'source',
'in match' => 'match',
)
test 'when @type is missing' do |name|
conf_data = <<-CONF
<#{name}>
@type
</#{name}>
CONF
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
assert_raise(Fluent::ConfigError.new("Missing '@type' parameter on <#{name}> directive")) do
Fluent::StaticConfigAnalysis.call(c)
end
end

data(
'in filter' => 'filter',
'in source' => 'source',
'in match' => 'match',
)
test 'when worker has worker section' do |name|
conf_data = <<-CONF
<worker 0>
<worker 0>
</worker>
</worker>
CONF
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
assert_raise(Fluent::ConfigError.new("<worker> section cannot have <worker> directive")) do
Fluent::StaticConfigAnalysis.call(c)
end
end
end
end
end

0 comments on commit fb88222

Please sign in to comment.