Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buf_file_single plugin #2579

Merged
merged 13 commits into from
Aug 20, 2019
Merged
209 changes: 209 additions & 0 deletions lib/fluent/plugin/buf_file_single.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fileutils'

require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/file_single_chunk'
require 'fluent/system_config'

module Fluent
module Plugin
class FileSingleBuffer < Fluent::Plugin::Buffer
Plugin.register_buffer('file_single', self)

include SystemConfig::Mixin

DEFAULT_CHUNK_LIMIT_SIZE = 256 * 1024 * 1024 # 256MB
DEFAULT_TOTAL_LIMIT_SIZE = 64 * 1024 * 1024 * 1024 # 64GB

PATH_SUFFIX = ".#{Fluent::Plugin::Buffer::FileSingleChunk::PATH_EXT}"
DIR_PERMISSION = 0755

desc 'The path where buffer chunks are stored.'
config_param :path, :string, default: nil
desc 'Calculate the number of record in chunk during resume'
config_param :calc_num_records, :bool, default: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do users turn it false?I'd like to know a use case.

Copy link
Member Author

@repeatedly repeatedly Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some plugins don't use chunk's size for data flush, e.g. kafka, S3 and more.
So users can choose it to speedup restart time.

desc 'The format of chunk. This is used to calculate the number of record'
config_param :chunk_format, :enum, list: [:msgpack, :text, :auto], default: :auto

config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE

config_param :file_permission, :string, default: nil # '0644'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment(0644) mean? (ditto in L47)
If this comment has meaning, Will you please write more specific?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add desc for it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't you use default parameter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If use default, hard to check user specified or not. We need to use <system> setting when no specified.

config_param :dir_permission, :string, default: nil # '0755'

@@buffer_paths = {}

def initialize
super

@multi_workers_available = false
@additional_resume_path = nil
end

def configure(conf)
super

if @chunk_format == :auto
@chunk_format = owner.formatted_to_msgpack_binary? ? :msgpack : :text
end

@key_in_path = nil
if owner.chunk_keys.empty?
log.debug "use event tag for buffer key"
else
if owner.chunk_key_tag
raise Fluent::ConfigError, "chunk keys must be tag or one field"
else
@key_in_path = owner.chunk_keys.first.to_sym
end
end

multi_workers_configured = owner.system_config.workers > 1 ? true : false

using_plugin_root_dir = false
unless @path
if root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'buffer')
using_plugin_root_dir = true # plugin_root_dir path contains worker id
else
raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>"
end
end

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path) && !called_in_test?
type_using_this_path = @@buffer_paths[@path]
raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
end

@@buffer_paths[@path] = type_of_owner

specified_directory_exists = File.exist?(@path) && File.directory?(@path)
unexisting_path_for_directory = !File.exist?(@path) && [email protected]?('.*')

if specified_directory_exists || unexisting_path_for_directory # directory
if using_plugin_root_dir || !multi_workers_configured
@path = File.join(@path, "fsb.*#{PATH_SUFFIX}")
else
@path = File.join(@path, "worker#{fluentd_worker_id}", "fsb.*#{PATH_SUFFIX}")
if fluentd_worker_id == 0
# worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
@additional_resume_path = File.join(File.expand_path("../../", @path), "fsb.*#{PATH_SUFFIX}")
end
end
@multi_workers_available = true
else # specified path is file path
if File.basename(@path).include?('.*.')
# valid file path
log.warn "file_single doesn't allow user specified 'prefix.*.suffix' style path. Use 'fsb.*#{PATH_SUFFIX}' instead: #{@path}"
@path = "fsb.*#{PATH_SUFFIX}"
elsif File.basename(@path).end_with?('.*')
@path = @path + PATH_SUFFIX
else
# existing file will be ignored
@path = @path + ".*#{PATH_SUFFIX}"
end
@multi_workers_available = false
end

@dir_permission = if @dir_permission
@dir_permission.to_i(8)
else
system_config.dir_permission || DIR_PERMISSION
end
end

# This method is called only when multi worker is configured
def multi_workers_ready?
unless @multi_workers_available
log.error "file buffer with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
end
@multi_workers_available
end

def start
FileUtils.mkdir_p(File.dirname(@path), mode: @dir_permission)

super
end

def persistent?
true
end

def resume
stage = {}
queue = []

patterns = [@path]
patterns.unshift @additional_resume_path if @additional_resume_path
Dir.glob(patterns) do |path|
next unless File.file?(path)

log.debug { "restoring buffer file: path = #{path}" }

m = new_metadata() # this metadata will be overwritten by resuming .meta file content
# so it should not added into @metadata_list for now
mode = Fluent::Plugin::Buffer::FileSingleChunk.assume_chunk_state(path)
if mode == :unknown
log.debug "unknown state chunk found", path: path
next
end

begin
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path) # file chunk resumes contents of metadata
chunk.restore_size(@chunk_format) if @calc_num_records
rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e
handle_broken_files(path, mode, e)
next
end

case chunk.state
when :staged
stage[chunk.metadata] = chunk
when :queued
queue << chunk
end
end

queue.sort_by! { |chunk| chunk.modified_at }

return stage, queue
end

def generate_chunk(metadata)
# FileChunk generates real path with unique_id
if @file_permission
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, perm: @file_permission, compress: @compress)
else
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(metadata, @path, :create, @key_in_path, compress: @compress)
end

log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata

chunk
end

def handle_broken_files(path, mode, e)
log.error "found broken chunk file during resume. Delete corresponding files:", :path => path, :mode => mode, :err_msg => e.message
# After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
File.unlink(path) rescue nil
end
end
end
end
Loading