Skip to content

Commit

Permalink
Merge pull request #1708 from fluent/chunk_id-placeholder
Browse files Browse the repository at this point in the history
out_file/out_secondary_file: Support ${chunk_id} placeholder. fix #1705
  • Loading branch information
repeatedly authored Oct 13, 2017
2 parents b9b79e7 + 04f9058 commit f2736ac
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 29 deletions.
6 changes: 3 additions & 3 deletions lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ def configure(conf)
dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
test_path = extract_placeholders(@path_template, test_meta1)
test_chunk1 = chunk_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
test_path = extract_placeholders(@path_template, test_chunk1)
unless ::Fluent::FileUtil.writable_p?(test_path)
raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
end
Expand Down Expand Up @@ -178,7 +178,7 @@ def format(tag, time, record)
end

def write(chunk)
path = extract_placeholders(@path_template, chunk.metadata)
path = extract_placeholders(@path_template, chunk)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

writer = case
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def multi_workers_ready?
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

Expand Down Expand Up @@ -106,7 +106,7 @@ def validate_compatible_with_primary_buffer!(path_without_suffix)
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
end

vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) }
vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) || (placeholder == 'chunk_id') }

if ph = vars.find { |v| !@chunk_keys.include?(v) }
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
Expand Down
36 changes: 32 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Output < Base
CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@$a-zA-Z0-9]+\}/
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/
CHUNK_ID_PLACEHOLDER_PATTERN = /\$\{chunk_id\}/

CHUNKING_FIELD_WARN_NUM = 4

Expand Down Expand Up @@ -679,13 +680,27 @@ def get_placeholders_tag(str)
end

def get_placeholders_keys(str)
str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| s == "tag"}.sort
str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| (s == "tag") || (s == 'chunk_id') }.sort
end

# TODO: optimize this code
def extract_placeholders(str, metadata)
def extract_placeholders(str, chunk)
metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk)
chunk_passed = true
chunk.metadata
else
chunk_passed = false
# For existing plugins. Old plugin passes Chunk.metadata instead of Chunk
chunk
end
if metadata.empty?
str
str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
if chunk_passed
dump_unique_id_hex(chunk.unique_id)
else
log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
end
}
else
rvalue = str.dup
# strftime formatting
Expand Down Expand Up @@ -720,7 +735,13 @@ def extract_placeholders(str, metadata)
if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}"
end
rvalue
rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
if chunk_passed
dump_unique_id_hex(chunk.unique_id)
else
log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
end
}
end
end

Expand Down Expand Up @@ -801,6 +822,13 @@ def metadata(tag, time, record)
end
end

def chunk_for_test(tag, time, record)
require 'fluent/plugin/buffer/memory_chunk'

m = metadata_for_test(tag, time, record)
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end

def metadata_for_test(tag, time, record)
raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
m = metadata(tag, time, record)
Expand Down
29 changes: 29 additions & 0 deletions test/plugin/test_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,35 @@ def parse_system(text)
check_gzipped_result(path, formatted_lines * 3)
end

test '${chunk_id}' do
time = event_time("2011-01-02 13:14:15 UTC")
formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]

write_once = ->(){
d = create_driver %[
path #{TMP_DIR}/out_file_chunk_id_${chunk_id}
utc
append true
<buffer>
timekey_use_utc true
</buffer>
]
d.run(default_tag: 'test'){
d.feed(time, {"a"=>1})
d.feed(time, {"a"=>2})
}
d.instance.last_written_path
}

path = write_once.call
if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).20110102.log/
unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate)
assert_equal unique_id.size, $1.size, "chunk_id size is mismatched"
else
flunk "chunk_id is not included in the path"
end
end

test 'symlink' do
omit "Windows doesn't support symlink" if Fluent.windows?
conf = CONFIG + %[
Expand Down
14 changes: 14 additions & 0 deletions test/plugin/test_out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,20 @@ def create_chunk(primary, metadata, es)
assert_equal "#{TMP_DIR}/dump.bin", path
end

test 'path with ${chunk_id}' do
d = create_driver %[
directory #{TMP_DIR}
basename out_file_chunk_id_${chunk_id}
]
path = d.instance.write(@c)
if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/
unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate)
assert_equal unique_id.size, $1.size, "chunk_id size is mismatched"
else
flunk "chunk_id is not included in the path"
end
end

data(
invalid_tag: [/tag/, '${tag}'],
invalid_tag0: [/tag\[0\]/, '${tag[0]}'],
Expand Down
97 changes: 77 additions & 20 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ def create_output(type=:full)
def create_metadata(timekey: nil, tag: nil, variables: nil)
Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
end
def create_chunk(timekey: nil, tag: nil, variables: nil)
m = Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end
def waiting(seconds)
begin
Timeout.timeout(seconds) do
Expand Down Expand Up @@ -219,88 +223,141 @@ def waiting(seconds)
assert @i.terminated?
end

test '#extract_placeholders does nothing if chunk key is not specified' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders does nothing if chunk key is not specified' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
assert !@i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal tmpl, @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal tmpl, @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders can extract time if time key and range are configured' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders can extract time if time key and range are configured' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time', {'timekey' => 60*30, 'timekey_zone' => "+0900"})]))
assert @i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/2016/04/11/20-30/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/2016/04/11/20-30/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders can extract tag and parts of tag if tag is configured' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders can extract tag and parts of tag if tag is configured' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'tag', {})]))
assert !@i.chunk_key_time
assert @i.chunk_key_tag
assert_equal [], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/%Y/%m/%d/%H-%M/fluentd.test.output/test/output/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/%Y/%m/%d/%H-%M/fluentd.test.output/test/output/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders can extract variables if variables are configured' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders can extract variables if variables are configured' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key1,key2', {})]))
assert !@i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal ['key1','key2'], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key,$.nest.key', {})]))
assert !@i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal ['key','$.nest.key'], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key}/${$.nest.key}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {:key => "value1", :"$.nest.key" => "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders can extract all chunk keys if configured' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders can extract all chunk keys if configured' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})]))
assert @i.chunk_key_time
assert @i.chunk_key_tag
assert_equal ['key1','key2'], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/test/output/value1/value2/tail", @i.extract_placeholders(tmpl, m)
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/test/output/value1/value2/tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders removes out-of-range tag part and unknown variable placeholders' do
data(:new_api => :chunk,
:old_api => :metadata)
test '#extract_placeholders removes out-of-range tag part and unknown variable placeholders' do |api|
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})]))
assert @i.chunk_key_time
assert @i.chunk_key_tag
assert_equal ['key1','key2'], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[3]}/${tag[4]}/${key3}/${key4}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
c = if api == :chunk
create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v)
else
create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
end
assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/////tail", @i.extract_placeholders(tmpl, c)
end

test '#extract_placeholders logs warn message if metadata is passed for ${chunk_id} placeholder' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')]))
tmpl = "/mypath/${chunk_id}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {key1: "value1", key2: "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/////tail", @i.extract_placeholders(tmpl, m)
@i.extract_placeholders(tmpl, m)
logs = @i.log.out.logs
assert { logs.any? { |log| log.include?("${chunk_id} is not allowed in this plugin") } }
end

sub_test_case '#placeholder_validators' do
Expand Down

0 comments on commit f2736ac

Please sign in to comment.