Skip to content

Commit

Permalink
re-implement symlink_path feature on v0.14 buffer APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Apr 28, 2016
1 parent 7604852 commit e3f04ff
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
23 changes: 22 additions & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ class FileOutput < TimeSlicedOutput
desc "Create symlink to temporary buffered file when buffer_type is file."
config_param :symlink_path, :string, default: nil

module SymlinkBufferMixin
def symlink_path=(path)
@_symlink_path = path
end

def generate_chunk(metadata)
chunk = super
latest_chunk = metadata_list.sort_by(&:timekey).last
if chunk.metadata == latest_chunk
FileUtils.ln_sf(chunk.path, @_symlink_path)
end
chunk
end
end

def initialize
require 'zlib'
require 'time'
Expand Down Expand Up @@ -87,7 +102,13 @@ def configure(conf)
@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)

@buffer.symlink_path = @symlink_path if @symlink_path
if @symlink_path && @buffer.respond_to?(:path)
(class << @buffer; self; end).module_eval do
prepend SymlinkBufferMixin
end
@buffer.symlink_path = @symlink_path
end

@dir_perm = system_config.dir_permission || DIR_PERMISSION
@file_perm = system_config.file_permission || FILE_PERMISSION
end
Expand Down
11 changes: 8 additions & 3 deletions test/plugin/test_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,22 @@ def test_write_with_symlink
begin
d.instance.start
10.times { sleep 0.05 }

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
es = Fluent::OneEventStream.new(time, {"a"=>1})
d.instance.emit('tag', es, Fluent::NullOutputChain.instance)
d.instance.emit_events('tag', es)

assert File.exist?(symlink_path)
assert File.symlink?(symlink_path)

d.instance.enqueue_buffer
es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2})
d.instance.emit_events('tag', es)

assert !File.exist?(symlink_path)
assert File.exist?(symlink_path)
assert File.symlink?(symlink_path)

meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {})
assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path)
ensure
d.instance.shutdown
FileUtils.rm_rf(symlink_path)
Expand Down

0 comments on commit e3f04ff

Please sign in to comment.