Skip to content

Commit

Permalink
Implementing new Buffer APIs, especially emitting data
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Apr 23, 2015
1 parent 67afd69 commit 913d7ac
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 99 deletions.
183 changes: 158 additions & 25 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,55 @@
# limitations under the License.
#

require 'monitor'

module Fluent
module Plugin
class BufferError < StandardError; end
class BufferChunkLimitError < BufferError; end
class BufferQueueLimitError < BufferError; end
class BufferOverflowError < BufferError; end
class BufferChunkOverflowError < BufferError; end # A record size is larger than chunk size limit

# Buffer is to define an interface for all buffer plugins.
# Use BasicBuffer as a superclass for 3rd party buffer plugins.

DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024 # 8MB for memory
DEFAULT_QUEUE_LENGTH = 256 # (8MB * 256 ==) 2GB for memory
DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB for memory
DEFAULT_QUEUE_LENGTH_LIMIT = 256 # (8MB * 256 ==) 2GB for memory

MINIMUM_APPEND_ATTEMPT_SIZE = 10

# Buffers are built on 2 elements:
# * stage: Array of chunks under writing, specified by metadata
# * queue: FIFO list of chunks, which are already fulfilled, and to be flushed
# Queue of a Buffer instance is shared by variations of metadata
class Buffer
include Configurable
include MonitorMixin

config_section :buffer, param_name: :buffer_config, required: false, multi: false do
config_param :chunk_size, :size, default: DEFAULT_CHUNK_SIZE
config_param :total_size, :size, default: DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
config_param :chunk_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT
config_param :total_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT

config_param :flush_interval, :time, default: nil

# If the user specifies this value and (chunk_size * queue_length) is smaller than total_size,
# then total_size is automatically reduced to that product
config_param :queue_length, :integer, default: nil
config_param :queue_length_limit, :integer, default: nil

# optional new limitations
config_param :chunk_records, :integer, default: nil
config_param :chunk_records_limit, :integer, default: nil

# TODO: pipeline mode? to flush ASAP after emit
end

Metadata = Struct.new(:timekey, :tag, :variables)

def initialize(logger)
super()
@log = logger

@chunk_size = nil
@chunk_size_limit = nil
@chunk_records = nil

@total_size = nil
@total_size_limit = nil
@queue_length = nil

@flush_interval = nil
Expand All @@ -66,34 +72,92 @@ def configure(conf)
super

if @buffer_config
@chunk_size = @buffer_config.chunk_size
@chunk_records = @buffer_config.chunk_records
@total_size = @buffer_config.total_size
@queue_length = @buffer_config.queue_length
if @queue_length && @total_size > @chunk_size * @queue_length
@total_size = @chunk_size * @queue_length
@chunk_bytes_limit = @buffer_config.chunk_bytes_limit
@total_bytes_limit = @buffer_config.total_bytes_limit

@chunk_records_limit = @buffer_config.chunk_records_limit

@queue_length_limit = @buffer_config.queue_length_limit
if @queue_length_limit && @total_bytes_limit > @chunk_bytes_limit * @queue_length_limit
@total_bytes_limit = @chunk_bytes_limit * @queue_length_limit
end
@flush_interval = @buffer_config.flush_interval
else
@chunk_size = DEFAULT_CHUNK_SIZE
@total_size = DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
@queue_length = DEFAULT_QUEUE_LENGTH
@chunk_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT
@total_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
@queue_length_limit = DEFAULT_QUEUE_LENGTH_LIMIT
end
end

def allow_concurrent_pop?
# Initializes runtime state: restores staged/queued chunks via #resume,
# then resets the size counters and the known-metadata list.
def start
super
# resume is implemented by the concrete buffer plugin; per the
# assignment below it must return [stage_hash, queue_array].
@stage, @queue = resume
# The queue gets its own monitor so enqueue/dequeue can be locked
# independently of the buffer-wide monitor.
@queue.extend(MonitorMixin)

# NOTE(review): counters start at 0 even when resume returned non-empty
# chunks — confirm whether restored chunk sizes should be counted here.
@stage_size = @queue_size = 0
@metadata_list = [] # keys of @stage
end

# Whether the buffer can accept more data: bytes currently staged plus
# queued must be strictly below the configured total byte limit.
#
# Fix: this previously read @total_size_limit, which is only ever set to
# nil in #initialize — #configure assigns @total_bytes_limit in both
# branches — so the comparison raised NoMethodError on nil.
def storable?
@total_bytes_limit > @stage_size + @queue_size
end

# Whether buffered bytes are still below +ratio+ of the total byte limit.
#
# Fix: this previously read @total_size_limit, which is only ever set to
# nil in #initialize — #configure assigns @total_bytes_limit — so the
# multiplication raised NoMethodError on nil.
#
# NOTE(review): returns true while usage is BELOW limit*ratio; the name
# suggests the opposite — confirm intended semantics with callers.
def used?(ratio)
@total_bytes_limit * ratio > @stage_size + @queue_size
end

# Restores buffered chunks from the plugin's backing store.
# Per the caller in #start, must return a pair: [stage, queue] where
# stage maps metadata => chunk and queue is an Array of chunks to flush.
def resume
raise NotImplementedError, "Implement this method in child class"
end

def start
super
# Returns the canonical Metadata instance for the given keys, creating and
# registering it on first use. The well-known keys :timekey and :tag are
# extracted; the remaining values, ordered by key name, become the
# variables list. Deduplicating against @metadata_list keeps one stable
# object per variation, which #emit relies on.
def metadata(key_value_pairs={})
  timekey = key_value_pairs.delete(:timekey)
  tag = key_value_pairs.delete(:tag)
  sorted_keys = key_value_pairs.keys.sort
  variables = sorted_keys.map { |key| key_value_pairs[key] }

  candidate = Metadata.new(timekey, tag, variables)
  synchronize do
    existing = @metadata_list.find { |registered| registered == candidate }
    if existing
      existing
    else
      @metadata_list.push(candidate)
      candidate
    end
  end
end

def emit(data, metadata)
# metadata MUST have consistent object_id for each variation
# data MUST be Array of serialized events
#
# Appends the whole batch to the staged chunk for +metadata+ in one shot
# (the common fast path), falling back to #emit_step_by_step when the
# batch would push the chunk over its limits.
# Raises BufferOverflowError when the buffer cannot accept any more data.
def emit(metadata, data)
return if data.size < 1
raise BufferOverflowError unless storable?

stored = false
# NOTE(review): data_size is never read below — apparently dead; confirm.
data_size = data.size

# the common case: the whole batch fits the staged chunk, so almost all emits succeed here
chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
chunk.synchronize do
begin
chunk.append(data)
unless size_over?(chunk)
chunk.commit
stored = true
end
ensure
# NOTE(review): runs even after a successful commit — assumes
# Chunk#rollback is a no-op once committed; confirm the chunk contract.
chunk.rollback
end
end
return if stored

# whole-batch append failed the size check: split into smaller attempts
emit_step_by_step(metadata, data)
end

# Creates a new, empty chunk bound to +metadata+.
# Implemented by the concrete buffer plugin.
def generate_chunk(metadata)
raise NotImplementedError, "Implement this method in child class"
end

def enqueue_chunk(key)
# Moves the staged chunk for +metadata+ out of @stage and into @queue so
# it can be flushed (see the call site in #emit_step_by_step).
# Implemented by the concrete buffer plugin.
def enqueue_chunk(metadata)
raise NotImplementedError, "Implement this method in child class"
end

Expand All @@ -113,15 +177,84 @@ def stop
end

# Hook called just before shutdown; +out+ is presumably the owning output
# plugin — confirm against the caller.
def before_shutdown(out)
# at here, buffer may be flushed w/ flush_at_shutdown
end

# Hook for plugin-specific shutdown work; intentionally empty here.
def shutdown
end

# Closes every buffered chunk: drains and closes all queued chunks first
# (under the queue monitor), then closes each staged chunk. The buffer
# monitor is held for the whole operation.
def close
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end
    @stage.each_pair do |_metadata, staged_chunk|
      staged_chunk.close
    end
  end
end

# Drops all chunk references; the buffer is unusable after this.
def terminate
@stage = @queue = nil
end

# True when +chunk+ exceeds the configured byte limit, or — when a
# record-count limit is configured — exceeds that record limit.
def size_over?(chunk)
  bytes_over = chunk.size > @chunk_bytes_limit
  records_over = @chunk_records_limit && chunk.records > @chunk_records_limit
  bytes_over || records_over
end

# Splits +data+ into progressively smaller slices and appends them to the
# staged chunk for +metadata+. When a slice would push the chunk over its
# limits, the slice is rolled back and either the attempt size is halved,
# or — once the minimum attempt size is reached — the full chunk is
# enqueued for flushing and a fresh chunk is started.
#
# Fixes: error-message typo ("butch" -> "batch") and comment typo
# ("apttempt"); logic is unchanged.
#
# @param metadata [Metadata] canonical metadata object (see #metadata)
# @param data [Array] serialized events; consumed destructively via slice!
# @raise [BufferChunkOverflowError] when a minimum-sized slice does not
#   fit even into an empty chunk (single record larger than the limit)
def emit_step_by_step(metadata, data)
  attempt_size = data.size / 3

  synchronize do # critical section for buffer (stage/queue)
    while data.size > 0
      if attempt_size < MINIMUM_APPEND_ATTEMPT_SIZE
        attempt_size = MINIMUM_APPEND_ATTEMPT_SIZE
      end

      chunk = @stage[metadata]
      unless chunk
        chunk = @stage[metadata] = generate_chunk(metadata)
      end

      chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
        begin
          empty_chunk = chunk.empty?

          attempt = data.slice(0, attempt_size)
          chunk.append(attempt)

          if size_over?(chunk)
            chunk.rollback

            if attempt_size <= MINIMUM_APPEND_ATTEMPT_SIZE
              if empty_chunk # record is too large even for empty chunk
                raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
              end
              # no more records for this chunk -> enqueue -> to be flushed
              enqueue_chunk(metadata) # `chunk` will be removed from stage
              attempt_size = data.size # fresh chunk may have enough space
            else
              # whole data can be processed by twice operation
              # (by using attempt_size /= 2, 3 operations are required for odd numbers of data)
              attempt_size = (attempt_size / 2) + 1
            end

            next
          end

          chunk.commit
          data.slice!(0, attempt_size)
          # same attempt size for the next iteration
          nil # discard return value of data.slice!() immediately
        ensure
          # NOTE(review): runs after a successful commit too — assumes
          # Chunk#rollback is a no-op once committed; confirm chunk contract.
          chunk.rollback
        end
      end
    end
  end
  nil
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#

require 'monitor'

module Fluent
module Plugin
class Buffer
Expand All @@ -22,7 +24,7 @@ class Chunk

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
# v: [metadata_variable] (required)
# v: [metadata_variable, ...] (required)
# t: tag as string (optional)
# k: time slice key (optional)
#
Expand Down
Loading

0 comments on commit 913d7ac

Please sign in to comment.