Skip to content

Commit

Permalink
Merge pull request #914 from fluent/add-standard-chunking-format
Browse files Browse the repository at this point in the history
Add standard chunking format
  • Loading branch information
tagomoris committed Apr 26, 2016
2 parents 6f320e5 + 2845bad commit 433a82b
Show file tree
Hide file tree
Showing 15 changed files with 935 additions and 89 deletions.
48 changes: 42 additions & 6 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class EventStream
include Enumerable
include MessagePackFactory::Mixin

# Number of events contained in this stream.
# Abstract: every concrete subclass must override this
# (see OneEventStream, ArrayEventStream, MultiEventStream below).
def records
  raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

# Whether this stream may be iterated more than once.
# The base class answers false conservatively; subclasses that can
# safely re-iterate override this to return true.
def repeatable?
  false
end
Expand All @@ -29,7 +33,8 @@ def each(&block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def to_msgpack_stream
def to_msgpack_stream(time_int: false)
return to_msgpack_stream_forced_integer if time_int
out = msgpack_packer
each {|time,record|
out.write([time,record])
Expand All @@ -46,7 +51,6 @@ def to_msgpack_stream_forced_integer
end
end


class OneEventStream < EventStream
def initialize(time, record)
@time = time
Expand All @@ -57,6 +61,10 @@ def dup
OneEventStream.new(@time, @record.dup)
end

# A OneEventStream always carries exactly one (time, record) pair.
def records
  1
end

# Iteration only reads @time/@record, so repeating it is safe.
def repeatable?
  true
end
Expand All @@ -81,6 +89,10 @@ def dup
ArrayEventStream.new(entries)
end

# Event count == number of (time, record) entries held.
def records
  @entries.size
end

# Iteration only reads @entries, so repeating it is safe.
def repeatable?
  true
end
Expand All @@ -102,7 +114,7 @@ def each(&block)
#
# Use this class as below, in loop of data-enumeration:
# 1. initialize blank stream:
# streams[tag] ||= MultiEventStream
# streams[tag] ||= MultiEventStream.new
# 2. add events
# stream[tag].add(time, record)
class MultiEventStream < EventStream
Expand All @@ -119,6 +131,10 @@ def dup
es
end

# Times and records are kept in parallel arrays; either array's
# length is the event count.
def records
  @time_array.size
end

def add(time, record)
@time_array << time
@record_array << record
Expand All @@ -144,16 +160,20 @@ def each(&block)

class MessagePackEventStream < EventStream
# Keep the cached_unpacker argument for existing plugins (it is accepted
# but ignored so older call sites keep working).
#
# @param data [String] msgpack-serialized event stream bytes
# @param records [Integer] number of events contained in +data+ (defaults to 0)
# @param cached_unpacker unused; retained for backward compatibility
def initialize(data, records = 0, cached_unpacker = nil)
  @data = data
  @records = records
end

def records
@records
end

# Iterating only re-parses @data, so this stream can be consumed
# multiple times.
def repeatable?
  true
end

# Deserializes @data and yields each decoded entry to the block
# (entries are written as [time, record] pairs by to_msgpack_stream).
def each(&block)
  # TODO format check
  msgpack_unpacker.feed_each(@data, &block)
  nil
end
Expand All @@ -162,5 +182,21 @@ def to_msgpack_stream
@data
end
end
end

# Mixin that lets a buffer chunk be read like a MessagePackEventStream:
# it provides #each / #msgpack_each / #to_msgpack_stream on top of the
# host object's #open and #read.
module ChunkMessagePackEventStreamer
  include MessagePackFactory::Mixin

  # Usage:
  #   chunk.extend(ChunkMessagePackEventStreamer)
  #   chunk.each{|time, record| ... }
  def each(&block)
    open do |io|
      msgpack_unpacker(io).each(&block)
    end
    nil
  end
  alias :msgpack_each :each

  # Raw serialized bytes of the chunk (host object must provide #read).
  def to_msgpack_stream
    read
  end
end
end
53 changes: 50 additions & 3 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def emit(metadata, data, force: false)
chunk.synchronize do
begin
chunk.append(data)
if !size_over?(chunk) || force
if !chunk_size_over?(chunk) || force
chunk.commit
stored = true
@stage_size += (chunk.size - original_size)
Expand All @@ -198,6 +198,49 @@ def emit(metadata, data, force: false)
emit_step_by_step(metadata, data)
end

# Store an already-serialized event stream (+bulk+) into the buffer in one
# shot. Unlike #emit, the payload is opaque bytes and is never split;
# +records+ is the caller-supplied event count for the payload.
#
# @param metadata buffer metadata key selecting the staged chunk
# @param bulk [String] serialized payload (no-op when nil or empty)
# @param records [Integer] event count carried by +bulk+
# @raise [BufferOverflowError] when the buffer as a whole cannot store more
def emit_bulk(metadata, bulk, records)
  return if bulk.nil? || bulk.empty?
  raise BufferOverflowError unless storable?

  stored = false
  synchronize do # critical section for buffer (stage/queue)
    until stored
      chunk = @stage[metadata]
      unless chunk
        chunk = @stage[metadata] = generate_chunk(metadata)
      end

      chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
        begin
          empty_chunk = chunk.empty?
          chunk.concat(bulk, records)

          if chunk_size_over?(chunk)
            if empty_chunk
              # Even a brand-new chunk cannot hold this payload within the
              # limits; since bulk cannot be split, accept it and just warn.
              log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
            else
              # Undo the concat, flush the partially-filled chunk to the
              # queue, and retry the loop with a fresh chunk.
              chunk.rollback
              enqueue_chunk(metadata)
              next
            end
          end

          chunk.commit
          stored = true
          @stage_size += bulk.bytesize
          if chunk_size_full?(chunk)
            enqueue_chunk(metadata)
          end
        rescue
          # Any failure mid-write must not leave uncommitted bytes behind.
          chunk.rollback
          raise
        end
      end
    end
  end
  nil
end

# Total number of buffered events across all queued (not staged) chunks.
def queued_records
  synchronize { @queue.map(&:records).inject(0, :+) }
end
Expand Down Expand Up @@ -310,10 +353,14 @@ def clear_queue!
end
end

def size_over?(chunk)
# True when the chunk has grown PAST a configured limit: strictly more
# bytes than @chunk_bytes_limit, or (when a record limit is configured)
# strictly more records than @chunk_records_limit.
def chunk_size_over?(chunk)
  return true if chunk.size > @chunk_bytes_limit
  @chunk_records_limit && chunk.records > @chunk_records_limit
end

# True when the chunk has REACHED a configured limit (>= comparison,
# unlike chunk_size_over?'s strict >): at or above @chunk_bytes_limit,
# or at or above @chunk_records_limit when a record limit is configured.
def chunk_size_full?(chunk)
  return true if chunk.size >= @chunk_bytes_limit
  @chunk_records_limit && chunk.records >= @chunk_records_limit
end

def emit_step_by_step(metadata, data)
attempt_records = data.size / 3

Expand All @@ -336,7 +383,7 @@ def emit_step_by_step(metadata, data)
attempt = data.slice(0, attempt_records)
chunk.append(attempt)

if size_over?(chunk)
if chunk_size_over?(chunk)
chunk.rollback

if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
Expand Down
19 changes: 7 additions & 12 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# limitations under the License.
#

require 'fluent/msgpack_factory'
require 'fluent/plugin/buffer'
require 'fluent/unique_id'
require 'fluent/event'

require 'fileutils'
require 'monitor'
Expand All @@ -26,8 +26,8 @@ module Plugin
class Buffer # fluent/plugin/buffer is alread loaded
class Chunk
include MonitorMixin
include MessagePackFactory::Mixin
include UniqueId::Mixin
include ChunkMessagePackEventStreamer

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
Expand Down Expand Up @@ -64,6 +64,11 @@ def append(data)
raise NotImplementedError, "Implement this method in child class"
end

# For event streams which are already packed (or compressed) and should be
# stored as-is, without unpacking/decompressing.
# Abstract: implement in child classes (see FileChunk#concat, MemoryChunk#concat).
# +bulk+ is the serialized payload; +records+ is its event count.
def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end

# Abstract: finalize data added since the last commit.
def commit
  raise NotImplementedError, "Implement this method in child class"
end
Expand Down Expand Up @@ -108,16 +113,6 @@ def write_to(io)
FileUtils.copy_stream(i, io)
end
end

def msgpack_each(&block)
open do |io|
u = msgpack_factory.unpacker(io)
begin
u.each(&block)
rescue EOFError
end
end
end
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'fluent/plugin/buffer/chunk'
require 'fluent/unique_id'
require 'fluent/msgpack_factory'

module Fluent
module Plugin
Expand All @@ -33,6 +34,7 @@ class FileChunk < Chunk
# path_suffix: path suffix string, like '.log' (or any other user specified)

include SystemConfig::Mixin
include MessagePackFactory::Mixin

FILE_PERMISSION = 0644

Expand Down Expand Up @@ -74,6 +76,16 @@ def append(data)
true
end

# Write an already-serialized payload into the chunk file without decoding it.
# Only valid while the chunk is staged. +records+ is the caller-supplied
# event count for the payload; counters are finalized by #commit.
def concat(bulk, records)
  unless @state == :staged
    raise "BUG: appending to non-staged chunk, now '#{@state}'"
  end

  # Treat the payload as raw bytes regardless of its source encoding.
  bulk.force_encoding(Encoding::ASCII_8BIT)
  @chunk.write(bulk)
  @adding_records += records
  @adding_bytes += bulk.bytesize
  true
end

def commit
write_metadata # this should be at first: of course, this operation may fail

Expand Down
12 changes: 10 additions & 2 deletions lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,28 @@ class Buffer
class MemoryChunk < Chunk
def initialize(metadata)
super
@chunk = ''.force_encoding('ASCII-8BIT')
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
@adding_bytes = 0
@adding_records = 0
end

def append(data)
adding = data.join.force_encoding('ASCII-8BIT')
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
@chunk << adding
@adding_bytes += adding.bytesize
@adding_records += data.size
true
end

# Append an already-serialized payload to the in-memory buffer as-is.
# +records+ is the caller-supplied event count for the payload; counters
# are finalized by #commit.
def concat(bulk, records)
  # Treat the payload as raw bytes regardless of its source encoding.
  bulk.force_encoding(Encoding::ASCII_8BIT)
  added_bytes = bulk.bytesize
  @chunk << bulk
  @adding_records += records
  @adding_bytes += added_bytes
  true
end

def commit
@records += @adding_records
@chunk_bytes += @adding_bytes
Expand Down
Loading

0 comments on commit 433a82b

Please sign in to comment.