Skip to content

Commit

Permalink
migrate out_file to v0.14 API
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Oct 4, 2016
1 parent 17ff860 commit 3bb7671
Show file tree
Hide file tree
Showing 2 changed files with 763 additions and 254 deletions.
224 changes: 147 additions & 77 deletions lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,59 @@

require 'fileutils'
require 'zlib'
require 'time'

require 'fluent/output'
require 'fluent/plugin/output'
require 'fluent/config/error'
require 'fluent/system_config'
# TODO remove ...
require 'fluent/plugin/file_util'

module Fluent
class FileOutput < TimeSlicedOutput
include SystemConfig::Mixin
module Fluent::Plugin
class FileOutput < Output
Fluent::Plugin.register_output('file', self)

Plugin.register_output('file', self)
helpers :formatter, :inject, :compat_parameters

SUPPORTED_COMPRESS = {
'gz' => :gz,
'gzip' => :gz,
SUPPORTED_COMPRESS = [:text, :gz, :gzip]
SUPPORTED_COMPRESS_MAP = {
text: nil,
gz: :gzip,
gzip: :gzip,
}

FILE_PERMISSION = 0644
DIR_PERMISSION = 0755

DEFAULT_TIMEKEY = 60 * 60 * 24

desc "The Path of the file."
config_param :path, :string
desc "The format of the file content. The default is out_file."
config_param :format, :string, default: 'out_file', skip_accessor: true

desc "Specify to add file suffix for bare file path or not."
config_param :add_path_suffix, :bool, default: true
desc "The file suffix added to bare file path."
config_param :path_suffix, :string, default: '.log'
desc "The flushed chunk is appended to existence file or not."
config_param :append, :bool, default: false
desc "Compress flushed file."
config_param :compress, default: nil do |val|
c = SUPPORTED_COMPRESS[val]
unless c
raise ConfigError, "Unsupported compression algorithm '#{val}'"
end
c
end
config_param :compress, :enum, list: SUPPORTED_COMPRESS, default: :text
desc "Execute compression again even when buffer chunk is already compressed."
config_param :recompress, :bool, default: false
desc "Create symlink to temporary buffered file when buffer_type is file."
config_param :symlink_path, :string, default: nil

config_section :format, init: true do
config_set_default :@type, 'out_file'
end

config_section :buffer do
config_set_default :@type, 'file'
config_set_default :chunk_keys, ['time']
config_set_default :timekey, DEFAULT_TIMEKEY
end

attr_accessor :last_written_path # for tests

module SymlinkBufferMixin
def symlink_path=(path)
@_symlink_path = path
Expand All @@ -63,48 +80,56 @@ def generate_chunk(metadata)
# timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT
# have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12.
# These chunks will be enqueued immediately, and will be flushed soon.
latest_chunk = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last
if chunk.metadata == latest_chunk
latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last
if chunk.metadata == latest_metadata
FileUtils.ln_sf(chunk.path, @_symlink_path)
end
chunk
end
end

def initialize
require 'zlib'
require 'time'
require 'fluent/plugin/file_util'
super
end

def configure(conf)
if path = conf['path']
@path = path
compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

configured_time_slice_format = conf['time_slice_format']

# v0.14 file buffer handles path as directory if '*' is missing
# 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
# but raise it in this plugin.
if conf.elements(name: 'buffer').empty?
conf.add_element('buffer', 'time')
end
unless @path
raise ConfigError, "'path' parameter is required on file output"
buffer_conf = conf.elements(name: 'buffer').first
unless buffer_conf.has_key?('path')
buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
end

if pos = @path.index('*')
@path_prefix = @path[0,pos]
@path_suffix = @path[pos+1..-1]
conf['buffer_path'] ||= "#{@path}"
else
@path_prefix = @path+"."
@path_suffix = ".log"
conf['buffer_path'] ||= "#{@path}.*"
super

@compress_method = SUPPORTED_COMPRESS_MAP[@compress]

if @path.include?('*') && !@buffer_config.timekey
raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
end

test_path = generate_path(Time.now.strftime(@time_slice_format))
path_suffix = @add_path_suffix ? @path_suffix : ''
@path_template = generate_path_template(@path, @buffer_config.timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

placeholder_validate!(:path, @path_template)

max_tag_index = get_placeholders_tag(@path_template).max || 1
max_tag_index = 1 if max_tag_index < 1
dummy_tag = (['a'] * max_tag_index).join('.')
dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
test_path = extract_placeholders(@path_template, test_meta1)
unless ::Fluent::FileUtil.writable_p?(test_path)
raise ConfigError, "out_file: `#{test_path}` is not writable"
raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
end

super

@formatter = Plugin.new_formatter(@format)
@formatter.configure(conf)
@formatter = formatter_create(conf: conf.elements('format').first)

if @symlink_path && @buffer.respond_to?(:path)
@buffer.extend SymlinkBufferMixin
Expand All @@ -116,56 +141,101 @@ def configure(conf)
end

def format(tag, time, record)
@formatter.format(tag, time, record)
r = inject_values_to_record(tag, time, record)
@formatter.format(tag, time, r)
end

def write(chunk)
path = generate_path(chunk.key)
path = extract_placeholders(@path_template, chunk.metadata)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
unless @append
path = find_filepath_available(path)
end

case @compress_method
when nil
File.open(path, "ab", @file_perm) {|f|
File.open(path, "ab", @file_perm) do |f|
chunk.write_to(f)
}
when :gz
File.open(path, "ab", @file_perm) {|f|
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
when :gzip
if @buffer.compress != :gzip || @recompress
File.open(path, "ab", @file_perm) do |f|
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz, compressed: :text)
gz.close
end
else
File.open(path, "ab", @file_perm) do |f|
chunk.write_to(f, compressed: :gzip)
end
end
else
raise "BUG: unknown compression method #{@compress_method}"
end

return path # for test
@last_written_path = path
end

def secondary_init(primary)
# don't warn even if primary.class is not FileOutput
def timekey_to_timeformat(timekey)
case timekey
when nil then ''
when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive
when 60...3600 then '%Y%m%d%H%M'
when 3600...86400 then '%Y%m%d%H'
else '%Y%m%d'
end
end

private

def suffix
case @compress
when nil
''
when :gz
".gz"
def compression_suffix(compress)
case compress
when :gzip then '.gz'
when nil then ''
else
raise ArgumentError, "unknown compression type #{compress}"
end
end

def generate_path(time_string)
if @append
"#{@path_prefix}#{time_string}#{@path_suffix}#{suffix}"
# /path/to/dir/file.* -> /path/to/dir/file.%Y%m%d
# /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data
# /path/to/dir/file -> /path/to/dir/file.%Y%m%d.log
# %Y%m%d -> %Y%m%d_** (non append)
# + .gz (gzipped)
## TODO: remove time_slice_format when end of support of compat_parameters
def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
comp_suffix = compression_suffix(compress)
index_placeholder = append ? '' : '_**'
if original.index('*')
raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
time_placeholders_part = time_slice_format || timekey_to_timeformat(timekey)
original.gsub('*', time_placeholders_part + index_placeholder) + comp_suffix
else
path = nil
i = 0
begin
path = "#{@path_prefix}#{time_string}_#{i}#{@path_suffix}#{suffix}"
i += 1
end while File.exist?(path)
path
if timekey
if time_slice_format
"#{original}.#{time_slice_format}#{index_placeholder}#{path_suffix}#{comp_suffix}"
else
time_placeholders = timekey_to_timeformat(timekey)
if time_placeholders.scan(/../).any?{|ph| original.include?(ph) }
raise Fluent::ConfigError, "insufficient timestamp placeholders in path" if time_placeholders.scan(/../).any?{|ph| !original.include?(ph) }
"#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
else
"#{original}.#{time_placeholders}#{index_placeholder}#{path_suffix}#{comp_suffix}"
end
end
else
"#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
end
end
end

def find_filepath_available(path_with_placeholder) # for non-append
raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
i = 0
while path = path_with_placeholder.sub('_**', "_#{i}")
break unless File.exist?(path)
i += 1
end
path
end
end
end
Loading

0 comments on commit 3bb7671

Please sign in to comment.