Skip to content

Commit

Permalink
Merge pull request fluent#2657 from ganmacs/remove-mixin
Browse files Browse the repository at this point in the history
there is no reason calling mesgpacker_unapcker/packer from Fluent::Engine
  • Loading branch information
ganmacs authored Oct 25, 2019
2 parents 8535c9a + 4209dcf commit fcd467c
Show file tree
Hide file tree
Showing 18 changed files with 70 additions and 62 deletions.
3 changes: 1 addition & 2 deletions lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def abort_message(time, record)
require 'fluent/engine'

begin
u = Fluent::Engine.msgpack_factory.unpacker($stdin)
u = Fluent::MessagePackFactory.msgpack_unpacker($stdin)
u.each {|record|
w.write(record)
}
Expand All @@ -340,4 +340,3 @@ def abort_message(time, record)
$stderr.puts "Unknown format '#{format}'"
exit 1
end

2 changes: 1 addition & 1 deletion lib/fluent/compat/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def call(io)

class MessagePackParser < Parser
def call(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
@u = Fluent::MessagePackFactory.msgpack_unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
Expand Down
6 changes: 2 additions & 4 deletions lib/fluent/counter/base_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
module Fluent
module Counter
class BaseSocket < Coolio::TCPSocket
include Fluent::MessagePackFactory::Mixin

def packed_write(data)
write pack(data)
end

def on_read(data)
msgpack_unpacker.feed_each(data) do |d|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(data) do |d|
on_message d
end
end
Expand All @@ -39,7 +37,7 @@ def on_message(data)
private

def pack(data)
msgpack_packer.pack(data)
Fluent::MessagePackFactory.msgpack_packer.pack(data)
end
end
end
Expand Down
2 changes: 0 additions & 2 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

module Fluent
class EngineClass
include Fluent::MessagePackFactory::Mixin

def initialize
@root_agent = nil
@default_loop = nil
Expand Down
12 changes: 5 additions & 7 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
module Fluent
class EventStream
include Enumerable
include MessagePackFactory::Mixin
include Fluent::Plugin::Compressable

# dup does deep copy for event stream
Expand Down Expand Up @@ -56,7 +55,7 @@ def each(unapcker: nil, &block)

def to_msgpack_stream(time_int: false, packer: nil)
return to_msgpack_stream_forced_integer(packer: packer) if time_int
out = packer || msgpack_packer
out = packer || Fluent::MessagePackFactory.msgpack_packer
each {|time,record|
out.write([time,record])
}
Expand All @@ -69,7 +68,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)
end

def to_msgpack_stream_forced_integer(packer: nil)
out = packer || msgpack_packer
out = packer || Fluent::MessagePackFactory.msgpack_packer
each {|time,record|
out.write([time.to_i,record])
}
Expand Down Expand Up @@ -238,7 +237,7 @@ def ensure_unpacked!(unpacker: nil)
return if @unpacked_times && @unpacked_records
@unpacked_times = []
@unpacked_records = []
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
(unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
end
Expand All @@ -262,7 +261,7 @@ def each(unpacker: nil, &block)
else
@unpacked_times = []
@unpacked_records = []
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
(unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
block.call(time, record)
Expand Down Expand Up @@ -319,12 +318,11 @@ def ensure_decompressed!
end

module ChunkMessagePackEventStreamer
include MessagePackFactory::Mixin
# chunk.extend(ChunkEventStreamer)
# => chunk.each{|time, record| ... }
def each(unpacker: nil, &block)
open do |io|
(unpacker || msgpack_unpacker(io)).each(&block)
(unpacker || Fluent::MessagePackFactory.msgpack_unpacker(io)).each(&block)
end
nil
end
Expand Down
21 changes: 19 additions & 2 deletions lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,39 @@ module MessagePackFactory

module Mixin
def msgpack_factory
if $log
$log.warn('Deprecated method: this method is going to be deleted. Use Fluent::MessagePackFactory.engine_factory')
end
MessagePackFactory.engine_factory
end

def msgpack_packer(*args)
msgpack_factory.packer(*args)
if $log
$log.warn('Deprecated method: this method is going to be deleted. Use Fluent::MessagePackFactory.msgpack_packer')
end
MessagePackFactory.msgpack_packer(*args)
end

def msgpack_unpacker(*args)
msgpack_factory.unpacker(*args)
if $log
$log.warn('Deprecated method: this method is going to be deleted. Use Fluent::MessagePackFactory.msgpack_unpacker')
end
MessagePackFactory.msgpack_unpacker(*args)
end
end

def self.engine_factory
@@engine_factory || factory
end

def self.msgpack_packer(*args)
engine_factory.packer(*args)
end

def self.msgpack_unpacker(*args)
engine_factory.unpacker(*args)
end

def self.factory
factory = MessagePack::Factory.new
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
Expand Down
5 changes: 2 additions & 3 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class FileChunkError < StandardError; end
# path_suffix: path suffix string, like '.log' (or any other user specified)

include SystemConfig::Mixin
include MessagePackFactory::Mixin

FILE_PERMISSION = 0644

Expand Down Expand Up @@ -221,7 +220,7 @@ def restore_metadata(bindata)

unless data
# old type of restore
data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}
data = Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}
end

now = Fluent::Clock.real_now
Expand Down Expand Up @@ -403,7 +402,7 @@ def restore_metadata_with_new_format(chunk)
if chunk.slice(0, 2) == BUFFER_HEADER
size = chunk.slice(2, 4).unpack('N').first
if size
return msgpack_unpacker(symbolize_keys: true).feed(chunk.slice(6, size)).read rescue nil
return Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(chunk.slice(6, size)).read rescue nil
end
end

Expand Down
3 changes: 1 addition & 2 deletions lib/fluent/plugin/buffer/file_single_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class FileChunkError < StandardError; end
## state: b/q - 'b'(on stage), 'q'(enqueued)

include SystemConfig::Mixin
include MessagePackFactory::Mixin

PATH_EXT = 'buf'
PATH_SUFFIX = ".#{PATH_EXT}"
Expand Down Expand Up @@ -216,7 +215,7 @@ def restore_size(chunk_format)
count = 0
File.open(@path, 'rb') { |f|
if chunk_format == :msgpack
msgpack_unpacker(f).each { |d| count += 1 }
Fluent::MessagePackFactory.msgpack_unpacker(f).each { |d| count += 1 }
else
f.each_line { |l| count += 1 }
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def read_messages(conn, &block)
serializer = :to_json.to_proc
feeder = ->(d){ parser << d }
else # msgpack
parser = Fluent::Engine.msgpack_factory.unpacker
parser = Fluent::MessagePackFactory.msgpack_unpacker
serializer = :to_msgpack.to_proc
feeder = ->(d){
parser.feed_each(d){|obj|
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def on_read(data)
@y.on_parse_complete = @on_message
else
m = method(:on_read_msgpack)
@u = Fluent::Engine.msgpack_factory.unpacker
@u = Fluent::MessagePackFactory.msgpack_unpacker
end

singleton_class.module_eval do
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ def initialize(sender, server, failure:, connection_manager:, ack_handler:)
username: server.username,
)

@unpacker = Fluent::Engine.msgpack_unpacker
@unpacker = Fluent::MessagePackFactory.msgpack_unpacker

@resolved_host = nil
@resolved_time = 0
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward/ack_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def initialize(timeout:, log:, read_length:)
@timeout = timeout
@log = log
@read_length = read_length
@unpacker = Fluent::Engine.msgpack_unpacker
@unpacker = Fluent::MessagePackFactory.msgpack_unpacker
end

def collect_response(select_interval)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def write(chunk)
chain = NullOutputChain.instance
chunk.open {|io|
# TODO use MessagePackIoEventStream
u = Fluent::Engine.msgpack_factory.unpacker(io)
u = Fluent::MessagePackFactory.msgpack_unpacker(io)
begin
u.each {|(tag,entries)|
es = MultiEventStream.new
Expand Down
4 changes: 2 additions & 2 deletions test/plugin/out_forward/test_handshake_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class HandshakeProtocolTest < Test::Unit::TestCase
handshake.invoke(sock, ri, ['HELO', {}])

assert_equal(ri.state, :pingpong)
Fluent::Engine.msgpack_factory.unpacker.feed_each(sock.string) do |ping|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(sock.string) do |ping|
assert_equal(ping.size, 6)
assert_equal(ping[0], 'PING')
assert_equal(ping[1], hostname)
Expand All @@ -38,7 +38,7 @@ class HandshakeProtocolTest < Test::Unit::TestCase
handshake.invoke(sock, ri, ['HELO', { 'auth' => 'auth' }])

assert_equal(ri.state, :pingpong)
Fluent::Engine.msgpack_factory.unpacker.feed_each(sock.string) do |ping|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(sock.string) do |ping|
assert_equal(ping.size, 6)
assert_equal(ping[0], 'PING')
assert_equal(ping[1], hostname)
Expand Down
18 changes: 9 additions & 9 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def create_driver(conf=CONFIG)
chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
assert_equal 'gzip', option['compressed']
end
Expand Down Expand Up @@ -568,7 +568,7 @@ def create_driver(conf=CONFIG)
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
assert_equal 'gzip', option['compressed']
end
Expand All @@ -592,7 +592,7 @@ def create_driver(conf=CONFIG)
assert chunk.size < (32 * 1024 * 1024)

d.run(shutdown: false) do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
end
end
Expand Down Expand Up @@ -624,7 +624,7 @@ def create_driver(conf=CONFIG)
chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack

d.run(shutdown: false) do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
end
end
Expand Down Expand Up @@ -654,7 +654,7 @@ def create_driver(conf=CONFIG)

# d.run => send_data
d.run(shutdown: false) do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
end
end
Expand Down Expand Up @@ -1004,11 +1004,11 @@ def create_driver(conf=CONFIG)
end

def packer(*args)
Fluent::Engine.msgpack_factory.packer(*args)
Fluent::MessagePackFactory.msgpack_packer(*args)
end

def unpacker
Fluent::Engine.msgpack_factory.unpacker
Fluent::MessagePackFactory.msgpack_unpacker
end

# res
Expand Down Expand Up @@ -1158,9 +1158,9 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f
execute_test_with_source_hostname_key(*keys) { |events|
entries = ''
events.each { |tag, time, record|
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
Fluent::MessagePackFactory.msgpack_packer(entries).write([time, record]).flush
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write(["tag1", entries]).to_s
}
end
end
Expand Down
10 changes: 5 additions & 5 deletions test/plugin/test_in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_time

d.run do
d.expected_emits.each {|tag,_time,record|
send_data Fluent::Engine.msgpack_factory.packer.write([tag, 0, record]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, 0, record]).to_s
}
end
end
Expand All @@ -33,7 +33,7 @@ def test_message

d.run do
d.expected_emits.each {|tag,_time,record|
send_data Fluent::Engine.msgpack_factory.packer.write([tag, _time, record]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s
}
end
end
Expand All @@ -51,7 +51,7 @@ def test_forward
d.expected_emits.each {|tag,_time,record|
entries << [_time, record]
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write(["tag1", entries]).to_s
end
end

Expand All @@ -66,9 +66,9 @@ def test_packed_forward
d.run do
entries = ''
d.expected_emits.each {|tag,_time,record|
Fluent::Engine.msgpack_factory.packer(entries).write([_time, record]).flush
Fluent::MessagePackFactory.msgpack_packer(entries).write([_time, record]).flush
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
send_data Fluent::MessagePackFactory.msgpack_packer.write(["tag1", entries]).to_s
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/plugin/test_out_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def test_write_event_time
d.emit({"a"=>2}, time)

expect = ["test",
Fluent::Engine.msgpack_factory.packer.write([time,{"a"=>1}]).to_s +
Fluent::Engine.msgpack_factory.packer.write([time,{"a"=>2}]).to_s
Fluent::MessagePackFactory.msgpack_packer.write([time,{"a"=>1}]).to_s +
Fluent::MessagePackFactory.msgpack_packer.write([time,{"a"=>2}]).to_s
]
expect = Fluent::Engine.msgpack_factory.packer.write(expect).to_s
expect = Fluent::MessagePackFactory.msgpack_packer.write(expect).to_s

result = d.run
assert_equal(expect, result)
Expand Down
Loading

0 comments on commit fcd467c

Please sign in to comment.