diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index c7033968e9..18b546e0e2 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -142,7 +142,7 @@ def configure(conf) dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)] test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record) - test_path = extract_placeholders(@path_template, test_meta1) + test_path = extract_placeholders(@path_template.gsub(CHUNK_ID_PLACEHOLDER_PATTERN, 'test'), test_meta1) unless ::Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" end @@ -178,7 +178,7 @@ def format(tag, time, record) end def write(chunk) - path = extract_placeholders(@path_template, chunk.metadata) + path = extract_placeholders(@path_template.gsub(CHUNK_ID_PLACEHOLDER_PATTERN, dump_unique_id_hex(chunk.unique_id)), chunk.metadata) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm writer = case diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index b59d531993..0444316356 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -71,7 +71,7 @@ def multi_workers_ready? end def write(chunk) - path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata) + path_without_suffix = extract_placeholders(@path_without_suffix.gsub(CHUNK_ID_PLACEHOLDER_PATTERN, dump_unique_id_hex(chunk.unique_id)), chunk.metadata) path = generate_path(path_without_suffix) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm @@ -106,7 +106,7 @@ def validate_compatible_with_primary_buffer!(path_without_suffix) raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory" end - vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) } + vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) || (placeholder == 'chunk_id') } if ph = vars.find { |v| !@chunk_keys.include?(v) } raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory" diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b622720b37..51ce587cb6 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -39,6 +39,7 @@ class Output < Base CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@$a-zA-Z0-9]+\}/ CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/ + CHUNK_ID_PLACEHOLDER_PATTERN = /\$\{chunk_id\}/ CHUNKING_FIELD_WARN_NUM = 4 @@ -679,7 +680,7 @@ def get_placeholders_tag(str) end def get_placeholders_keys(str) - str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| s == "tag"}.sort + str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| (s == "tag") || (s == 'chunk_id') }.sort end # TODO: optimize this code diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index c77fa101e2..f68624effb 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -557,6 +557,35 @@ def parse_system(text) check_gzipped_result(path, formatted_lines * 3) end + test '${chunk_id}' do + time = event_time("2011-01-02 13:14:15 UTC") + formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + + write_once = ->(){ + d = create_driver %[ + path #{TMP_DIR}/out_file_chunk_id_${chunk_id} + utc + append true + + timekey_use_utc true + + ] + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path + } + + path = write_once.call + if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).20110102.log/ + unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) + assert_equal unique_id.size, $1.size, "chunk_id size is mismatched" + else + flunk "chunk_id is not included in the path" + end + end + test 'symlink' do omit "Windows doesn't support symlink" if Fluent.windows? conf = CONFIG + %[ diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 0e06cb9be9..9bd88968d5 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -260,6 +260,20 @@ def create_chunk(primary, metadata, es) assert_equal "#{TMP_DIR}/dump.bin", path end + test 'path with ${chunk_id}' do + d = create_driver %[ + directory #{TMP_DIR} + basename out_file_chunk_id_${chunk_id} + ] + path = d.instance.write(@c) + if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/ + unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) + assert_equal unique_id.size, $1.size, "chunk_id size is mismatched" + else + flunk "chunk_id is not included in the path" + end + end + data( invalid_tag: [/tag/, '${tag}'], invalid_tag0: [/tag\[0\]/, '${tag[0]}'],