Skip to content

Commit

Permalink
Merge pull request #1273 from fluent/fix-chunk-handling-regression-of…
Browse files Browse the repository at this point in the history
…-compat-layer

Chunks in compat layer should provide msgpack_each method. fix #1272
  • Loading branch information
repeatedly authored Oct 13, 2016
2 parents 0a9d99a + 30fea9d commit ba67270
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
3 changes: 3 additions & 0 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require 'fluent/compat/output_chain'
require 'fluent/timezone'
require 'fluent/mixin'
require 'fluent/event'
require 'fluent/process' # to load Fluent::DetachProcessMixin

require 'fluent/plugin_helper/compat_parameters'
Expand Down Expand Up @@ -123,6 +124,7 @@ module BufferedChunkMixin
# prepend this module to BufferedOutput (including ObjectBufferedOutput) plugin singleton class
def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(ChunkMessagePackEventStreamer)
chunk.extend(AddKeyToChunkMixin) if chunk.metadata.variables && chunk.metadata.variables.has_key?(:key)
super
end
Expand All @@ -132,6 +134,7 @@ module TimeSliceChunkMixin
# prepend this module to TimeSlicedOutput plugin singleton class
def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(ChunkMessagePackEventStreamer)
chunk.extend(AddTimeSliceKeyToChunkMixin)
chunk.time_slice_format = @time_slice_format
chunk.timekey = @_timekey
Expand Down
56 changes: 56 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/output'
require 'fluent/event'

require 'json'
Expand Down Expand Up @@ -126,6 +127,28 @@ def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
module OldPluginMethodMixin
def initialize
super
@format = nil
@write = nil
end
def register(name, &block)
instance_variable_set("@#{name}", block)
end
def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
end
class DummyOldBufferedOutput < Fluent::BufferedOutput
include OldPluginMethodMixin
end
class DummyOldObjectBufferedOutput < Fluent::ObjectBufferedOutput
include OldPluginMethodMixin
end
end

class BufferedOutputTest < Test::Unit::TestCase
Expand All @@ -138,6 +161,8 @@ def create_output(type=:full)
when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new
when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new
when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new
when :old_buf then FluentPluginOutputAsBufferedTest::DummyOldBufferedOutput.new
when :old_obj then FluentPluginOutputAsBufferedTest::DummyOldObjectBufferedOutput.new
else
raise ArgumentError, "unknown type: #{type}"
end
Expand Down Expand Up @@ -346,6 +371,37 @@ def waiting(seconds)
assert_equal events, each_pushed.map{|tag,time,record| [time,record]}
end
end

data(:BufferedOutput => :old_buf,
:ObjectBufferedOutput => :old_obj)
test 'old plugin types can iterate chunk by msgpack_each in #write' do |plugin_type|
events_from_chunk = []
# event_emitter helper requires Engine.root_agent for routing
ra = Fluent::RootAgent.new(log: $log)
stub(Fluent::Engine).root_agent { ra }
@i = create_output(plugin_type)
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', @hash)]))
@i.register(:format) { |tag, time, record| [time, record].to_msgpack }
@i.register(:write) { |chunk| e = []; chunk.msgpack_each { |t, r| e << [t, r] }; events_from_chunk << [:write, e]; }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5) { sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end
end

sub_test_case 'buffered output configured with many chunk keys' do
Expand Down

0 comments on commit ba67270

Please sign in to comment.