Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validators for buffering key extraction #1255

Merged
merged 2 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 148 additions & 6 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Output < Base

# One or more of: dash, underscore, dot, '@', ASCII letters and digits, anchored with '^' and '$'.
# NOTE(review): in Ruby '^'/'$' anchor at line boundaries, not string boundaries — confirm whether '\A'/'\z' was intended.
CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
# Matches a "${...}" placeholder whose name uses the same character set as above.
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/
# Matches "${tag}" or "${tag[N]}" and captures the inner "tag" / "tag[N]" text.
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/

# NOTE(review): not referenced in the visible code; presumably the chunk-key count above which a warning is logged — confirm.
CHUNKING_FIELD_WARN_NUM = 4

Expand Down Expand Up @@ -486,27 +487,165 @@ def implement?(feature)
end
end

# Builds every validator applicable to the template `str` of parameter
# `name` and runs them in order; the first failing validator raises.
def placeholder_validate!(name, str)
  placeholder_validators(name, str).each(&:validate!)
end

# Returns the list of PlaceholderValidator objects relevant to the template
# `str` of parameter `name`.
#
# A validator of a given type (:time, :tag, :keys) is included when either the
# template contains that kind of placeholder or the matching chunk key
# (time_key / tag_key / non-empty chunk_keys) is given. Validators are only
# built here; nothing is validated until #validate! is called on them.
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  result = []

  sec, title, example = get_placeholders_time(str)
  time_arg = {sec: sec, title: title, example: example, timekey: time_key}
  result << PlaceholderValidator.new(name, str, :time, time_arg) if sec || time_key

  parts = get_placeholders_tag(str)
  unless !tag_key && parts.empty?
    result << PlaceholderValidator.new(name, str, :tag, {parts: parts, tagkey: tag_key})
  end

  keys = get_placeholders_keys(str)
  chunk_keys_given = chunk_keys && !chunk_keys.empty?
  if chunk_keys_given || !keys.empty?
    result << PlaceholderValidator.new(name, str, :keys, {keys: keys, chunkkeys: chunk_keys})
  end

  result
end

class PlaceholderValidator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think separating PlaceholderValidator into TagPlaceholderValidator, TimePlaceholderValidator, KeysPlaceholderValidator is better.
It removes time?, tag? and keys? helpers and reduces method call overhead in validate!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that doesn't work. We should let plugin authors choose which validators are called and which are not.
For example, a table name might not contain timestamp placeholders even when the time chunk key is configured, for databases that can handle automatically generated time partitions. In such cases, plugin authors can skip the time validator (by checking validator.time?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

# Read-only accessors for the values stored by #initialize: parameter name,
# template string, validator type (:time, :tag or :keys) and the type-specific argument hash.
attr_reader :name, :string, :type, :argument

# Stores the parameter name, the template string, the validator type and
# the type-specific argument hash.
#
# Raises ArgumentError unless `type` is one of :time, :tag or :keys.
def initialize(name, str, type, arg)
  @name, @string, @type = name, str, type
  unless [:time, :tag, :keys].include?(@type)
    raise ArgumentError, "invalid type:#{type}"
  end
  @argument = arg
end

# True when this validator checks timestamp placeholders (type :time).
def time?
@type == :time
end

# True when this validator checks tag placeholders (type :tag).
def tag?
@type == :tag
end

# True when this validator checks chunk key placeholders (type :keys).
def keys?
@type == :keys
end

# Dispatches to the type-specific check (validate_time!, validate_tag! or
# validate_keys!) and returns its result; returns nil for any other type.
def validate!
  checker = {time: :validate_time!, tag: :validate_tag!, keys: :validate_keys!}[@type]
  send(checker) if checker
end

# Checks that timestamp placeholders in the template agree with the timekey.
#
# Reads from @argument:
#   :sec     - smallest time step (in seconds) that changes the rendered template, or nil
#   :title   - label of that step, used in the error message
#   :example - example strftime directive for that step, used in the error message
#   :timekey - configured timekey, or nil
#
# Raises Fluent::ConfigError when
#   * a timekey is given but the template has no timestamp placeholders,
#   * the template has timestamp placeholders but no timekey is given, or
#   * the timekey is smaller than the template's smallest time step.
# Returns nil otherwise.
def validate_time!
  sec = @argument[:sec]
  title = @argument[:title]
  example = @argument[:example]
  timekey = @argument[:timekey]
  # All three messages read @name directly, matching validate_tag! / validate_keys!.
  if !sec && timekey
    raise Fluent::ConfigError, "Parameter '#{@name}' doesn't have timestamp placeholders for timekey #{timekey.to_i}"
  end
  if sec && !timekey
    raise Fluent::ConfigError, "Parameter '#{@name}' has timestamp placeholders, but chunk key 'time' is not configured"
  end
  if sec && timekey && timekey < sec
    raise Fluent::ConfigError, "Parameter '#{@name}' doesn't have timestamp placeholder for #{title}('#{example}') for timekey #{timekey.to_i}"
  end
end

# Checks that tag placeholders and the 'tag' chunk key are used together.
#
# Reads :parts (tag positions found in the template) and :tagkey (whether
# the tag chunk key is given) from @argument. Raises Fluent::ConfigError when
# exactly one of the two is present; returns nil otherwise.
def validate_tag!
  tag_configured = @argument[:tagkey]
  placeholders_present = !@argument[:parts].empty?
  if tag_configured
    raise Fluent::ConfigError, "Parameter '#{@name}' doesn't have tag placeholder" unless placeholders_present
  elsif placeholders_present
    raise Fluent::ConfigError, "Parameter '#{@name}' has tag placeholders, but chunk key 'tag' is not configured"
  end
end

# Checks that the "${key}" placeholders in the template and the given chunk
# keys are exactly the same set.
#
# Reads :keys (placeholder names found in the template) and :chunkkeys (chunk
# keys) from @argument. A missing (nil) list is treated as empty, because
# placeholder_validators may pass nil chunk keys through to this validator.
#
# Raises Fluent::ConfigError when a chunk key has no placeholder, or when a
# placeholder has no chunk key. Returns nil otherwise.
def validate_keys!
  keys = @argument[:keys] || []
  chunk_keys = @argument[:chunkkeys] || []

  not_specified = (chunk_keys - keys).sort
  unless not_specified.empty?
    raise Fluent::ConfigError, "Parameter '#{@name}' doesn't have enough placeholders for keys #{not_specified.join(',')}"
  end

  not_satisfied = (keys - chunk_keys).sort
  unless not_satisfied.empty?
    raise Fluent::ConfigError, "Parameter '#{@name}' has placeholders, but chunk keys doesn't have keys #{not_satisfied.join(',')}"
  end
end
end

# Candidate time steps, finest first: [seconds, label, example strftime directive].
TIME_KEY_PLACEHOLDER_THRESHOLDS = [
  [1, :second, '%S'],
  [60, :minute, '%M'],
  [3600, :hour, '%H'],
  [86400, :day, '%d'],
]
# Fixed reference instant used to probe how a template reacts to time shifts.
TIMESTAMP_CHECK_BASE_TIME = Time.parse("2016-01-01 00:00:00 UTC")

# Finds the finest time step that changes the strftime-rendered `str`.
#
# Renders `str` at the reference time and again after shifting it by each
# threshold; returns the first [seconds, label, example] triple whose shift
# alters the output, or nil when none does. Steps larger than one day are
# not probed, so such templates are reported as having no time placeholder.
def get_placeholders_time(str)
  rendered_at_base = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.find do |seconds, _title, _example|
    (TIMESTAMP_CHECK_BASE_TIME + seconds).strftime(str) != rendered_at_base
  end
end

# Lists the tag placeholders found in `str` as a sorted array of positions:
# "${tag[N]}" yields N, and "${tag}" (the whole tag) yields -1.
def get_placeholders_tag(str)
  # scan returns one single-element capture array per match, e.g. [["tag"], ["tag[0]"]]
  positions = str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map do |captures|
    token = captures.first
    next -1 if token == "tag"
    index = token[/^tag\[(\d+)\]$/, 1]
    index && index.to_i
  end
  positions.compact.sort
end

# Lists the names of the "${name}" placeholders found in `str`, sorted,
# leaving out the special name "tag".
def get_placeholders_keys(str)
  placeholders = str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN)
  # strip the leading "${" and the trailing "}"
  names = placeholders.map { |placeholder| placeholder[2..-2] }
  names.reject { |key| key == "tag" }.sort
end

# TODO: optimize this code
def extract_placeholders(str, metadata)
if metadata.empty?
str
else
rvalue = str
rvalue = str.dup
# strftime formatting
if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
end
# ${tag}, ${tag[0]}, ${tag[1]}, ...
if @chunk_key_tag
if str =~ /\$\{tag\[\d+\]\}/
hash = {'${tag}' => metadata.tag}
if str.include?('${tag}')
rvalue = rvalue.gsub('${tag}', metadata.tag)
end
if str =~ CHUNK_TAG_PLACEHOLDER_PATTERN
hash = {}
metadata.tag.split('.').each_with_index do |part, i|
hash["${tag[#{i}]}"] = part
end
rvalue = rvalue.gsub(/\$\{tag(\[\d+\])?\}/, hash)
elsif str.include?('${tag}')
rvalue = rvalue.gsub('${tag}', metadata.tag)
rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash)
end
if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN
log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}"
end
end
# ${a_chunk_key}, ...
Expand All @@ -517,6 +656,9 @@ def extract_placeholders(str, metadata)
end
rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN, hash)
end
if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
log.warn "chunk key placeholder '#{$1}' not replaced. templace:#{str}"
end
rvalue
end
end
Expand Down
202 changes: 202 additions & 0 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,208 @@ def waiting(seconds)
assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/////tail", @i.extract_placeholders(tmpl, m)
end

# Tests for #placeholder_validators: which validators are returned for a
# template/configuration pair, and the error each one raises on mismatch.
sub_test_case '#placeholder_validators' do
# Placeholders in the template alone are enough to get one validator per type.
test 'returns validators for time, tag and keys when a template has placeholders even if plugin is not configured with these keys' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
validators = @i.placeholder_validators(:path, "/my/path/${tag}/${username}/file.%Y%m%d_%H%M.log")
assert_equal 3, validators.size
assert_equal 1, validators.select(&:time?).size
assert_equal 1, validators.select(&:tag?).size
assert_equal 1, validators.select(&:keys?).size
end

# Configured chunk keys alone are enough to get one validator per type.
test 'returns validators for time, tag and keys when a plugin is configured with these keys even if a template does not have placeholders' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,username', {'timekey' => 60})]))
validators = @i.placeholder_validators(:path, "/my/path/file.log")
assert_equal 3, validators.size
assert_equal 1, validators.select(&:time?).size
assert_equal 1, validators.select(&:tag?).size
assert_equal 1, validators.select(&:keys?).size
end

# Timestamp placeholders without a time chunk key: only a time validator, and it fails.
test 'returns a validator for time if a template has timestamp placeholders' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
validators = @i.placeholder_validators(:path, "/my/path/file.%Y-%m-%d.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:time?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' has timestamp placeholders, but chunk key 'time' is not configured") do
validators.first.validate!
end
end

# Time chunk key without timestamp placeholders: only a time validator, and it fails.
test 'returns a validator for time if a plugin is configured with time key' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time', {'timekey' => '30'})]))
validators = @i.placeholder_validators(:path, "/my/path/to/file.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:time?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have timestamp placeholders for timekey 30") do
validators.first.validate!
end
end

# Tag placeholder without a tag chunk key: only a tag validator, and it fails.
test 'returns a validator for tag if a template has tag placeholders' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
validators = @i.placeholder_validators(:path, "/my/path/${tag}/file.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:tag?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' has tag placeholders, but chunk key 'tag' is not configured") do
validators.first.validate!
end
end

# Tag chunk key without a tag placeholder: only a tag validator, and it fails.
test 'returns a validator for tag if a plugin is configured with tag key' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'tag')]))
validators = @i.placeholder_validators(:path, "/my/path/file.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:tag?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have tag placeholder") do
validators.first.validate!
end
end

# Key placeholders without chunk keys: only a keys validator; the message lists the keys sorted.
test 'returns a validator for variable keys if a template has variable placeholders' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
validators = @i.placeholder_validators(:path, "/my/path/${username}/file.${group}.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:keys?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' has placeholders, but chunk keys doesn't have keys group,username") do
validators.first.validate!
end
end

# Chunk keys without key placeholders: only a keys validator; the message lists the keys sorted.
test 'returns a validator for variable keys if a plugin is configured with variable keys' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'username,group')]))
validators = @i.placeholder_validators(:path, "/my/path/file.log")
assert_equal 1, validators.size
assert_equal 1, validators.select(&:keys?).size
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have enough placeholders for keys group,username") do
validators.first.validate!
end
end
end

# Tests for #placeholder_validate!: the one-call form that builds the
# validators for a template and runs them, raising on the first mismatch.
sub_test_case '#placeholder_validate!' do
# Timestamp placeholders with no time chunk key configured.
test 'raises configuration error for a templace when timestamp placeholders exist but time key is missing' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' has timestamp placeholders, but chunk key 'time' is not configured") do
@i.placeholder_validate!(:path, "/path/without/timestamp/file.%Y%m%d-%H%M.log")
end
end

# timekey configured but the template has no timestamp placeholders; a minute-level template passes.
test 'raises configuration error for a template without timestamp placeholders when timekey is configured' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time', {"timekey" => 180})]))
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have timestamp placeholders for timekey 180") do
@i.placeholder_validate!(:path, "/my/path/file.log")
end
assert_nothing_raised do
@i.placeholder_validate!(:path, "/my/path/%Y%m%d/file.%H%M.log")
end
end

# An hour-level template is too coarse for timekey 180; adding %M makes it pass.
test 'raises configuration error for a template with timestamp placeholders when plugin is configured more fine timekey' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time', {"timekey" => 180})]))
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have timestamp placeholder for hour('%H') for timekey 180") do
@i.placeholder_validate!(:path, "/my/path/file.%Y%m%d_%H.log")
end
assert_nothing_raised do
@i.placeholder_validate!(:path, "/my/path/file.%Y%m%d_%H%M.log")
end
end

# Tag placeholders with no tag chunk key configured.
test 'raises configuration error for a template when tag placeholders exist but tag key is missing' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' has tag placeholders, but chunk key 'tag' is not configured") do
@i.placeholder_validate!(:path, "/my/path/${tag}/file.${tag[2]}.log")
end
end

# Tag chunk key configured but the template has no tag placeholder.
test 'raises configuration error for a template without tag placeholders when tagkey is configured' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'tag')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have tag placeholder") do
@i.placeholder_validate!(:path, "/my/path/file.log")
end
assert_nothing_raised do
@i.placeholder_validate!(:path, "/my/path/${tag}/file.${tag[2]}.log")
end
end

# Key placeholders with no chunk keys configured.
test 'raises configuration error for a template when variable key placeholders exist but chunk keys are missing' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' has placeholders, but chunk keys doesn't have keys service,username") do
@i.placeholder_validate!(:path, "/my/path/${service}/file.${username}.log")
end
end

# Chunk keys configured but the template has no key placeholders.
test 'raises configuration error for a template without variable key placeholders when chunk keys are configured' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'username,service')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have enough placeholders for keys service,username") do
@i.placeholder_validate!(:path, "/my/path/file.log")
end
assert_nothing_raised do
@i.placeholder_validate!(:path, "/my/path/${service}/file.${username}.log")
end
end

# Partial overlap: each configured chunk key missing from the template is reported.
test 'raise configuration error for a template and configuration with keys mismatch' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'username,service')]))
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have enough placeholders for keys service") do
@i.placeholder_validate!(:path, "/my/path/file.${username}.log")
end
assert_raise Fluent::ConfigError.new("Parameter 'path' doesn't have enough placeholders for keys username") do
@i.placeholder_validate!(:path, "/my/path/${service}/file.log")
end
assert_nothing_raised do
@i.placeholder_validate!(:path, "/my/path/${service}/file.${username}.log")
end
end
end

# Checks the [seconds, title, example] triple returned for templates of
# decreasing granularity, from no timestamp placeholder down to seconds.
test '#get_placeholders_time returns seconds,title and example placeholder for a template' do
# no strftime directive at all: nil, so all three destructured values are nil
s, t, e = @i.get_placeholders_time("/path/to/dir/yay")
assert_nil s
assert_nil t
assert_nil e

# day granularity, spelled out or via the composite %F directive
s, t, e = @i.get_placeholders_time("/path/to/%Y%m%d/yay")
assert_equal 86400, s
assert_equal :day, t
assert_equal '%d', e
s, t, e = @i.get_placeholders_time("my birthiday! at %F")
assert_equal 86400, s
assert_equal :day, t
assert_equal '%d', e

# hour granularity
s, t, e = @i.get_placeholders_time("myfile.%Y-%m-%d_%H.log")
assert_equal 3600, s
assert_equal :hour, t
assert_equal '%H', e

# minute granularity
s, t, e = @i.get_placeholders_time("part-%Y%m%d-%H%M.ts")
assert_equal 60, s
assert_equal :minute, t
assert_equal '%M', e

# second granularity via the composite %T directive
s, t, e = @i.get_placeholders_time("my first data at %F %T %z")
assert_equal 1, s
assert_equal :second, t
assert_equal '%S', e
end

# Checks the sorted position list: -1 stands for "${tag}", N for "${tag[N]}";
# other "${...}" placeholders are ignored.
test '#get_placeholders_tag returns a list of tag part position for a template' do
assert_equal [], @i.get_placeholders_tag("db.table")
assert_equal [], @i.get_placeholders_tag("db.table_${non_tag}")
assert_equal [-1], @i.get_placeholders_tag("table_${tag}")
assert_equal [0, 1], @i.get_placeholders_tag("db_${tag[0]}.table_${tag[1]}")
assert_equal [-1, 0], @i.get_placeholders_tag("/treedir/${tag[0]}/${tag}")
end

# Checks that key names are returned sorted, that "${tag}" is excluded, and
# that names may contain '-', '.', '@', '_' and digits.
test '#get_placeholders_keys returns a list of keys for a template' do
assert_equal [], @i.get_placeholders_keys("/path/to/my/data/file.log")
assert_equal [], @i.get_placeholders_keys("/path/to/my/${tag}/file.log")
assert_equal ['key1', 'key2'], @i.get_placeholders_keys("/path/to/${key2}/${tag}/file.${key1}.log")
assert_equal ['.hidden', '0001', '@timestamp', 'a_key', 'my-domain'], @i.get_placeholders_keys("http://${my-domain}/${.hidden}/${0001}/${a_key}?timestamp=${@timestamp}")
end

test '#metadata returns object which contains tag/timekey/variables from records as specified in configuration' do
tag = 'test.output'
time = event_time('2016-04-12 15:31:23 -0700')
Expand Down