From 8b3f97a2c90680a7d5a8f7820bb33a818c261d0f Mon Sep 17 00:00:00 2001 From: ganmacs Date: Wed, 3 Aug 2016 14:26:45 +0900 Subject: [PATCH 1/4] Add size `size` key in option at out_forward Sending array's length with msgpack is always 3, so remove FORWARD_HEADER_EXT --- lib/fluent/compat/output.rb | 4 ++ lib/fluent/plugin/out_forward.rb | 17 +++--- test/plugin/test_out_forward.rb | 94 +++++++++++++++++++++++++++++++- 3 files changed, 102 insertions(+), 13 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index d634da69b1..24cbf386cd 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -111,6 +111,10 @@ module ChunkSizeCompatMixin def size self.bytesize end + + def size_of_events + @size + @adding_size + end end module BufferedChunkMixin diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index e703c7f468..ad0056249f 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -276,16 +276,10 @@ def rebuild_weight_array @weight_array = weight_array end - # MessagePack FixArray length = 3 (if @extend_internal_protocol) - # = 2 (else) - FORWARD_HEADER = [0x92].pack('C').freeze - FORWARD_HEADER_EXT = [0x93].pack('C').freeze + # MessagePack FixArray length is 3 + FORWARD_HEADER = [0x93].pack('C').freeze def forward_header - if @extend_internal_protocol - FORWARD_HEADER_EXT - else - FORWARD_HEADER - end + FORWARD_HEADER end #FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack @@ -336,8 +330,9 @@ def send_data(node, tag, chunk) # writeRawBody(packed_es) chunk.write_to(sock) + option = { 'size' => chunk.size_of_events } + if @extend_internal_protocol - option = {} option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response sock.write option.to_msgpack @@ -374,6 +369,8 @@ def send_data(node, tag, chunk) raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK" end end + else + sock.write option.to_msgpack end node.heartbeat(false) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index f6e4bd2219..173dfc9efb 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -120,6 +120,70 @@ def test_wait_response_timeout_config assert_equal 2, d.instance.ack_response_timeout end + def test_sending_contains_with_ack + target_input_driver = create_target_input_driver(true) + + d = create_driver(CONFIG + %[ + ack_response_timeout 1s + ]) + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + + records = [ + {"a" => 1}, + {"a" => 2} + ] + d.register_run_post_condition do + d.instance.responses.length == 1 + end + + target_input_driver.run do + d.run do + records.each do |record| + d.emit record, time + end + end + end + + emits = target_input_driver.emits + assert_equal ['test', time, records[0]], emits[0] + assert_equal ['test', time, records[1]], emits[1] + + assert_equal target_input_driver.instance.received_options[0]['size'], 2 + end + + def test_sending_contains_without_ack + target_input_driver = create_target_input_driver(true) + + d = create_driver(CONFIG + %[ + ack_response_timeout 1s + ]) + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + + records = [ + {"a" => 1}, + {"a" => 2} + ] + d.register_run_post_condition do + d.instance.responses.length == 1 + end + + target_input_driver.run do + d.run do + records.each do |record| + d.emit record, time + end + end + end + + emits = target_input_driver.emits + assert_equal ['test', time, records[0]], emits[0] + assert_equal ['test', time, records[1]], emits[1] + + assert_equal target_input_driver.instance.received_options[0]['size'], 2 + end + def test_send_with_time_as_integer target_input_driver = create_target_input_driver @@ -379,12 +443,18 @@ def create_target_input_driver(do_respond=false, disconnect=false, conf=TARGET_C DummyEngineDriver.new(Fluent::ForwardInput) { handler_class = Class.new(Fluent::ForwardInput::Handler) { |klass| attr_reader :chunk_counter # for checking if received data is successfully deserialized + attr_reader :received_options def initialize(sock, log, on_message) @sock = sock @log = log @chunk_counter = 0 - @on_message = on_message + @received_options = [] + @on_message = ->(msg, chunk_size, source) { + option = on_message.call(msg, chunk_size, source) + @received_options << option + option + } @source = nil end @@ -409,7 +479,7 @@ def close end } - define_method(:start) do + define_method(:_start) do @thread = Thread.new do Socket.tcp_server_loop(@bind, @port) do |sock, client_addrinfo| begin @@ -431,12 +501,15 @@ def close sock.close_write sock.close end + @received_options = handler.received_options end end end end - def shutdown + attr_reader :received_options + + def _shutdown @thread.kill @thread.join end @@ -457,9 +530,24 @@ def test_heartbeat_type_none assert_equal node.available, true end + module SuppressCallSuperMixin + def start + _start + end + + def before_shutdown + # nothing + end + + def shutdown + _shutdown + end + end + class DummyEngineDriver < Fluent::Test::TestDriver def initialize(klass, &block) super(klass, &block) + @instance.class.prepend(SuppressCallSuperMixin) @engine = DummyEngineClass.new end From 80c418f2112846e43bf40b8abb8d00537fc750c6 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Wed, 3 Aug 2016 17:39:13 +0900 Subject: [PATCH 2/4] pass `size` to initilizer of MessagePackEventStream * pass `size` to initilizer of MessagePackEventStream if `size` exists in `option` at in_forward * remove unnecessary condtion --- lib/fluent/plugin/in_forward.rb | 7 ++++--- test/plugin/test_in_forward.rb | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 7a2b6b8e64..ca0a2a7191 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -169,10 +169,11 @@ def on_message(msg, chunk_size, source) if entries.class == String # PackedForward - es = MessagePackEventStream.new(entries) + option = msg[2] + size = (option && option['size']) || 0 + es = MessagePackEventStream.new(entries, nil, size.to_i) es = check_and_skip_invalid_event(tag, es, source) if @skip_invalid_event router.emit_stream(tag, es) - option = msg[2] elsif entries.class == Array # Forward @@ -265,7 +266,7 @@ def on_read(data) @y = Yajl::Parser.new @y.on_parse_complete = lambda { |obj| option = @on_message.call(obj, @chunk_counter, @source) - respond option if option + respond option @chunk_counter = 0 } else diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 5c0d5c1819..815b69556d 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -242,6 +242,30 @@ def test_message_json end end + def test_set_size_to_option + d = create_driver + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag1", time, {"a"=>2}] + ] + + entries = '' + events.each {|_tag, _time, record| + [_time, record].to_msgpack(entries) + } + + chunk = ["tag1", entries, { 'size' => events.length }].to_msgpack + + d.run do + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| + option = d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + assert_equal option['size'], events.length + end + end + end + def test_send_large_chunk_warning d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M From fb5f42e9a87b110bb5139a489ec30a961fc924d3 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 4 Aug 2016 14:12:59 +0900 Subject: [PATCH 3/4] Add a comment why this module is needed --- test/plugin/test_out_forward.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 173dfc9efb..e3248a782a 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -530,6 +530,8 @@ def test_heartbeat_type_none assert_equal node.available, true end + # To suppress calling `ForwardInput#start` in `DummyEngineDriver`, + # `DummyEnigneDriver` should avoid calling CallSuperMixin.prepend at `Fluent::Compat::Input#initialize`. module SuppressCallSuperMixin def start _start From 8efc126d868fc548c0640abe09db112cff82e146 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 4 Aug 2016 22:22:12 +0900 Subject: [PATCH 4/4] Remove unused code * `@extend_internal_protocol` is same as `@require_ack_response`, so remove it. --- lib/fluent/plugin/out_forward.rb | 82 +++++++++++++------------------- test/plugin/test_out_forward.rb | 2 - 2 files changed, 33 insertions(+), 51 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index ad0056249f..a3eb2d1870 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -78,8 +78,6 @@ def initialize desc 'Use the "Phi accrual failure detector" to detect server failure.' config_param :phi_failure_detector, :bool, default: true - # if any options added that requires extended forward api, fix @extend_internal_protocol - desc 'Change the protocol to at-least-once.' config_param :require_ack_response, :bool, default: false # require in_forward to respond with ack desc 'This option is used when require_ack_response is true.' @@ -95,8 +93,6 @@ def initialize config_param :port, :integer, default: LISTEN_PORT config_param :host, :string, default: nil - attr_accessor :extend_internal_protocol - def configure(conf) super @@ -112,13 +108,6 @@ def configure(conf) recover_sample_size = @recover_wait / @heartbeat_interval - # add options here if any options addes which uses extended protocol - @extend_internal_protocol = if @require_ack_response - true - else - false - end - if @dns_round_robin if @heartbeat_type == :udp raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" @@ -308,7 +297,7 @@ def send_data(node, tag, chunk) opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) - # beginArray(2) + # beginArray(3) sock.write forward_header # writeRaw(tag) @@ -331,50 +320,45 @@ def send_data(node, tag, chunk) chunk.write_to(sock) option = { 'size' => chunk.size_of_events } + option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response + sock.write option.to_msgpack + + if @require_ack_response && @ack_response_timeout > 0 + # Waiting for a response here results in a decrease of throughput because a chunk queue is locked. + # To avoid a decrease of troughput, it is necessary to prepare a list of chunks that wait for responses + # and process them asynchronously. + if IO.select([sock], nil, nil, @ack_response_timeout) + raw_data = sock.recv(1024) + + # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF. + # If this happens we assume the data wasn't delivered and retry it. + if raw_data.empty? + @log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable." + node.disable! + raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection" + else + # Serialization type of the response is same as sent data. + res = MessagePack.unpack(raw_data) - if @extend_internal_protocol - option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response - sock.write option.to_msgpack - - if @require_ack_response && @ack_response_timeout > 0 - # Waiting for a response here results in a decrease of throughput because a chunk queue is locked. - # To avoid a decrease of troughput, it is necessary to prepare a list of chunks that wait for responses - # and process them asynchronously. - if IO.select([sock], nil, nil, @ack_response_timeout) - raw_data = sock.recv(1024) - - # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF. - # If this happens we assume the data wasn't delivered and retry it. - if raw_data.empty? - @log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable." - node.disable! - raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection" - else - # Serialization type of the response is same as sent data. - res = MessagePack.unpack(raw_data) - - if res['ack'] != option['chunk'] - # Some errors may have occured when ack and chunk id is different, so send the chunk again. - raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different" - end + if res['ack'] != option['chunk'] + # Some errors may have occured when ack and chunk id is different, so send the chunk again. + raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different" end - - else - # IO.select returns nil on timeout. - # There are 2 types of cases when no response has been received: - # (1) the node does not support sending responses - # (2) the node does support sending response but responses have not arrived for some reasons. - @log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable." - node.disable! - raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK" end + + else + # IO.select returns nil on timeout. + # There are 2 types of cases when no response has been received: + # (1) the node does not support sending responses + # (2) the node does support sending response but responses have not arrived for some reasons. + @log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable." + node.disable! + raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK" end - else - sock.write option.to_msgpack end node.heartbeat(false) - return res # for test + res # for test ensure sock.close_write sock.close diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index e3248a782a..22edc4b4b2 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -107,7 +107,6 @@ def test_phi_failure_detector def test_wait_response_timeout_config d = create_driver(CONFIG) - assert_equal false, d.instance.extend_internal_protocol assert_equal false, d.instance.require_ack_response assert_equal 190, d.instance.ack_response_timeout @@ -115,7 +114,6 @@ def test_wait_response_timeout_config require_ack_response true ack_response_timeout 2s ]) - assert d.instance.extend_internal_protocol assert d.instance.require_ack_response assert_equal 2, d.instance.ack_response_timeout end