Skip to content

Commit

Permalink
Use CompressedMessagePackEventStream to handle compressed data
Browse files Browse the repository at this point in the history
  • Loading branch information
ganmacs committed Aug 26, 2016
1 parent 327ff7e commit ddfe5f5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ def on_message(msg, chunk_size, peeraddr)
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es = MessagePackEventStream.new(entries, nil, size.to_i)
es_class = (option && option['compress'] == 'gzip') ? CompressedMessagePackEventStream : MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
router.emit_stream(tag, es)
Expand Down
63 changes: 63 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
require 'base64'

require 'fluent/env'
require 'fluent/event'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/compressable'

class ForwardInputTest < Test::Unit::TestCase
include Fluent::Plugin::Compressable

class << self
def startup
socket_manager_path = ServerEngine::SocketManager::Server.generate_path
Expand Down Expand Up @@ -267,6 +271,65 @@ def test_set_size_to_option
end
end

def test_set_compress_to_option
d = create_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]

# create compressed entries
entries = ''
events.each do |_tag, _time, record|
v = ''
[_time, record].to_msgpack(v)
entries += compress(v)
end
chunk = ["tag1", entries, { 'compress' => 'gzip' }].to_msgpack

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, PEERADDR)
assert_equal 'gzip', option['compress']
end
end

assert_equal events, d.emits
end


def test_create_CompressedMessagePackEventStream_with_gzip_compress_option
d = create_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]

# create compressed entries
entries = ''
events.each do |_tag, _time, record|
v = ''
[_time, record].to_msgpack(v)
entries += compress(v)
end
chunk = ["tag1", entries, { 'compress' => 'gzip' }].to_msgpack

# check CompressedMessagePackEventStream is created
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, PEERADDR)
assert_equal 'gzip', option['compress']
end
end
d.emits
end

def test_send_large_chunk_warning
d = create_driver(CONFIG + %[
chunk_size_warn_limit 16M
Expand Down

0 comments on commit ddfe5f5

Please sign in to comment.