# diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb (new file mode 100644)
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
  module Plugin
    class BufferError < StandardError; end
    class BufferChunkLimitError < BufferError; end
    class BufferQueueLimitError < BufferError; end

    DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024 # 8MB for memory
    DEFAULT_QUEUE_LENGTH = 256 # (8MB * 256 ==) 2GB for memory

    # Buffer defines the interface for all buffer plugins.
    # Use BasicBuffer as a superclass for 3rd party buffer plugins.
    #
    # Buffers are built on 2 elements:
    # * stage: Array of chunks under writing, specified by metadata
    # * queue: FIFO list of chunks, which are already fulfilled, and to be flushed
    # Queue of a Buffer instance is shared by variations of metadata
    class Buffer
      # NOTE(review): Configurable is not required by this file; presumably
      # Fluent::Configurable is loaded by the caller -- confirm.
      include Configurable

      config_section :buffer, param_name: :buffer_config, required: false, multi: false do
        config_param :chunk_size, :size, default: DEFAULT_CHUNK_SIZE
        config_param :total_size, :size, default: DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH

        config_param :flush_interval, :time, default: nil

        # If user specifies this value and (chunk_size * queue_length) is smaller than total_size,
        # then total_size is automatically configured to that value
        config_param :queue_length, :integer, default: nil

        # optional new limitations
        config_param :chunk_records, :integer, default: nil

        # TODO: pipeline mode? to flush ASAP after emit
      end

      # logger: stored as @log for use by subclasses.
      # All limits start as nil and are filled in by #configure.
      def initialize(logger)
        super()
        @log = logger

        @chunk_size = nil
        @chunk_records = nil

        @total_size = nil
        @queue_length = nil

        @flush_interval = nil
      end

      # Copies the <buffer> section parameters into instance variables.
      # Without a <buffer> section the memory defaults defined above are used.
      def configure(conf)
        super

        if @buffer_config
          @chunk_size = @buffer_config.chunk_size
          @chunk_records = @buffer_config.chunk_records
          @total_size = @buffer_config.total_size
          @queue_length = @buffer_config.queue_length
          if @queue_length
            # an explicit queue_length caps total_size at chunk_size * queue_length
            queue_bytes = @chunk_size * @queue_length
            @total_size = queue_bytes if @total_size > queue_bytes
          end
          @flush_interval = @buffer_config.flush_interval
        else
          @chunk_size = DEFAULT_CHUNK_SIZE
          @total_size = DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
          @queue_length = DEFAULT_QUEUE_LENGTH
        end
      end

      def allow_concurrent_pop?
        raise NotImplementedError, "Implement this method in child class"
      end

      # No superclass is declared here, so an unconditional `super` would raise
      # NoMethodError unless an included module provides #start.
      # Delegate only when an ancestor actually implements it.
      def start
        super if defined?(super)
      end

      def emit(data, metadata)
        raise NotImplementedError, "Implement this method in child class"
      end

      def enqueue_chunk(key)
        raise NotImplementedError, "Implement this method in child class"
      end

      def dequeue_chunk
        raise NotImplementedError, "Implement this method in child class"
      end

      def purge_chunk(chunk_id)
        raise NotImplementedError, "Implement this method in child class"
      end

      def clear!
        raise NotImplementedError, "Implement this method in child class"
      end

      # Lifecycle hooks below are intentionally no-ops in the base class.
      def stop
      end

      def before_shutdown(out)
      end

      def shutdown
      end

      def close
      end

      def terminate
      end
    end
  end
end

# diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb (new file mode 100644)
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'monitor'

module Fluent
  module Plugin
    class Buffer
      # Abstract base class of buffer chunks. Subclasses implement the storage
      # operations (append/commit/rollback/size/records/close/purge/read/open).
      class Chunk
        include MonitorMixin

        # Chunks have 2 parts:
        # * metadata: contains metadata which should be restored after resume (if possible)
        #     v: [metadata_variable] (required)
        #     t: tag as string (optional)
        #     k: time slice key (optional)
        #
        #     id: unique_id of chunk (*)
        #     r: number of records (*)
        #     c: created_at as unix time (*)
        #     m: modified_at as unix time (*)
        #       (*): fields automatically injected by chunk itself
        # * data: binary data, combined records represented as String, maybe compressed

        # NOTE: keys of metadata are named with a single letter
        #       to decrease bytesize of metadata I/O

        # TODO: CompressedPackedMessage of forward protocol?

        # metadata: stored as-is and exposed through #metadata.
        def initialize(metadata)
          super()
          @unique_id = generate_unique_id
          @metadata = metadata

          @records = 0
          # a single timestamp so a fresh chunk has created_at == modified_at
          now = Time.now
          @created_at = now
          @modified_at = now
        end

        attr_reader :unique_id, :metadata, :created_at, :modified_at

        # Returns a 16-byte binary String: a 64bit value built from the current
        # time in microseconds shifted left by 12 bits plus 12 random bits,
        # followed by 64 random bits.
        def generate_unique_id
          now = Time.now.utc
          # rand(n) returns 0...n, so the bound must be one past the field maximum
          u1 = ((now.to_i * 1000 * 1000 + now.usec) << 12) | rand(0x1000)
          [(u1 >> 32) & 0xffffffff, u1 & 0xffffffff, rand(0x100000000), rand(0x100000000)].pack('NNNN')
        end

        # data is array of formatted record string
        def append(data)
          raise NotImplementedError, "Implement this method in child class"
        end

        def commit
          raise NotImplementedError, "Implement this method in child class"
        end

        def rollback
          raise NotImplementedError, "Implement this method in child class"
        end

        def size
          raise NotImplementedError, "Implement this method in child class"
        end

        def records
          raise NotImplementedError, "Implement this method in child class"
        end

        def empty?
          size == 0
        end

        def close
          raise NotImplementedError, "Implement this method in child class"
        end

        def purge
          raise NotImplementedError, "Implement this method in child class"
        end

        def read
          raise NotImplementedError, "Implement this method in child class"
        end

        def open(&block)
          raise NotImplementedError, "Implement this method in child class"
        end

        ### TODO: fixit
        # Copies the whole chunk content into io.
        # IO.copy_stream is part of core Ruby, so no 'fileutils' require is needed.
        def write_to(io)
          open {|i|
            IO.copy_stream(i, io)
          }
        end

        # Yields each msgpack object stored in the chunk.
        # NOTE(review): MessagePack is not required by this file; the visible
        # subclasses require 'msgpack' themselves.
        def msgpack_each(&block)
          open {|io|
            u = MessagePack::Unpacker.new(io)
            begin
              u.each(&block)
            rescue EOFError
              # reaching the end of the stream is the normal way to finish
            end
          }
        end
      end
    end
  end
end

# diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb (new file mode 100644)
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/buffer/chunk'
require 'fluent/env'

require 'msgpack'

module Fluent
  module Plugin
    class Buffer
      # Chunk stored in a file, with its metadata in a sibling '.meta' file
      # serialized as msgpack.
      class FileChunk < Chunk
        ### TODO: buf_file MUST refer the process global buffer directory path configuration, ex: buffer_storage_path

        ### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
        ### buffer chunk path          : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
        ### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta

        # NOTE: Old style buffer path of time sliced output plugins had a part of key: prefix.20150414.b513b61...suffix
        #       But this part is not used for any purpose. (Now metadata variable is used instead.)

        # state: b/q - 'b'(on stage), 'q'(enqueued)
        # path_prefix: path prefix string, ended with '.'
        # path_suffix: path suffix string, like '.log' (or any other user specified)

        # mode: 'w+'(newly created/on stage), 'r+'(already exists/on stage), 'r'(already exists/enqueued)
        # For 'w+' path is the user specified pattern (may contain '*');
        # for 'r+' and 'r' it is the path of an existing chunk file.
        # Raises ArgumentError for any other mode.
        def initialize(metadata, path, mode, perm=DEFAULT_FILE_PERMISSION)
          # Chunk#initialize accepts only metadata; a bare `super` would forward
          # all four arguments and raise ArgumentError.
          super(metadata)

          # state: staged/blocked/queued
          # blocked is internal status for chunks waiting to be enqueued

          @path = path
          @state = nil
          @meta = nil

          # initialized for every mode so that #records and #size never see nil
          @bytesize = 0
          @commit_position = 0
          @adding_bytes = 0
          @adding_records = 0

          case mode
          when 'w+' # newly created / on stage
            @chunk_path = generate_stage_chunk_path(path)
            @meta_path = @chunk_path + '.meta'
            @chunk = File.open(@chunk_path, mode, perm)
            @chunk.binmode
            @chunk.sync = true
            @meta = File.open(@meta_path, 'w', perm)
            @meta.binmode
            @meta.sync = true

            @state = :staged
            @commit_position = @chunk.pos # must be 0

          when 'r+' # already exists / on stage
            @chunk_path = path
            @meta_path = @chunk_path + '.meta'

            @chunk = File.open(@chunk_path, mode)
            @chunk.binmode
            @chunk.sync = true
            @chunk.seek(0, IO::SEEK_END)

            @bytesize = @chunk.size
            @commit_position = @chunk.pos

            # staging buffer chunk without metadata is classic buffer chunk file
            # and it should be enqueued immediately
            if File.exist?(@meta_path)
              @meta = File.open(@meta_path, 'r+')
              @meta.binmode
              restore_metadata(MessagePack.unpack(@meta.read))
              @meta.seek(0, IO::SEEK_SET)
              @meta.sync = true
              @state = :staged
            else
              @state = :blocked # for writing
              @unique_id = unique_id_from_path(@chunk_path) || @unique_id
            end

          when 'r' # already exists / enqueued
            @chunk_path = path
            @chunk = File.open(@chunk_path, mode)
            @chunk.binmode
            @bytesize = @chunk.size

            @meta_path = @chunk_path + '.meta'
            if File.readable?(@meta_path)
              restore_metadata(MessagePack.unpack(File.binread(@meta_path)))
            end
            @state = :queued

          else
            raise ArgumentError, "unknown file chunk mode '#{mode}'"
          end
        end

        # Builds the path of a staged ('b') chunk file from the user specified
        # path: '*' is replaced by 'b' + hex unique id; without '*' the id and
        # '.log' are appended.
        def generate_stage_chunk_path(path)
          pos = path.index('*')
          prefix, suffix = pos ? [path[0...pos], path[(pos + 1)..-1]] : [path, '.log']
          "#{prefix}b#{unique_id_hex}#{suffix}"
        end

        # Builds the path of the queued ('q') chunk file from a staged chunk path.
        # The file name carries the hex form of the unique id, so the hex form
        # (not the raw binary id) must be used for both search and replacement.
        def generate_queued_chunk_path(path)
          hex = unique_id_hex
          staged = 'b' + hex
          if path.include?(staged)
            path.sub(staged, 'q' + hex)
          else
            separator = path.end_with?('.') ? '' : '.'
            "#{path}#{separator}q#{hex}.log"
          end
        end

        # Extracts the binary unique id from a chunk path of the form
        # "<prefix>.b<hex>.<suffix>" or "<prefix>.q<hex>.<suffix>".
        # Returns nil when the path does not have that form.
        def unique_id_from_path(path)
          if /\.[bq]([0-9a-f]+)\.[a-zA-Z0-9]+\z/ =~ path
            return [$1].pack('H*')
          end
          nil
        end

        # Restores the chunk-injected fields ('id', 'r', 'c', 'm') from data and
        # merges the remaining keys into @metadata. data is modified in place.
        def restore_metadata(data)
          @unique_id = data.delete('id') || unique_id_from_path(@chunk_path) || @unique_id
          @records = data.delete('r') || 0
          @created_at = Time.at(data.delete('c') || Time.now.to_i)
          @modified_at = Time.at(data.delete('m') || Time.now.to_i)
          @metadata.update(data)
        end

        # Rewrites the metadata file with the current state, as msgpack.
        # try_update: when true, records include the uncommitted ones and 'm' is now.
        # Does nothing for chunks opened without a writable metadata file.
        def write_metadata(try_update: false)
          return unless @meta

          data = @metadata.merge({
            'id' => @unique_id,
            'r' => (try_update ? @records + @adding_records : @records),
            'c' => @created_at.to_i,
            'm' => (try_update ? Time.now.to_i : @modified_at.to_i)
          })
          @meta.seek(0, IO::SEEK_SET)
          @meta.truncate(0)
          # must be msgpack because the restore path reads it with MessagePack.unpack
          @meta.write(MessagePack.pack(data))
        end

        # Renames the chunk (and its metadata file, if any) to the queued name
        # and marks this chunk as queued.
        def enqueued!
          new_chunk_path = generate_queued_chunk_path(@chunk_path)
          new_meta_path = new_chunk_path + '.meta'

          write_metadata # re-write metadata w/ finalized records

          # File has no instance-level #rename; rename by path.
          # NOTE(review): the file handles stay open across the rename -- confirm
          # this is acceptable on every supported platform.
          File.rename(@chunk_path, new_chunk_path)
          File.rename(@meta_path, new_meta_path) if File.exist?(@meta_path)

          @chunk_path = new_chunk_path
          @meta_path = new_meta_path
          @state = :queued
        end

        # data is array of formatted record string.
        # Writes data to the chunk file as uncommitted content.
        # Raises RuntimeError unless the chunk is staged.
        def append(data)
          raise "BUG: appending to non-staged chunk, now '#{@state}'" unless @state == :staged

          adding = data.join.force_encoding('ASCII-8BIT')
          @chunk.write adding

          # accumulate: several appends may precede a single commit/rollback
          @adding_bytes += adding.bytesize
          @adding_records += data.size
          write_metadata(try_update: true) # of course, this operation may fail

          true
        end

        # Makes everything appended so far part of the committed content.
        def commit
          @commit_position = @chunk.pos
          @records += @adding_records
          @bytesize += @adding_bytes
          @modified_at = Time.now

          @adding_bytes = @adding_records = 0
          true
        end

        # Discards everything appended since the last commit.
        def rollback
          @chunk.seek(@commit_position, IO::SEEK_SET)
          @chunk.truncate(@commit_position)
          @adding_bytes = @adding_records = 0
          true
        end

        # Bytes in the chunk including uncommitted ones, like #records.
        def size
          @bytesize + @adding_bytes
        end

        def records
          @records + @adding_records
        end

        # true when no committed bytes exist
        def empty?
          @bytesize == 0
        end

        # Closes the files; removes them when the chunk file is empty.
        def close
          @state = :closed
          bytes_on_disk = @chunk.size
          @chunk.close
          @meta.close if @meta
          unlink_files if bytes_on_disk == 0
        end

        # Closes the files and removes them.
        def purge
          @state = :closed
          @chunk.close
          @meta.close if @meta
          @bytesize = @adding_bytes = @adding_records = 0
          unlink_files
        end

        # Returns the whole content of the chunk file.
        def read
          @chunk.seek(0, IO::SEEK_SET)
          @chunk.read
        end

        # Yields the chunk file positioned at its beginning and returns the
        # block result. For staged chunks the position is moved back to the end
        # afterwards so that later appends do not overwrite content.
        def open(&block) # TODO: check whether used or not
          @chunk.seek(0, IO::SEEK_SET)
          begin
            yield @chunk
          ensure
            @chunk.seek(0, IO::SEEK_END) if @state == :staged
          end
        end

        ### TODO: fixit
        # IO.copy_stream is part of core Ruby, so no 'fileutils' require is needed.
        def write_to(io)
          open {|i|
            IO.copy_stream(i, io)
          }
        end

        ### TODO: fixit
        def msgpack_each(&block)
          open {|io|
            u = MessagePack::Unpacker.new(io)
            begin
              u.each(&block)
            rescue EOFError
              # reaching the end of the stream is the normal way to finish
            end
          }
        end

        private

        # Fixed-width lowercase hex form of @unique_id (2 characters per byte).
        def unique_id_hex
          @unique_id.unpack('H*').first
        end

        # Removes the chunk file and the metadata file if they exist.
        def unlink_files
          [@chunk_path, @meta_path].each do |file|
            File.unlink(file) if file && File.exist?(file)
          end
        end
      end
    end
  end
end

# diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb (new file mode 100644)
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/buffer/chunk'

require 'fileutils'
require 'msgpack'
require 'stringio'

module Fluent
  module Plugin
    class Buffer
      # Chunk that keeps its content in a binary String in memory.
      class MemoryChunk < Chunk
        def initialize(metadata)
          super
          @chunk = String.new # empty ASCII-8BIT string
          @chunk_bytes = 0
          @adding_bytes = 0
          @adding_records = 0
        end

        # data is array of formatted record string.
        # Appends data as uncommitted content.
        def append(data)
          adding = data.join.force_encoding('ASCII-8BIT')
          @chunk << adding
          # accumulate: several appends may precede a single commit/rollback,
          # and rollback must remove all of them
          @adding_bytes += adding.bytesize
          @adding_records += data.size
          true
        end

        # Makes everything appended so far part of the committed content.
        def commit
          @records += @adding_records
          @chunk_bytes += @adding_bytes

          @adding_bytes = @adding_records = 0
          true
        end

        # Removes everything appended since the last commit.
        def rollback
          @chunk.slice!(@chunk_bytes, @adding_bytes)
          @adding_bytes = @adding_records = 0
          true
        end

        # Bytes in the chunk including uncommitted ones.
        def size
          @chunk_bytes + @adding_bytes
        end

        # Records in the chunk including uncommitted ones.
        def records
          @records + @adding_records
        end

        def empty?
          @chunk.empty?
        end

        def close
          true
        end

        # Drops the content and resets every counter, including the record count.
        def purge
          @chunk = String.new
          @chunk_bytes = @adding_bytes = @adding_records = 0
          @records = 0
          true
        end

        # Returns the underlying String itself (not a copy).
        def read
          @chunk
        end

        # Yields a StringIO over the chunk content and returns the block result.
        def open(&block)
          StringIO.open(@chunk, &block)
        end

        ### TODO: fixit
        def write_to(io)
          open {|i|
            IO.copy_stream(i, io)
          }
        end

        ### TODO: fixit
        def msgpack_each(&block)
          open {|io|
            u = MessagePack::Unpacker.new(io)
            begin
              u.each(&block)
            rescue EOFError
              # reaching the end of the stream is the normal way to finish
            end
          }
        end
      end
    end
  end
end