diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 9faabad5f0..c2cd73cc26 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -14,17 +14,20 @@
 # limitations under the License.
 #
 
+require 'monitor'
+
 module Fluent
   module Plugin
     class BufferError < StandardError; end
-    class BufferChunkLimitError < BufferError; end
-    class BufferQueueLimitError < BufferError; end
+    class BufferOverflowError < BufferError; end
+    class BufferChunkOverflowError < BufferError; end # A record size is larger than chunk size limit
 
     # Buffer is to define an interface for all buffer plugins.
-    # Use BasicBuffer as a superclass for 3rd party buffer plugins.
 
-    DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024 # 8MB for memory
-    DEFAULT_QUEUE_LENGTH = 256 # (8MB * 256 ==) 2GB for memory
+    DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB for memory
+    DEFAULT_QUEUE_LENGTH_LIMIT = 256 # (8MB * 256 ==) 2GB for memory
+
+    MINIMUM_APPEND_ATTEMPT_SIZE = 10
 
     # Buffers are built on 2 elements:
     # * stage: Array of chunks under writing, specified by metadata
@@ -32,31 +35,34 @@ class BufferQueueLimitError < BufferError; end
     # Queue of a Buffer instance is shared by variations of metadata
     class Buffer
       include Configurable
+      include MonitorMixin
 
       config_section :buffer, param_name: :buffer_config, required: false, multi: false do
-        config_param :chunk_size, :size, default: DEFAULT_CHUNK_SIZE
-        config_param :total_size, :size, default: DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
+        config_param :chunk_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT
+        config_param :total_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
 
         config_param :flush_interval, :time, default: nil
 
-        # If user specify this value and (chunk_size * queue_length) is smaller than total_size,
-        # then total_size is automatically configured to that value
-        config_param :queue_length, :integer, default: nil
+        # If a user specifies this value and (chunk_bytes_limit * queue_length_limit) is smaller
+        # than total_bytes_limit, then total_bytes_limit is automatically reduced to that product
+        config_param :queue_length_limit, :integer, default: nil
 
         # optional new limitations
-        config_param :chunk_records, :integer, default: nil
+        config_param :chunk_records_limit, :integer, default: nil
+
+        # TODO: pipeline mode? to flush ASAP after emit
      end
 
+      Metadata = Struct.new(:timekey, :tag, :variables)
+
       def initialize(logger)
         super()
         @log = logger
 
-        @chunk_size = nil
-        @chunk_records = nil
+        @chunk_bytes_limit = nil
+        @chunk_records_limit = nil
 
-        @total_size = nil
-        @queue_length = nil
+        @total_bytes_limit = nil
+        @queue_length_limit = nil
 
         @flush_interval = nil
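Metadata here is a plain Struct, which is what makes the metadata() helper in the next hunk work: Struct instances compare by member values, so @metadata_list.index(meta) finds an equal, previously registered instance and returns it, giving each variation exactly one canonical object (emit relies on that consistent object_id). A standalone sketch of the lookup, outside the patch:

    Metadata = Struct.new(:timekey, :tag, :variables)

    a = Metadata.new(3600, 'test.tag', ['web01'])
    b = Metadata.new(3600, 'test.tag', ['web01'])

    a == b        # => true: Structs compare member by member
    a.equal?(b)   # => false: still two distinct objects

    list = [a]
    list.index(b) # => 0, so the buffer returns the canonical `a` and drops `b`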
@@ -66,34 +72,92 @@ def configure(conf)
         super
 
         if @buffer_config
-          @chunk_size = @buffer_config.chunk_size
-          @chunk_records = @buffer_config.chunk_records
-          @total_size = @buffer_config.total_size
-          @queue_length = @buffer_config.queue_length
-          if @queue_length && @total_size > @chunk_size * @queue_length
-            @total_size = @chunk_size * @queue_length
+          @chunk_bytes_limit = @buffer_config.chunk_bytes_limit
+          @total_bytes_limit = @buffer_config.total_bytes_limit
+
+          @chunk_records_limit = @buffer_config.chunk_records_limit
+
+          @queue_length_limit = @buffer_config.queue_length_limit
+          if @queue_length_limit && @total_bytes_limit > @chunk_bytes_limit * @queue_length_limit
+            @total_bytes_limit = @chunk_bytes_limit * @queue_length_limit
           end
           @flush_interval = @buffer_config.flush_interval
         else
-          @chunk_size = DEFAULT_CHUNK_SIZE
-          @total_size = DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
-          @queue_length = DEFAULT_QUEUE_LENGTH
+          @chunk_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT
+          @total_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
+          @queue_length_limit = DEFAULT_QUEUE_LENGTH_LIMIT
         end
       end
 
-      def allow_concurrent_pop?
+      def start
+        super
+        @stage, @queue = resume
+        @queue.extend(MonitorMixin)
+
+        @stage_size = @queue_size = 0
+        @metadata_list = [] # keys of @stage
+      end
+
+      def storable?
+        @total_bytes_limit > @stage_size + @queue_size
+      end
+
+      def used?(ratio)
+        @total_bytes_limit * ratio > @stage_size + @queue_size
+      end
+
+      def resume
         raise NotImplementedError, "Implement this method in child class"
       end
 
-      def start
-        super
+      def metadata(key_value_pairs={})
+        timekey = key_value_pairs.delete(:timekey)
+        tag = key_value_pairs.delete(:tag)
+        variables = key_value_pairs.keys.sort.map{|k| key_value_pairs[k] }
+
+        meta = Metadata.new(timekey, tag, variables)
+        synchronize do
+          if i = @metadata_list.index(meta)
+            @metadata_list[i]
+          else
+            @metadata_list << meta
+            meta
+          end
+        end
       end
 
-      def emit(data, metadata)
+      # metadata MUST have consistent object_id for each variation
+      # data MUST be Array of serialized events
+      def emit(metadata, data)
+        return if data.size < 1
+        raise BufferOverflowError unless storable?
+
+        stored = false
+        data_size = data.size
+
+        # the case whole data can be stored in staged chunk: almost all emits will succeed
+        chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
+        chunk.synchronize do
+          begin
+            chunk.append(data)
+            unless size_over?(chunk)
+              chunk.commit
+              stored = true
+            end
+          ensure
+            chunk.rollback
+          end
+        end
+        return if stored
+
+        emit_step_by_step(metadata, data)
+      end
+
+      def generate_chunk(metadata)
         raise NotImplementedError, "Implement this method in child class"
       end
 
-      def enqueue_chunk(key)
+      def enqueue_chunk(metadata)
         raise NotImplementedError, "Implement this method in child class"
       end
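emit above is optimistic: append everything to the staged chunk, commit if the chunk is still within its limits, otherwise fall through to emit_step_by_step. Note that `ensure chunk.rollback` runs even on the success path, so chunk implementations must make rollback a no-op right after a commit. A toy chunk sketching that contract (method names taken from this patch, bookkeeping simplified and hypothetical):

    class ToyChunk
      def initialize
        @data = ''.dup
        @committed = 0
      end

      def append(records)
        @data << records.join
      end

      def commit
        @committed = @data.bytesize
      end

      def rollback
        # drops only the uncommitted tail; right after commit there is nothing to drop
        @data.slice!(@committed, @data.bytesize - @committed)
      end

      def size
        @data.bytesize
      end
    end

    chunk = ToyChunk.new
    chunk.append(%w[aaa bbb])
    chunk.commit
    chunk.rollback   # no-op: nothing uncommitted
    chunk.append(%w[ccc])
    chunk.rollback   # discards only 'ccc'
    chunk.size       # => 6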
@@ -113,15 +177,84 @@ def stop
       end
 
       def before_shutdown(out)
+        # the buffer may be flushed here, w/ flush_at_shutdown
       end
 
       def shutdown
       end
 
       def close
+        synchronize do
+          @queue.synchronize do
+            until @queue.empty?
+              @queue.shift.close
+            end
+          end
+          @stage.each_pair do |key, chunk|
+            chunk.close
+          end
+        end
       end
 
       def terminate
+        @stage = @queue = nil
+      end
+
+      def size_over?(chunk)
+        chunk.size > @chunk_bytes_limit || (@chunk_records_limit && chunk.records > @chunk_records_limit)
+      end
+
+      def emit_step_by_step(metadata, data)
+        attempt_size = data.size / 3
+
+        synchronize do # critical section for buffer (stage/queue)
+          while data.size > 0
+            if attempt_size < MINIMUM_APPEND_ATTEMPT_SIZE
+              attempt_size = MINIMUM_APPEND_ATTEMPT_SIZE
+            end
+
+            chunk = @stage[metadata]
+            unless chunk
+              chunk = @stage[metadata] = generate_chunk(metadata)
+            end
+
+            chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
+              begin
+                empty_chunk = chunk.empty?
+
+                attempt = data.slice(0, attempt_size)
+                chunk.append(attempt)
+
+                if size_over?(chunk)
+                  chunk.rollback
+
+                  if attempt_size <= MINIMUM_APPEND_ATTEMPT_SIZE
+                    if empty_chunk # record is too large even for empty chunk
+                      raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
+                    end
+                    # no more records for this chunk -> enqueue -> to be flushed
+                    enqueue_chunk(metadata) # `chunk` will be removed from stage
+                    attempt_size = data.size # fresh chunk may have enough space
+                  else
+                    # the whole data can be processed in 2 operations
+                    # (with attempt_size /= 2, odd data sizes would need 3 operations)
+                    attempt_size = (attempt_size / 2) + 1
+                  end
+
+                  next
+                end
+
+                chunk.commit
+                data.slice!(0, attempt_size)
+                # keep the same attempt size for the next iteration
+                nil # discard return value of data.slice!() immediately
+              ensure
+                chunk.rollback
+              end
+            end
+          end
+        end
+        nil
+      end
     end
   end
 end
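emit_step_by_step is the slow path: it retries with smaller and smaller slices of data. attempt_size starts at a third of the data, halves (plus one) on each overflow, is clamped at MINIMUM_APPEND_ATTEMPT_SIZE, and jumps back to the full remainder after a full chunk is enqueued; a record that still overflows an empty chunk raises BufferChunkOverflowError. The progression alone, as a simulation with hypothetical numbers (no chunks, no I/O, no overflow error path):

    MINIMUM_APPEND_ATTEMPT_SIZE = 10

    remaining    = 100 # records still to store
    chunk_room   = 25  # pretend every fresh chunk fits 25 records
    room         = chunk_room
    attempt_size = remaining / 3

    until remaining.zero?
      attempt_size = MINIMUM_APPEND_ATTEMPT_SIZE if attempt_size < MINIMUM_APPEND_ATTEMPT_SIZE
      if attempt_size > room                      # size_over? -> rollback
        if attempt_size <= MINIMUM_APPEND_ATTEMPT_SIZE
          room = chunk_room                       # enqueue_chunk: start a fresh chunk
          attempt_size = remaining                # fresh chunk may have enough space
        else
          attempt_size = (attempt_size / 2) + 1   # 33 -> 17 -> ...
        end
      else                                        # append fits: commit
        remaining -= attempt_size
        room      -= attempt_size
      end
    end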
diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index 5b35e43e10..239f6d162d 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -14,6 +14,8 @@
 # limitations under the License.
 #
 
+require 'monitor'
+
 module Fluent
   module Plugin
     class Buffer
@@ -22,7 +24,7 @@ class Chunk
       # Chunks have 2 parts:
       # * metadata: contains metadata which should be restored after resume (if possible)
-      #             v: [metadata_variable] (required)
+      #             v: [metadata_variable, ...] (required)
      #             t: tag as string (optional)
       #             k: time slice key (optional)
       #
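Both buffer.rb and chunk.rb now require 'monitor'. Buffer includes MonitorMixin so its synchronize guards stage/queue bookkeeping, and chunks are expected to expose synchronize as well (emit calls chunk.synchronize). Monitor locks are reentrant, unlike a bare Mutex, so code already holding the lock can call helpers that take it again; this is also why Buffer#initialize calls super(). A small illustration of the two idioms the patch uses (include for classes, extend for an existing object as in Buffer#start):

    require 'monitor'

    class Stage
      include MonitorMixin

      def initialize
        super() # lets MonitorMixin set up its internal lock
        @chunks = {}
      end

      def fetch_or_create(key)
        synchronize { @chunks[key] ||= build }
      end

      def build
        synchronize { Object.new } # re-entering the monitor is fine: no deadlock
      end
    end

    queue = [].extend(MonitorMixin)
    queue.synchronize { queue << :chunk }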
diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb
index 547620c54f..c7a5fb725a 100644
--- a/lib/fluent/plugin/buffer/file_chunk.rb
+++ b/lib/fluent/plugin/buffer/file_chunk.rb
@@ -23,9 +23,6 @@ module Fluent
   module Plugin
     class Buffer
       class FileChunk < Chunk
-        ### TODO: buf_file MUST refer the process global buffer directory path configuration, ex: buffer_storage_path
-
-
         ### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
         ### buffer chunk path          : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
         ### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta
@@ -38,7 +35,7 @@ class FileChunk < Chunk
 
         # path_suffix: path suffix string, like '.log' (or any other user specified)
         # mode: 'w+'(newly created/on stage), 'r+'(already exists/on stage), 'r'(already exists/enqueued)
-        def initialize(metadata, path, mode, perm=DEFAULT_FILE_PERMISSION)
+        def initialize(metadata, path, mode, perm=DEFAULT_FILE_PERMISSION) # TODO: DEFAULT_FILE_PERMISSION is obsolete
           super
 
           # state: stage/blocked/queued
@@ -104,82 +101,21 @@ def initialize(metadata, path, mode, perm=DEFAULT_FILE_PERMISSION)
           end
         end
 
-        def generate_stage_chunk_path(path)
-          prefix = path
-          suffix = '.log'
-          if pos = path.index('*')
-            prefix = path[0...pos]
-            suffix = path[(pos+1)..-1]
-          end
-
-          chunk_id = @unique_id.unpack('N*').map{|n| n.to_s(16)}.join
-          state = 'b'
-          "#{prefix}#{state}#{chunk_id}#{suffix}"
-        end
-
-        def generate_queued_chunk_path(path)
-          if pos = path.index('b' + @unique_id)
-            path.sub('b' + @unique_id, 'q' + @unique_id)
-          else
-            path + 'q' + @unique_id + '.log'
-          end
-        end
-
-        # used only for queued buffer path
-        def unique_id_from_path(path)
-          if /\.q([0-9a-f]+)\.[a-zA-Z]\Z/ =~ path
-            return $1.scan(/../).map{|x| x.to_i(16) }.pack('C*')
-          end
-          nil
-        end
-
-        def restore_metadata(data)
-          @unique_id = data.delete('id') || unique_id_from_path(@chunk_path) || @unique_id
-          @records = data.delete('r') || 0
-          @created_at = Time.at(data.delete('c') || Time.now.to_i)
-          @modified_at = Time.at(data.delete('m') || Time.now.to_i)
-          @metadata.update(data)
-        end
-
-        def write_metadata(try_update: false)
-          data = @metadata.merge({
-            'id' => @unique_id,
-            'r' => (try_update ? @records + @adding_records : @records),
-            'c' => @created_at.to_i,
-            'm' => (try_update ? Time.now.to_i : @modified_at.to_i)
-          })
-          @meta.seek(0, IO::SEEK_SET)
-          @meta.truncate(0)
-          @meta.write data
-        end
-
-        def enqueued!
-          new_chunk_path = generate_queued_chunk_path(@chunk_path)
-          new_meta_path = new_chunk_path + '.meta'
-
-          write_metadata # re-write metadata w/ finalized records
-
-          @chunk.rename(new_chunk_path)
-          @meta.rename(new_meta_path)
-
-          @chunk_path = new_chunk_path
-          @meta_path = new_meta_path
-          @state = :enqueued
-        end
-
+        # MUST be called in critical section
         def append(data)
           raise "BUG: appending to non-staged chunk, now '#{@state}'" unless @state == :staged
 
           adding = data.join.force_encoding('ASCII-8BIT')
           @chunk.write adding
 
-          @adding_bytes = adding.bytesize
-          @adding_records = data.size
+          @adding_bytes += adding.bytesize
+          @adding_records += data.size
           write_metadata(try_update: true) # of course, this operation may fail
 
           true
         end
 
+        # MUST be called in critical section
         def commit
           @commit_position = @chunk.pos
           @records += @adding_records
@@ -190,9 +126,12 @@ def commit
           true
         end
 
+        # MUST be called in critical section
         def rollback
-          @chunk.seek(@commit_position, IO::SEEK_SET)
-          @chunk.truncate(@commit_position)
+          if @chunk.pos != @commit_position
+            @chunk.seek(@commit_position, IO::SEEK_SET)
+            @chunk.truncate(@commit_position)
+          end
           @adding_bytes = @adding_records = 0
           true
         end
@@ -219,6 +158,7 @@ def close
           end
         end
 
+        # MUST be called in critical section
         def purge
           @state = :closed
           @chunk.close
@@ -255,6 +195,71 @@ def msgpack_each(&block)
             end
           }
         end
+
+        # methods for FileBuffer operations
+
+        def generate_stage_chunk_path(path)
+          prefix = path
+          suffix = '.log'
+          if pos = path.index('*')
+            prefix = path[0...pos]
+            suffix = path[(pos+1)..-1]
+          end
+
+          chunk_id = @unique_id.unpack('N*').map{|n| n.to_s(16)}.join
+          state = 'b'
+          "#{prefix}#{state}#{chunk_id}#{suffix}"
+        end
+
+        def generate_queued_chunk_path(path)
+          chunk_id = @unique_id.unpack('N*').map{|n| n.to_s(16)}.join # hex id, as embedded in staged chunk paths
+          if path.index('b' + chunk_id)
+            path.sub('b' + chunk_id, 'q' + chunk_id)
+          else
+            path + 'q' + chunk_id + '.log'
+          end
+        end
+
+        # used only for queued buffer path
+        def unique_id_from_path(path)
+          if /\.q([0-9a-f]+)\.[a-zA-Z]+\Z/ =~ path
+            return $1.scan(/../).map{|x| x.to_i(16) }.pack('C*')
+          end
+          nil
+        end
+
+        def restore_metadata(data)
+          @unique_id = data.delete('id') || unique_id_from_path(@chunk_path) || @unique_id
+          @records = data.delete('r') || 0
+          @created_at = Time.at(data.delete('c') || Time.now.to_i)
+          @modified_at = Time.at(data.delete('m') || Time.now.to_i)
+          @metadata.update(data)
+        end
+
+        def write_metadata(try_update: false)
+          data = @metadata.merge({
+            'id' => @unique_id,
+            'r' => (try_update ? @records + @adding_records : @records),
+            'c' => @created_at.to_i,
+            'm' => (try_update ? Time.now.to_i : @modified_at.to_i)
+          })
+          @meta.seek(0, IO::SEEK_SET)
+          @meta.truncate(0)
+          @meta.write data
+        end
+
+        def enqueued!
+          new_chunk_path = generate_queued_chunk_path(@chunk_path)
+          new_meta_path = new_chunk_path + '.meta'
+
+          write_metadata # re-write metadata w/ finalized records
+
+          @chunk.rename(new_chunk_path)
+          @meta.rename(new_meta_path)
+
+          @chunk_path = new_chunk_path
+          @meta_path = new_meta_path
+          @state = :enqueued
+        end
       end
     end
   end
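A file chunk's state is encoded in its path, as the ### comments at the top of file_chunk.rb describe: staged chunks carry a 'b' marker before the hex unique id, enqueued chunks a 'q', and enqueued! renames both the chunk and its .meta sidecar. The naming scheme in isolation (hex id shortened for readability, helper logic inlined):

    pattern = '/path/to/directory/user_specified_prefix.*.log'
    hex_id  = '513b61c9791029c2' # real ids are longer

    # staged chunk (still being written)
    staged = pattern.sub('*', "b#{hex_id}")
    # => "/path/to/directory/user_specified_prefix.b513b61c9791029c2.log"

    # enqueued chunk (closed for writes, waiting to be flushed)
    queued = staged.sub("b#{hex_id}", "q#{hex_id}")
    # => "/path/to/directory/user_specified_prefix.q513b61c9791029c2.log"

    queued_meta = queued + '.meta' # renamed together with the chunk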
diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb
index cbbcc2da0e..e6b743f2d0 100644
--- a/lib/fluent/plugin/buffer/memory_chunk.rb
+++ b/lib/fluent/plugin/buffer/memory_chunk.rb
@@ -31,14 +31,16 @@ def initialize(metadata)
           @adding_records = 0
         end
 
+        # MUST be called in critical section
         def append(data)
           adding = data.join.force_encoding('ASCII-8BIT')
           @chunk << adding
-          @adding_bytes = adding.bytesize
-          @adding_records = data.size
+          @adding_bytes += adding.bytesize
+          @adding_records += data.size
           true
         end
 
+        # MUST be called in critical section
         def commit
           @records += @adding_records
           @chunk_bytes += @adding_bytes
@@ -47,6 +49,7 @@ def commit
           true
         end
 
+        # MUST be called in critical section
         def rollback
           @chunk.slice!(@chunk_bytes, @adding_bytes)
           @adding_bytes = @adding_records = 0
@@ -69,6 +72,7 @@ def close
           true
         end
 
+        # MUST be called in critical section
         def purge
           @chunk = ''.force_encoding("ASCII-8BIT")
           @chunk_bytes = @adding_bytes = @adding_records = 0
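The switch from '=' to '+=' for @adding_bytes and @adding_records (in both chunk classes) matters whenever append runs more than once before a commit or rollback: rollback has to account for everything uncommitted, not just the last append. The rollback arithmetic in isolation, with plain locals standing in for the instance variables:

    chunk        = ''.dup.force_encoding('ASCII-8BIT')
    chunk_bytes  = 0 # committed bytes
    adding_bytes = 0 # uncommitted bytes

    # two appends inside one uncommitted transaction
    %w[foo bar].each do |rec|
      chunk << rec
      adding_bytes += rec.bytesize # with plain '=', only 'bar' would be counted
    end

    # rollback: drop exactly the uncommitted region
    chunk.slice!(chunk_bytes, adding_bytes)
    adding_bytes = 0

    chunk # => ""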