diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 621a4212b0..4f96be5e66 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -52,6 +52,21 @@ class FileOutput < TimeSlicedOutput desc "Create symlink to temporary buffered file when buffer_type is file." config_param :symlink_path, :string, default: nil + module SymlinkBufferMixin + def symlink_path=(path) + @_symlink_path = path + end + + def generate_chunk(metadata) + chunk = super + latest_chunk = metadata_list.sort_by(&:timekey).last + if chunk.metadata == latest_chunk + FileUtils.ln_sf(chunk.path, @_symlink_path) + end + chunk + end + end + def initialize require 'zlib' require 'time' @@ -87,7 +102,13 @@ def configure(conf) @formatter = Plugin.new_formatter(@format) @formatter.configure(conf) - @buffer.symlink_path = @symlink_path if @symlink_path + if @symlink_path && @buffer.respond_to?(:path) + (class << @buffer; self; end).module_eval do + prepend SymlinkBufferMixin + end + @buffer.symlink_path = @symlink_path + end + @dir_perm = system_config.dir_permission || DIR_PERMISSION @file_perm = system_config.file_permission || FILE_PERMISSION end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 6e35fde315..0fe34e8975 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -306,17 +306,22 @@ def test_write_with_symlink begin d.instance.start 10.times { sleep 0.05 } + time = Time.parse("2011-01-02 13:14:15 UTC").to_i es = Fluent::OneEventStream.new(time, {"a"=>1}) - d.instance.emit('tag', es, Fluent::NullOutputChain.instance) + d.instance.emit_events('tag', es) assert File.exist?(symlink_path) assert File.symlink?(symlink_path) - d.instance.enqueue_buffer + es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2}) + d.instance.emit_events('tag', es) - assert !File.exist?(symlink_path) + assert File.exist?(symlink_path) assert File.symlink?(symlink_path) + + meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {}) + assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path) ensure d.instance.shutdown FileUtils.rm_rf(symlink_path)