-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
to check if config is valid before invoking configure Signed-off-by: Yuta Iwama <[email protected]>
- Loading branch information
Showing
2 changed files
with
339 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
# | ||
# Fluentd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
require 'fluent/config' | ||
require 'fluent/plugin' | ||
|
||
module Fluent | ||
# Static Analysis means anlyis All plugins and Fluent::Element without invokeing Plugin#configure | ||
class StaticConfigAnalysis | ||
module Elem | ||
Input = Struct.new(:plugin, :config) | ||
Output = Struct.new(:plugin, :config) | ||
Format = Struct.new(:plugin, :config) | ||
Label = Struct.new(:name, :config, :nodes) | ||
Worker = Struct.new(:ids, :config, :nodes) | ||
end | ||
|
||
Result = Struct.new(:tree, :outputs, :inputs, :filters, :labels) do | ||
def all_plugins | ||
(outputs + inputs + filters).map(&:plugin) | ||
end | ||
end | ||
|
||
# @param workers [Integer] Number of workers | ||
def self.call(conf, workers: 1) | ||
new(workers).call(conf) | ||
end | ||
|
||
def initialize(workers) | ||
@workers = workers | ||
|
||
reset | ||
end | ||
|
||
def call(config) | ||
reset | ||
|
||
tree = [ | ||
static_worker_analyse(config), | ||
static_label_analyse(config), | ||
static_filter_and_output_analyse(config), | ||
static_input_analyse(config), | ||
].flatten | ||
|
||
Result.new(tree, @outputs, @inputs, @filters, @labels.values) | ||
end | ||
|
||
private | ||
|
||
def reset | ||
@outputs = [] | ||
@inputs = [] | ||
@filters = [] | ||
@labels = {} | ||
end | ||
|
||
def static_worker_analyse(conf) | ||
ret = [] | ||
conf.elements(name: 'worker').each do |config| | ||
ids = parse_worker_id(config) | ||
|
||
config.elements.each do |elem| | ||
unless %w[source match filter label].include?(elem.name) | ||
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive" | ||
end | ||
end | ||
|
||
nodes = [ | ||
static_label_analyse(config), | ||
static_filter_and_output_analyse(config), | ||
static_input_analyse(config), | ||
].flatten | ||
ret << Elem::Worker.new(ids, config, nodes) | ||
end | ||
|
||
ret | ||
end | ||
|
||
def parse_worker_id(conf) | ||
worker_id_str = conf.arg | ||
|
||
if worker_id_str.empty? | ||
raise Fluent::ConfigError, 'Missing worker id on <worker> directive' | ||
end | ||
|
||
l, r = | ||
begin | ||
worker_id_str.split('-', 2).map { |v| Integer(v) } | ||
rescue TypeError, ArgumentError | ||
raise Fluent::ConfigError, "worker id should be integer: #{worker_id_str}" | ||
end | ||
|
||
# Negative nubmer can not be passed here becuase split by '-'. | ||
if l < 0 || l >= @workers | ||
raise Fluent::ConfigError, "worker id #{l} specified by <worker> directive is not allowed. Available worker id is between 0 and #{@workers}" | ||
end | ||
|
||
# e.g. specifed one worker id like `<worker 0>` | ||
if r.nil? | ||
return [l] | ||
end | ||
|
||
if r < 0 || r >= @workers | ||
raise Fluent::ConfigError, "worker id #{r} specified by <worker> directive is not allowed. Available worker id is between 0 and #{@workers-1}" | ||
end | ||
|
||
if l > r | ||
raise Fluent::ConfigError, "greater first_worker_id<#{l}> than last_worker_id<#{r}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>" | ||
end | ||
|
||
[l, r] | ||
end | ||
|
||
def static_label_analyse(conf) | ||
ret = [] | ||
conf.elements(name: 'label').each do |e| | ||
name = e.arg | ||
|
||
if @labels[name] | ||
raise ConfigError, "Section <label #{name}> appears twice" | ||
end | ||
|
||
l = Elem::Label.new(name, e, static_filter_and_output_analyse(e)) | ||
ret << l | ||
@labels[name] = l | ||
end | ||
|
||
ret | ||
end | ||
|
||
def static_filter_and_output_analyse(conf) | ||
ret = [] | ||
conf.elements('filter', 'match').each do |e| | ||
type = e['@type'] | ||
if type.nil? || type.empty? | ||
raise Fluent::ConfigError, "Missing '@type' parameter on <#{e.name}> directive" | ||
end | ||
|
||
if e.name == 'filter' | ||
f = Elem::Format.new(Fluent::Plugin.new_filter(type), e) | ||
ret << f | ||
@filters << f | ||
else | ||
o = Elem::Output.new(Fluent::Plugin.new_output(type), e) | ||
ret << o | ||
@outputs << o | ||
end | ||
end | ||
|
||
ret | ||
end | ||
|
||
def static_input_analyse(conf) | ||
ret = [] | ||
conf.elements(name: 'source').each do |e| | ||
type = e['@type'] | ||
if type.nil? || type.empty? | ||
raise Fluent::ConfigError, "Missing '@type' parameter on <#{e.name}> directive" | ||
end | ||
|
||
i = Elem::Input.new(Fluent::Plugin.new_input(type), e) | ||
@inputs << i | ||
ret << i | ||
end | ||
|
||
ret | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
require_relative 'helper' | ||
|
||
require 'fluent/config' | ||
require 'fluent/static_config_analysis' | ||
# require 'fluent/plugin' | ||
require 'fluent/plugin/out_forward' | ||
require 'fluent/plugin/out_stdout' | ||
require 'fluent/plugin/out_exec' | ||
require 'fluent/plugin/in_forward' | ||
require 'fluent/plugin/in_dummy' | ||
require 'fluent/plugin/filter_grep' | ||
require 'fluent/plugin/filter_stdout' | ||
require 'fluent/plugin/filter_parser' | ||
|
||
class StaticConfigAnalysisTest < ::Test::Unit::TestCase | ||
sub_test_case '.call' do | ||
test 'returns outputs, inputs and filters' do | ||
conf_data = <<-CONF | ||
<source> | ||
@type forward | ||
</source> | ||
<filter> | ||
@type grep | ||
</filter> | ||
<match> | ||
@type forward | ||
</match> | ||
CONF | ||
|
||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
ret = Fluent::StaticConfigAnalysis.call(c) | ||
assert_equal 1, ret.outputs.size | ||
assert_kind_of Fluent::Plugin::ForwardOutput, ret.outputs[0].plugin | ||
assert_equal 1, ret.inputs.size | ||
assert_kind_of Fluent::Plugin::ForwardInput, ret.inputs[0].plugin | ||
assert_equal 1, ret.filters.size | ||
assert_kind_of Fluent::Plugin::GrepFilter, ret.filters[0].plugin | ||
assert_empty ret.labels | ||
|
||
assert_equal [Fluent::Plugin::ForwardOutput, Fluent::Plugin::ForwardInput, Fluent::Plugin::GrepFilter], ret.all_plugins.map(&:class) | ||
end | ||
|
||
test 'returns wrapped element with worker and label section' do | ||
conf_data = <<-CONF | ||
<source> | ||
@type forward | ||
</source> | ||
<filter> | ||
@type grep | ||
</filter> | ||
<match> | ||
@type forward | ||
</match> | ||
<worker 0> | ||
<source> | ||
@type dummy | ||
</source> | ||
<filter> | ||
@type parser | ||
</filter> | ||
<match> | ||
@type exec | ||
</match> | ||
</worker> | ||
<label @test> | ||
<filter> | ||
@type stdout | ||
</filter> | ||
<match> | ||
@type stdout | ||
</match> | ||
</label> | ||
CONF | ||
|
||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
ret = Fluent::StaticConfigAnalysis.call(c) | ||
assert_equal [Fluent::Plugin::ExecOutput, Fluent::Plugin::StdoutOutput, Fluent::Plugin::ForwardOutput], ret.outputs.map(&:plugin).map(&:class) | ||
assert_equal [Fluent::Plugin::DummyInput, Fluent::Plugin::ForwardInput], ret.inputs.map(&:plugin).map(&:class) | ||
assert_equal [Fluent::Plugin::ParserFilter, Fluent::Plugin::StdoutFilter, Fluent::Plugin::GrepFilter], ret.filters.map(&:plugin).map(&:class) | ||
assert_equal 1, ret.labels.size | ||
assert_equal '@test', ret.labels[0].name | ||
end | ||
|
||
sub_test_case 'raises config error' do | ||
data( | ||
'empty' => ['', 'Missing worker id on <worker> directive'], | ||
'invalid number' => ['a', 'worker id should be integer: a'], | ||
'worker id is negative' => ['-1', 'worker id should be integer: -1'], | ||
'min worker id is less than 0' => ['-1-1', 'worker id should be integer: -1-1'], | ||
'max worker id is less than 0' => ['1--1', 'worker id -1 specified by <worker> directive is not allowed. Available worker id is between 0 and 1'], | ||
'min worker id is greater than workers' => ['0-2', 'worker id 2 specified by <worker> directive is not allowed. Available worker id is between 0 and 1'], | ||
'max worker is less than min worker' => ['1-0', "greater first_worker_id<1> than last_worker_id<0> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"], | ||
) | ||
test 'when worker number is invalid' do |v| | ||
val, msg = v | ||
conf_data = <<-CONF | ||
<worker #{val}> | ||
</worker> | ||
CONF | ||
|
||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
assert_raise(Fluent::ConfigError.new(msg)) do | ||
Fluent::StaticConfigAnalysis.call(c, workers: 2) | ||
end | ||
end | ||
|
||
test 'duplicated label exits' do | ||
conf_data = <<-CONF | ||
<label @dup> | ||
</label> | ||
<label @dup> | ||
</label> | ||
CONF | ||
|
||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
assert_raise(Fluent::ConfigError.new('Section <label @dup> appears twice')) do | ||
Fluent::StaticConfigAnalysis.call(c, workers: 2) | ||
end | ||
end | ||
|
||
data( | ||
'in filter' => 'filter', | ||
'in source' => 'source', | ||
'in match' => 'match', | ||
) | ||
test 'when @type is missing' do |name| | ||
conf_data = <<-CONF | ||
<#{name}> | ||
@type | ||
</#{name}> | ||
CONF | ||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
assert_raise(Fluent::ConfigError.new("Missing '@type' parameter on <#{name}> directive")) do | ||
Fluent::StaticConfigAnalysis.call(c) | ||
end | ||
end | ||
|
||
data( | ||
'in filter' => 'filter', | ||
'in source' => 'source', | ||
'in match' => 'match', | ||
) | ||
test 'when worker has worker section' do |name| | ||
conf_data = <<-CONF | ||
<worker 0> | ||
<worker 0> | ||
</worker> | ||
</worker> | ||
CONF | ||
c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) | ||
assert_raise(Fluent::ConfigError.new("<worker> section cannot have <worker> directive")) do | ||
Fluent::StaticConfigAnalysis.call(c) | ||
end | ||
end | ||
end | ||
end | ||
end |