diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index c7033968e9..e4b6ddeaa2 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -141,8 +141,8 @@ def configure(conf) dummy_record_keys = get_placeholders_keys(@path_template) || ['message'] dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)] - test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record) - test_path = extract_placeholders(@path_template, test_meta1) + test_chunk1 = chunk_for_test(dummy_tag, Fluent::Engine.now, dummy_record) + test_path = extract_placeholders(@path_template, test_chunk1) unless ::Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" end @@ -178,7 +178,7 @@ def format(tag, time, record) end def write(chunk) - path = extract_placeholders(@path_template, chunk.metadata) + path = extract_placeholders(@path_template, chunk) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm writer = case diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index b59d531993..f188354dac 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -71,7 +71,7 @@ def multi_workers_ready? end def write(chunk) - path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata) + path_without_suffix = extract_placeholders(@path_without_suffix, chunk) path = generate_path(path_without_suffix) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm @@ -106,7 +106,7 @@ def validate_compatible_with_primary_buffer!(path_without_suffix) raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory" end - vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) } + vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) || (placeholder == 'chunk_id') } if ph = vars.find { |v| !@chunk_keys.include?(v) } raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory" diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b622720b37..5c19e88b08 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -39,6 +39,7 @@ class Output < Base CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@$a-zA-Z0-9]+\}/ CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/ + CHUNK_ID_PLACEHOLDER_PATTERN = /\$\{chunk_id\}/ CHUNKING_FIELD_WARN_NUM = 4 @@ -679,13 +680,27 @@ def get_placeholders_tag(str) end def get_placeholders_keys(str) - str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| s == "tag"}.sort + str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| (s == "tag") || (s == 'chunk_id') }.sort end # TODO: optimize this code - def extract_placeholders(str, metadata) + def extract_placeholders(str, chunk) + metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk) + chunk_passed = true + chunk.metadata + else + chunk_passed = false + # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk + chunk + end if metadata.empty? - str + str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { + if chunk_passed + dump_unique_id_hex(chunk.unique_id) + else + log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" + end + } else rvalue = str.dup # strftime formatting @@ -720,7 +735,13 @@ def extract_placeholders(str, metadata) if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}" end - rvalue + rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { + if chunk_passed + dump_unique_id_hex(chunk.unique_id) + else + log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" + end + } end end @@ -801,6 +822,13 @@ def metadata(tag, time, record) end end + def chunk_for_test(tag, time, record) + require 'fluent/plugin/buffer/memory_chunk' + + m = metadata_for_test(tag, time, record) + Fluent::Plugin::Buffer::MemoryChunk.new(m) + end + def metadata_for_test(tag, time, record) raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty? m = metadata(tag, time, record) diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index c77fa101e2..f68624effb 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -557,6 +557,35 @@ def parse_system(text) check_gzipped_result(path, formatted_lines * 3) end + test '${chunk_id}' do + time = event_time("2011-01-02 13:14:15 UTC") + formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + + write_once = ->(){ + d = create_driver %[ + path #{TMP_DIR}/out_file_chunk_id_${chunk_id} + utc + append true + + timekey_use_utc true + + ] + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path + } + + path = write_once.call + if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).20110102.log/ + unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) + assert_equal unique_id.size, $1.size, "chunk_id size is mismatched" + else + flunk "chunk_id is not included in the path" + end + end + test 'symlink' do omit "Windows doesn't support symlink" if Fluent.windows? conf = CONFIG + %[ diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 0e06cb9be9..9bd88968d5 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -260,6 +260,20 @@ def create_chunk(primary, metadata, es) assert_equal "#{TMP_DIR}/dump.bin", path end + test 'path with ${chunk_id}' do + d = create_driver %[ + directory #{TMP_DIR} + basename out_file_chunk_id_${chunk_id} + ] + path = d.instance.write(@c) + if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/ + unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) + assert_equal unique_id.size, $1.size, "chunk_id size is mismatched" + else + flunk "chunk_id is not included in the path" + end + end + data( invalid_tag: [/tag/, '${tag}'], invalid_tag0: [/tag\[0\]/, '${tag[0]}'], diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 48dde91d3f..22497ed24a 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -125,6 +125,10 @@ def create_output(type=:full) def create_metadata(timekey: nil, tag: nil, variables: nil) Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) end + def create_chunk(timekey: nil, tag: nil, variables: nil) + m = Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) + Fluent::Plugin::Buffer::MemoryChunk.new(m) + end def waiting(seconds) begin Timeout.timeout(seconds) do @@ -219,7 +223,9 @@ def waiting(seconds) assert @i.terminated? end - test '#extract_placeholders does nothing if chunk key is not specified' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders does nothing if chunk key is not specified' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')])) assert !@i.chunk_key_time assert !@i.chunk_key_tag @@ -227,11 +233,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal tmpl, @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal tmpl, @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders can extract time if time key and range are configured' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders can extract time if time key and range are configured' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time', {'timekey' => 60*30, 'timekey_zone' => "+0900"})])) assert @i.chunk_key_time assert !@i.chunk_key_tag @@ -239,11 +251,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/2016/04/11/20-30/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/2016/04/11/20-30/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders can extract tag and parts of tag if tag is configured' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders can extract tag and parts of tag if tag is configured' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'tag', {})])) assert !@i.chunk_key_time assert @i.chunk_key_tag @@ -251,11 +269,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/%Y/%m/%d/%H-%M/fluentd.test.output/test/output/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/%Y/%m/%d/%H-%M/fluentd.test.output/test/output/${key1}/${key2}/tail", @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders can extract variables if variables are configured' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders can extract variables if variables are configured' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key1,key2', {})])) assert !@i.chunk_key_time assert !@i.chunk_key_tag @@ -263,11 +287,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key,$.nest.key', {})])) assert !@i.chunk_key_time assert !@i.chunk_key_tag @@ -275,11 +305,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key}/${$.nest.key}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {:key => "value1", :"$.nest.key" => "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders can extract all chunk keys if configured' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders can extract all chunk keys if configured' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})])) assert @i.chunk_key_time assert @i.chunk_key_tag @@ -287,11 +323,17 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key1}/${key2}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} - m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/test/output/value1/value2/tail", @i.extract_placeholders(tmpl, m) + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/test/output/value1/value2/tail", @i.extract_placeholders(tmpl, c) end - test '#extract_placeholders removes out-of-range tag part and unknown variable placeholders' do + data(:new_api => :chunk, + :old_api => :metadata) + test '#extract_placeholders removes out-of-range tag part and unknown variable placeholders' do |api| @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})])) assert @i.chunk_key_time assert @i.chunk_key_tag @@ -299,8 +341,23 @@ def waiting(seconds) tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[3]}/${tag[4]}/${key3}/${key4}/tail" t = event_time('2016-04-11 20:30:00 +0900') v = {key1: "value1", key2: "value2"} + c = if api == :chunk + create_chunk(timekey: t, tag: 'fluentd.test.output', variables: v) + else + create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + end + assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/////tail", @i.extract_placeholders(tmpl, c) + end + + test '#extract_placeholders logs warn message if metadata is passed for ${chunk_id} placeholder' do + @i.configure(config_element('ROOT', '', {}, [config_element('buffer', '')])) + tmpl = "/mypath/${chunk_id}/tail" + t = event_time('2016-04-11 20:30:00 +0900') + v = {key1: "value1", key2: "value2"} m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) - assert_equal "/mypath/2016/04/11/20-30/fluentd.test.output/////tail", @i.extract_placeholders(tmpl, m) + @i.extract_placeholders(tmpl, m) + logs = @i.log.out.logs + assert { logs.any? { |log| log.include?("${chunk_id} is not allowed in this plugin") } } end sub_test_case '#placeholder_validators' do