diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index ac81a29bc3..a674446442 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -23,18 +23,21 @@ module Fluent::Plugin class ForwardOutput < Output class Error < StandardError; end - class ResponseError < Error; end + class NoNodesAvailable < Error; end class ConnectionClosedError < Error; end - class ACKTimeoutError < Error; end Fluent::Plugin.register_output('forward', self) - helpers :compat_parameters + helpers :socket, :server, :timer, :thread, :compat_parameters LISTEN_PORT = 24224 + PROCESS_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC + desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 + # TODO: add linger_timeout, recv_timeout + desc 'The transport protocol to use for heartbeats.(udp,tcp,none)' config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp desc 'The interval of the heartbeat packer.' @@ -43,8 +46,6 @@ class ACKTimeoutError < Error; end config_param :recover_wait, :time, default: 10 desc 'The hard timeout used to detect server failure.' config_param :hard_timeout, :time, default: 60 - desc 'Set TTL to expire DNS cache in seconds.' - config_param :expire_dns_cache, :time, default: nil # 0 means disable cache desc 'The threshold parameter used to detect server faults.' config_param :phi_threshold, :integer, default: 16 desc 'Use the "Phi accrual failure detector" to detect server failure.' @@ -52,14 +53,20 @@ class ACKTimeoutError < Error; end desc 'Change the protocol to at-least-once.' config_param :require_ack_response, :bool, default: false # require in_forward to respond with ack + + ## The reason of default value of :ack_response_timeout: + # Linux default tcp_syn_retries is 5 (in many environment) + # 3 + 6 + 12 + 24 + 48 + 96 -> 189 (sec) desc 'This option is used when require_ack_response is true.' config_param :ack_response_timeout, :time, default: 190 - desc 'Reading data size from server' - config_param :read_length, :size, default: 512 # 512bytes + desc 'The interval while reading data from server' config_param :read_interval_msec, :integer, default: 50 # 50ms - # Linux default tcp_syn_retries is 5 (in many environment) - # 3 + 6 + 12 + 24 + 48 + 96 -> 189 (sec) + desc 'Reading data size from server' + config_param :read_length, :size, default: 512 # 512bytes + + desc 'Set TTL to expire DNS cache in seconds.' + config_param :expire_dns_cache, :time, default: nil # 0 means disable cache desc 'Enable client-side DNS round robin.' config_param :dns_round_robin, :bool, default: false # heartbeat_type 'udp' is not available for this @@ -109,7 +116,10 @@ def initialize @nodes = [] #=> [Node] @loop = nil @thread = nil - @finished = false + + @usock = nil + @sock_ack_waiting = nil + @sock_ack_waiting_mutex = nil end def configure(conf) @@ -157,79 +167,104 @@ def configure(conf) raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1 end + def prefer_delayed_commit + @require_ack_response + end + def start super + # Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout + # But it should be overwritten by ack_response_timeout to rollback chunks after timeout + if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout + log.info "delayed_commit_timeout is overwritten by ack_response_timeout" + @delayed_commit_timeout = @ack_response_timeout + end + @rand_seed = Random.new.seed rebuild_weight_array @rr = 0 - @usock = nil unless @heartbeat_type == :none - @loop = Coolio::Loop.new - if @heartbeat_type == :udp - # assuming all hosts use udp - @usock = Fluent::Compat::SocketUtil.create_udp_socket(@nodes.first.host) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) - @loop.attach(@hb) + @usock = socket_create_udp(@nodes.first.host, @nodes.first.port, nonblock: true) + server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length) do |data, sock| + sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host) + on_heartbeat(sockaddr, data) + end end + timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_timer)) + end - @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) - @loop.attach(@timer) - - @thread = Thread.new(&method(:run)) + if @require_ack_response + @sock_ack_waiting_mutex = Mutex.new + @sock_ack_waiting = [] + thread_create(:out_forward_receiving_ack, &method(:ack_reader)) end end - def shutdown - @finished = true - if @loop - @loop.watchers.each {|w| w.detach } - # @loop.stop - @loop.stop rescue nil - end - @thread.join if @thread + def close @usock.close if @usock - super end - def run - @loop.run if @loop - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace - end - def write(chunk) return if chunk.empty? + tag = chunk.metadata.tag + select_a_healthy_node{|node| node.send_data(tag, chunk) } + end + + ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :node, :time, :timeout) do + def expired?(now) + time + timeout < now + end + end + def try_write(chunk) + if chunk.empty? + commit_write(chunk.unique_id) + return + end tag = chunk.metadata.tag + sock, node = select_a_healthy_node{|n| n.send_data(tag, chunk) } + chunk_id = Base64.encode64(chunk.unique_id) + current_time = Process.clock_gettime(PROCESS_CLOCK_ID) + info = ACKWaitingSockInfo.new(sock, chunk_id, node, current_time, @ack_response_timeout) + @sock_ack_waiting_mutex.synchronize do + @sock_ack_waiting << info + end + end + + def select_a_healthy_node error = nil wlen = @weight_array.length wlen.times do @rr = (@rr + 1) % wlen node = @weight_array[@rr] + next unless node.available? - if node.available? - begin - node.send_data(tag, chunk) - return - rescue - # for load balancing during detecting crashed servers - error = $! # use the latest error - end + begin + ret = yield node + return ret, node + rescue + # for load balancing during detecting crashed servers + error = $! # use the latest error end end - if error - raise error - else - raise "no nodes are available" # TODO message - end + raise error if error + raise NoNodesAvailable, "no nodes are available" + end + + def create_transfer_socket(host, port, &block) + socket_create_tcp( + host, port, + linger_timeout: @send_timeout, + send_timeout: @send_timeout, + recv_timeout: @ack_response_timeout, + &block + ) end # MessagePack FixArray length is 3 @@ -282,21 +317,7 @@ def rebuild_weight_array @weight_array = weight_array end - class HeartbeatRequestTimer < Coolio::TimerWatcher - def initialize(interval, callback) - super(interval, true) - @callback = callback - end - - def on_timer - @callback.call - rescue - # TODO log? - end - end - def on_timer - return if @finished @nodes.each {|n| if n.tick rebuild_weight_array @@ -311,33 +332,86 @@ def on_timer } end - class HeartbeatHandler < Coolio::IO - def initialize(io, callback) - super(io) - @io = io - @callback = callback + def on_heartbeat(sockaddr, msg) + if node = @nodes.find {|n| n.sockaddr == sockaddr } + # log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port + if node.heartbeat + rebuild_weight_array + end end + end - def on_readable - begin - msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET - return + # return chunk id when succeeded for tests + def read_ack_from_sock(sock, unpacker) + begin + raw_data = sock.recv(@read_length) + rescue Errno::ECONNRESET + raw_data = "" + end + info = @sock_ack_waiting_mutex.synchronize{ @sock_ack_waiting.find{|i| i.sock == sock } } + + # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF. + # If this happens we assume the data wasn't delivered and retry it. + if raw_data.empty? + log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port + info.node.disable! + return nil + else + unpacker.feed(raw_data) + res = unpacker.read + if res['ack'] != info.chunk_id + # Some errors may have occured when ack and chunk id is different, so send the chunk again. + log.warn "ack in response and chunk id in sent data are different", chunk_id: info.chunk_id, ack: res['ack'] + rollback_write(info.chunk_id) + return nil end - host = addr[3] - port = addr[1] - sockaddr = Socket.pack_sockaddr_in(port, host) - @callback.call(sockaddr, msg) - rescue - # TODO log? + return info.chunk_id + end + rescue => e + log.error "unexpected error while receiving ack message", error: e + log.error_backtrace + ensure + @sock_ack_waiting_mutex.synchronize do + @sock_ack_waiting.delete(info) end end - def on_heartbeat(sockaddr, msg) - if node = @nodes.find {|n| n.sockaddr == sockaddr } - #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port - if node.heartbeat - rebuild_weight_array + def ack_reader + select_interval = if @delayed_commit_timeout > 3 + 2 + else + @delayed_commit_timeout / 2.0 + end + + unpacker = Fluent::Engine.msgpack_unpacker + + while thread_current_running? + now = Process.clock_gettime(PROCESS_CLOCK_ID) + sockets = [] + @sock_ack_waiting_mutex.synchronize do + new_list = [] + @sock_ack_waiting.each do |info| + if info.expired?(now) + # There are 2 types of cases when no response has been received from socket: + # (1) the node does not support sending responses + # (2) the node does support sending response but responses have not arrived for some reasons. + log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port + info.node.disable! + info.sock.close rescue nil + rollback_write(info.chunk_id) + else + sockets << info.sock + new_list << info + end + end + @sock_ack_waiting = new_list + end + + readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval) + next unless readable_sockets + + readable_sockets.each do |sock| + read_ack_from_sock(sock, unpacker) end end end @@ -390,20 +464,6 @@ def standby? @standby end - def connect - TCPSocket.new(resolved_host, port) - end - - def set_socket_options(sock) - opt = [1, @sender.send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - - opt = [@sender.send_timeout.to_i, 0].pack('L!L!') # struct timeval - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) - - sock - end - def establish_connection(sock) while available? && @state != :established begin @@ -434,98 +494,60 @@ def establish_connection(sock) end end - def send_data(tag, chunk) - sock = connect + def send_data_actual(sock, tag, chunk) @state = @sender.security ? :helo : :established - begin - set_socket_options(sock) - - if @state != :established - establish_connection(sock) - end + if @state != :established + establish_connection(sock) + end - unless available? - raise ConnectionClosedError, "failed to establish connection with node #{@name}" - end + unless available? + raise ConnectionClosedError, "failed to establish connection with node #{@name}" + end - option = { 'size' => chunk.size, 'compressed' => @compress } - option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response + option = { 'size' => chunk.size, 'compressed' => @compress } + option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response - # out_forward always uses Raw32 type for content. - # Raw16 can store only 64kbytes, and it should be much smaller than buffer chunk size. + # out_forward always uses Raw32 type for content. + # Raw16 can store only 64kbytes, and it should be much smaller than buffer chunk size. - sock.write @sender.forward_header # beginArray(3) - sock.write tag.to_msgpack # 1. writeRaw(tag) - chunk.open(compressed: @compress) do |chunk_io| - sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32 - IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es) - end - sock.write option.to_msgpack # 3. writeOption(option) - - if @sender.require_ack_response - # Waiting for a response here results in a decrease of throughput because a chunk queue is locked. - # To avoid a decrease of throughput, it is necessary to prepare a list of chunks that wait for responses - # and process them asynchronously. - if IO.select([sock], nil, nil, @sender.ack_response_timeout) - raw_data = begin - sock.recv(1024) - rescue Errno::ECONNRESET - "" - end - - # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF. - # If this happens we assume the data wasn't delivered and retry it. - if raw_data.empty? - @log.warn "node closed the connection. regard it as unavailable.", host: @host, port: @port - disable! - raise ConnectionClosedError, "node #{@host}:#{@port} closed connection" - else - @unpacker.feed(raw_data) - res = @unpacker.read - if res['ack'] != option['chunk'] - # Some errors may have occured when ack and chunk id is different, so send the chunk again. - raise ResponseError, "ack in response and chunk id in sent data are different" - end - end + sock.write @sender.forward_header # beginArray(3) + sock.write tag.to_msgpack # 1. writeRaw(tag) + chunk.open(compressed: @compress) do |chunk_io| + sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32 + IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es) + end + sock.write option.to_msgpack # 3. writeOption(option) + end - else - # IO.select returns nil on timeout. - # There are 2 types of cases when no response has been received: - # (1) the node does not support sending responses - # (2) the node does support sending response but responses have not arrived for some reasons. - @log.warn "no response from node. regard it as unavailable.", host: @host, port: @port - disable! - raise ACKTimeoutError, "node #{host}:#{port} does not return ACK" - end - end + def send_data(tag, chunk) + sock = @sender.create_transfer_socket(resolved_host, port) + begin + send_data_actual(sock, tag, chunk) + rescue + sock.close rescue nil + raise + end - heartbeat(false) - res # for test - ensure - sock.close_write - sock.close + if @sender.require_ack_response + return sock # to read ACK from socket end + + sock.close_write rescue nil + sock.close rescue nil + heartbeat(false) + nil end # FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack def send_heartbeat case @sender.heartbeat_type when :tcp - sock = connect - begin - opt = [1, @sender.send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - # opt = [@sender.send_timeout.to_i, 0].pack('L!L!') # struct timeval - # sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) - + @sender.create_transfer_socket(resolved_host, port) do |sock| ## don't send any data to not cause a compatibility problem # sock.write FORWARD_TCP_HEARTBEAT_DATA # successful tcp connection establishment is considered as valid heartbeat heartbeat(true) - ensure - sock.close_write - sock.close end when :udp @usock.send "\0", 0, Socket.pack_sockaddr_in(@port, resolved_host) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 8108800687..51d95ece4e 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -40,6 +40,8 @@ class Output < Base CHUNKING_FIELD_WARN_NUM = 4 + PROCESS_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC + config_param :time_as_integer, :bool, default: false # `` and `` sections are available only when '#format' and '#write' are implemented @@ -138,7 +140,7 @@ def prefer_delayed_commit end # Internal states - FlushThreadState = Struct.new(:thread, :next_time) + FlushThreadState = Struct.new(:thread, :next_clock) DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do def expired? time + timeout < Time.now @@ -898,9 +900,9 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) @retry_mutex.synchronize do if @retry # success to flush chunks in retries if secondary - log.warn "retry succeeded by secondary.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id) + log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id) else - log.warn "retry succeeded.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id) + log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id) end @retry = nil end @@ -918,6 +920,8 @@ def rollback_write(chunk_id) # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } + primary = @as_secondary ? @primary_instance : self + primary.update_retry_state(chunk_id, @as_secondary) true else false @@ -930,7 +934,9 @@ def try_rollback_write info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } - log.warn "failed to flush the buffer chunk, timeout to commit.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time + log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time + primary = @as_secondary ? @primary_instance : self + primary.update_retry_state(info.chunk_id, @as_secondary) end end end @@ -943,7 +949,9 @@ def try_rollback_all info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } - log.info "delayed commit for buffer chunks was cancelled in shutdown", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id) + log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id) + primary = @as_secondary ? @primary_instance : self + primary.update_retry_state(info.chunk_id, @as_secondary) end end end @@ -997,7 +1005,7 @@ def try_flush log.trace "done to commit a chunk", chunk: dump_chunk_id end rescue => e - log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id) + log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id) if output.delayed_commit @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id } @@ -1005,35 +1013,52 @@ def try_flush end @buffer.takeback_chunk(chunk.unique_id) - @retry_mutex.synchronize do - if @retry - @counters_monitor.synchronize{ @num_errors += 1 } - if @retry.limit? - records = @buffer.queued_records - log.error "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.", plugin_id: plugin_id, retry_times: @retry.steps, records: records, error: e - log.error_backtrace e.backtrace - @buffer.clear_queue! - log.debug "buffer queue cleared", plugin_id: plugin_id - @retry = nil - else - @retry.step - msg = if using_secondary - "failed to flush the buffer with secondary output." - else - "failed to flush the buffer." - end - log.warn msg, plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e - log.warn_backtrace e.backtrace - end + update_retry_state(chunk.unique_id, using_secondary, e) + + raise if @under_plugin_development && !@retry_for_error_chunk + end + end + + def update_retry_state(chunk_id, using_secondary, error = nil) + @retry_mutex.synchronize do + @counters_monitor.synchronize{ @num_errors += 1 } + chunk_id_hex = dump_unique_id_hex(chunk_id) + + unless @retry + @retry = retry_state(@buffer_config.retry_randomize) + if error + log.warn "failed to flush the buffer.", retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error + log.warn_backtrace error.backtrace + end + return + end + + # @retry exists + + if error + if @retry.limit? + records = @buffer.queued_records + msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue." + log.error msg, retry_times: @retry.steps, records: records, error: error + log.error_backtrace error.backtrace + elsif using_secondary + msg = "failed to flush the buffer with secondary output." + log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error + log.warn_backtrace error.backtrace else - @retry = retry_state(@buffer_config.retry_randomize) - @counters_monitor.synchronize{ @num_errors += 1 } - log.warn "failed to flush the buffer.", plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e - log.warn_backtrace e.backtrace + msg = "failed to flush the buffer." + log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error + log.warn_backtrace error.backtrace end end - raise if @under_plugin_development && !@retry_for_error_chunk + if @retry.limit? + @buffer.clear_queue! + log.debug "buffer queue cleared" + @retry = nil + else + @retry.step + end end end @@ -1060,7 +1085,7 @@ def submit_flush_once # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] - state.next_time = 0 + state.next_clock = 0 if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception) state.thread.run else @@ -1102,7 +1127,7 @@ def enqueue_thread_wait # only for tests of output plugin def flush_thread_wakeup @output_flush_threads.each do |state| - state.next_time = 0 + state.next_clock = 0 state.thread.run end end @@ -1156,7 +1181,7 @@ def enqueue_thread_run end rescue => e raise if @under_plugin_development - log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e + log.error "unexpected error while checking flushed chunks. ignored.", error: e log.error_backtrace ensure @output_enqueue_thread_waiting = false @@ -1166,7 +1191,7 @@ def enqueue_thread_run end rescue => e # normal errors are rescued by inner begin-rescue clause. - log.error "error on enqueue thread", plugin_id: plugin_id, error: e + log.error "error on enqueue thread", error: e log.error_backtrace raise end @@ -1175,9 +1200,7 @@ def enqueue_thread_run def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval - # If the given clock_id is not supported, Errno::EINVAL is raised. - clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC - state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval + state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + flush_thread_interval while !self.after_started? && !self.stopped? sleep 0.5 @@ -1187,16 +1210,18 @@ def flush_thread_run(state) begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running - time = Process.clock_gettime(clock_id) - interval = state.next_time - time + current_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + interval = state.next_clock - current_clock - if state.next_time <= time + if state.next_clock <= current_clock && (!@retry || @retry_mutex.synchronize{ @retry.next_time } <= Time.now) try_flush - # next_flush_interval uses flush_thread_interval or flush_thread_burst_interval (or retrying) + + # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) interval = next_flush_time.to_f - Time.now.to_f - # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected (because @retry still exists) - # @retry should be cleared if delayed commit is enabled? Or any other solution? - state.next_time = Process.clock_gettime(clock_id) + interval + # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected + # because @retry still exists (#commit_write is not called yet in #try_flush) + # @retry should be cleared if delayed commit is enabled? Or any other solution? + state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + interval end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } @@ -1210,7 +1235,7 @@ def flush_thread_run(state) rescue => e # normal errors are rescued by output plugins in #try_flush # so this rescue section is for critical & unrecoverable errors - log.error "error on output thread", plugin_id: plugin_id, error: e + log.error "error on output thread", error: e log.error_backtrace raise end diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index d37519a9cb..94870e0058 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -24,7 +24,7 @@ require 'fluent/plugin_helper/formatter' require 'fluent/plugin_helper/inject' require 'fluent/plugin_helper/extract' -# require 'fluent/plugin_helper/socket' +require 'fluent/plugin_helper/socket' require 'fluent/plugin_helper/server' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index cec2cff7fc..27a90f7c66 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -99,7 +99,7 @@ def configure(conf) if @_inject_hostname_key @_inject_hostname = @inject_config.hostname unless @_inject_hostname - @_inject_hostname = Socket.gethostname + @_inject_hostname = ::Socket.gethostname log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname end end diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index ad1aae2326..e058da8a3e 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -108,11 +108,13 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: # sock.remote_port # # ... # end - def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback) + def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback) raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp + raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 @@ -120,8 +122,10 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backl socket_options[:linger_timeout] ||= 0 end - socket_option_validate!(proto, **socket_options) - socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + unless socket + socket_option_validate!(proto, **socket_options) + socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + end if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog @@ -140,9 +144,15 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backl raise "not implemented yet" when :udp raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes - sock = server_create_udp_socket(shared, bind, port) - socket_option_setter.call(sock) - server = EventHandler::UDPServer.new(sock, max_bytes, flags, @log, @under_plugin_development, &callback) + if socket + sock = socket + close_socket = false + else + sock = server_create_udp_socket(shared, bind, port) + socket_option_setter.call(sock) + close_socket = true + end + server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback) when :unix raise "not implemented yet" else @@ -267,10 +277,11 @@ def server_create_tls_socket(shared, bind, port) end class CallbackSocket - def initialize(server_type, sock, enabled_events = []) + def initialize(server_type, sock, enabled_events = [], close_socket: true) @server_type = server_type @sock = sock @enabled_events = enabled_events + @close_socket = close_socket end def remote_addr @@ -294,12 +305,7 @@ def write(data) end def close - @sock.close - # close cool.io socket in another thread, not to make deadlock - # for flushing @_write_buffer when conn.close is called in callback - # ::Thread.new{ - # @sock.close - # } + @sock.close if @close_socket end def data(&callback) @@ -334,8 +340,8 @@ def write(data) end class UDPCallbackSocket < CallbackSocket - def initialize(sock, peeraddr) - super("udp", sock, []) + def initialize(sock, peeraddr, **kwargs) + super("udp", sock, [], **kwargs) @peeraddr = peeraddr end @@ -358,7 +364,7 @@ def write(data) module EventHandler class UDPServer < Coolio::IO - def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback) + def initialize(sock, max_bytes, flags, close_socket, log, under_plugin_development, &callback) raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket) super(sock) @@ -366,6 +372,7 @@ def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback) @sock = sock @max_bytes = max_bytes @flags = flags + @close_socket = close_socket @log = log @under_plugin_development = under_plugin_development @callback = callback @@ -398,7 +405,7 @@ def on_readable_with_sock rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET return end - @callback.call(data, UDPCallbackSocket.new(@sock, addr)) + @callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket)) rescue => e @log.error "unexpected error in processing UDP data", error: e @log.error_backtrace diff --git a/lib/fluent/plugin_helper/socket.rb b/lib/fluent/plugin_helper/socket.rb new file mode 100644 index 0000000000..87a5aaab72 --- /dev/null +++ b/lib/fluent/plugin_helper/socket.rb @@ -0,0 +1,101 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'socket' +require 'ipaddr' + +require_relative 'socket_option' + +module Fluent + module PluginHelper + module Socket + # stop : [-] + # shutdown : [-] + # close : [-] + # terminate: [-] + + include Fluent::PluginHelper::SocketOption + + attr_reader :_sockets # for tests + + # TODO: implement connection pool for specified host + + def socket_create(proto, host, port, **kwargs, &block) + case proto + when :tcp + socket_create_tcp(host, port, **kwargs, &block) + when :udp + socket_create_udp(host, port, **kwargs, &block) + when :tls + socket_create_tls(host, port, **kwargs, &block) + when :unix + raise "not implemented yet" + else + raise ArgumentError, "invalid protocol: #{proto}" + end + end + + def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block) + sock = TCPSocket.new(host, port) + socket_option_set(sock, resolve_name: resolve_name, **kwargs) + if block + begin + block.call(sock) + ensure + sock.close_write rescue nil + sock.close rescue nil + end + else + sock + end + end + + def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) + family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + sock = UDPSocket.new(family) + socket_option_set(sock, resolve_name: resolve_name, **kwargs) + sock.connect(host, port) if connect + if block + begin + block.call(sock) + ensure + sock.close rescue nil + end + else + sock + end + end + + def socket_create_tls(host, port, resolve_name: false, certopts: {}, &block) + raise "not implemented yet" + end + + # socket_create_socks ? + + def initialize + super + # @_sockets = [] # for keepalived sockets / connection pool + end + + # def close + # @_sockets.each do |sock| + # sock.close + # end + # super + # end + end + end +end diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb index 7ef9a933f0..82ad59ccf0 100644 --- a/lib/fluent/plugin_helper/socket_option.rb +++ b/lib/fluent/plugin_helper/socket_option.rb @@ -15,6 +15,7 @@ # require 'socket' +require 'fcntl' # this module is only for Socket/Server plugin helpers module Fluent @@ -50,10 +51,13 @@ def socket_option_certopts_validate!(certopts) raise "not implemented yet" end - def socket_option_set(sock, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) unless resolve_name.nil? sock.do_not_reverse_lookup = !resolve_name end + if nonblock + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) + end if linger_timeout optval = [1, linger_timeout.to_i].pack(FORMAT_STRUCT_LINGER) socket_option_set_one(sock, :SO_LINGER, optval) @@ -71,7 +75,7 @@ def socket_option_set(sock, resolve_name: nil, linger_timeout: nil, recv_timeout end def socket_option_set_one(sock, option, value) - sock.setsockopt(Socket::SOL_SOCKET, option, value) + sock.setsockopt(::Socket::SOL_SOCKET, option, value) rescue => e log.warn "failed to set socket option", sock: sock.class, option: option, value: value, error: e end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 5f52af883b..d563a183d1 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -34,15 +34,11 @@ class Base def initialize(klass, opts: {}, &block) if klass.is_a?(Class) + @instance = klass.new if block - # Create new class for test w/ overwritten methods - # klass.dup is worse because its ancestors does NOT include original class name - klass_name = klass.name - klass = Class.new(klass) - klass.define_singleton_method("name") { klass_name } - klass.module_eval(&block) + @instance.singleton_class.module_eval(&block) + @instance.send(:initialize) end - @instance = klass.new else @instance = klass end @@ -143,16 +139,26 @@ def instance_hook_before_stopped def instance_shutdown instance_hook_before_stopped - @instance.stop unless @instance.stopped? - @instance.before_shutdown unless @instance.before_shutdown? - @instance.shutdown unless @instance.shutdown? + unless @instance.stopped? + @instance.stop rescue nil + end + unless @instance.before_shutdown? + @instance.before_shutdown rescue nil + end + unless @instance.shutdown? + @instance.shutdown rescue nil + end if @instance.respond_to?(:event_loop_wait_until_stop) @instance.event_loop_wait_until_stop end - @instance.after_shutdown unless @instance.after_shutdown? - @instance.close unless @instance.closed? + unless @instance.after_shutdown? + @instance.after_shutdown rescue nil + end + unless @instance.closed? + @instance.close rescue nil + end if @instance.respond_to?(:thread_wait_until_stop) @instance.thread_wait_until_stop @@ -162,7 +168,9 @@ def instance_shutdown @instance.server_wait_until_stop end - @instance.terminate unless @instance.terminated? + unless @instance.terminated? + @instance.terminate rescue nil + end if @socket_manager_server @socket_manager_server.close diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 0b5785e5da..9e02dc5de7 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -9,6 +9,11 @@ class ForwardOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup + @d = nil + end + + def teardown + @d.instance_shutdown if @d end TARGET_HOST = '127.0.0.1' @@ -32,35 +37,23 @@ def create_driver(conf=CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { attr_reader :responses, :exceptions - def write(chunk) - super - end - def initialize super @responses = [] @exceptions = [] end - def configure(conf) - super - m = Module.new do - def send_data(tag, chunk) - @sender.responses << super - rescue => e - @sender.exceptions << e - raise e - end - end - @nodes.each do |node| - node.singleton_class.prepend(m) - end + def read_ack_from_sock(sock, unpacker) + @responses << super + rescue => e + @exceptions << e + raise e end }.configure(conf) end - def test_configure - d = create_driver(%[ + test 'configure' do + @d = d = create_driver(%[ self_hostname localhost name test @@ -78,8 +71,8 @@ def test_configure assert_equal TARGET_PORT, node.port end - def test_configure_traditional - d = create_driver(< name test @@ -96,49 +89,53 @@ def test_configure_traditional assert_equal( 10*1024*1024, instance.buffer.chunk_limit_size ) end - def test_configure_udp_heartbeat - d = create_driver(CONFIG + "\nheartbeat_type udp") + test 'configure_udp_heartbeat' do + @d = d = create_driver(CONFIG + "\nheartbeat_type udp") assert_equal :udp, d.instance.heartbeat_type end - def test_configure_none_heartbeat - d = create_driver(CONFIG + "\nheartbeat_type none") + test 'configure_none_heartbeat' do + @d = d = create_driver(CONFIG + "\nheartbeat_type none") assert_equal :none, d.instance.heartbeat_type end - def test_configure_expire_dns_cache - d = create_driver(CONFIG + "\nexpire_dns_cache 5") + test 'configure_expire_dns_cache' do + @d = d = create_driver(CONFIG + "\nexpire_dns_cache 5") assert_equal 5, d.instance.expire_dns_cache end - def test_configure_dns_round_robin + test 'configure_dns_round_robin udp' do assert_raise(Fluent::ConfigError) do create_driver(CONFIG + "\nheartbeat_type udp\ndns_round_robin true") end + end - d = create_driver(CONFIG + "\nheartbeat_type tcp\ndns_round_robin true") + test 'configure_dns_round_robin tcp' do + @d = d = create_driver(CONFIG + "\nheartbeat_type tcp\ndns_round_robin true") assert_equal true, d.instance.dns_round_robin + end - d = create_driver(CONFIG + "\nheartbeat_type none\ndns_round_robin true") + test 'configure_dns_round_robin none' do + @d = d = create_driver(CONFIG + "\nheartbeat_type none\ndns_round_robin true") assert_equal true, d.instance.dns_round_robin end - def test_configure_no_server + test 'configure_no_server' do assert_raise(Fluent::ConfigError, 'forward output plugin requires at least one is required') do create_driver('') end end - def test_compress_default_value - d = create_driver + test 'compress_default_value' do + @d = d = create_driver assert_equal :text, d.instance.compress node = d.instance.nodes.first assert_equal :text, node.instance_variable_get(:@compress) end - def test_set_compress_is_gzip - d = create_driver(CONFIG + %[compress gzip]) + test 'set_compress_is_gzip' do + @d = d = create_driver(CONFIG + %[compress gzip]) assert_equal :gzip, d.instance.compress assert_equal :gzip, d.instance.buffer.compress @@ -146,11 +143,11 @@ def test_set_compress_is_gzip assert_equal :gzip, node.instance_variable_get(:@compress) end - def test_set_compress_is_gzip_in_buffer_section + test 'set_compress_is_gzip_in_buffer_section' do mock = flexmock($log) mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ") - d = create_driver(CONFIG + %[ + @d = d = create_driver(CONFIG + %[ type memory compress gzip @@ -163,25 +160,29 @@ def test_set_compress_is_gzip_in_buffer_section assert_equal :text, node.instance_variable_get(:@compress) end - def test_phi_failure_detector - d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0]) + test 'phi_failure_detector disabled' do + @d = d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first stub(node.failure).phi { raise 'Should not be called' } node.tick assert_equal node.available, true + end - d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0]) + test 'phi_failure_detector enabled' do + @d = d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0]) node = d.instance.nodes.first node.tick assert_equal node.available, false end - def test_wait_response_timeout_config - d = create_driver(CONFIG) + test 'require_ack_response is disabled in default' do + @d = d = create_driver(CONFIG) assert_equal false, d.instance.require_ack_response assert_equal 190, d.instance.ack_response_timeout + end - d = create_driver(CONFIG + %[ + test 'require_ack_response can be enabled' do + @d = d = create_driver(CONFIG + %[ require_ack_response true ack_response_timeout 2s ]) @@ -189,10 +190,10 @@ def test_wait_response_timeout_config assert_equal 2, d.instance.ack_response_timeout end - def test_send_with_time_as_integer + test 'send_with_time_as_integer' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[flush_interval 1s]) + @d = d = create_driver(CONFIG + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") @@ -200,8 +201,7 @@ def test_send_with_time_as_integer {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } + target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) @@ -215,14 +215,13 @@ def test_send_with_time_as_integer assert_equal_event_time(time, events[1][1]) assert_equal ['test', time, records[1]], events[1] - assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions end - def test_send_without_time_as_integer + test 'send_without_time_as_integer' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[ + @d = d = create_driver(CONFIG + %[ flush_interval 1s time_as_integer false ]) @@ -233,8 +232,7 @@ def test_send_without_time_as_integer {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } + target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) @@ -248,14 +246,13 @@ def test_send_without_time_as_integer assert_equal_event_time(time, events[1][1]) assert_equal ['test', time, records[1]], events[1] - assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions end - def test_send_comprssed_message_pack_stream_if_compress_is_gzip + test 'send_comprssed_message_pack_stream_if_compress_is_gzip' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[ + @d = d = create_driver(CONFIG + %[ flush_interval 1s compress gzip ]) @@ -266,8 +263,7 @@ def test_send_comprssed_message_pack_stream_if_compress_is_gzip {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } + target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) @@ -283,10 +279,10 @@ def test_send_comprssed_message_pack_stream_if_compress_is_gzip assert_equal ['test', time, records[1]], events[1] end - def test_send_to_a_node_supporting_responses + test 'send_to_a_node_supporting_responses' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[flush_interval 1s]) + @d = d = create_driver(CONFIG + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") @@ -294,8 +290,7 @@ def test_send_to_a_node_supporting_responses {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } + target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) @@ -307,14 +302,14 @@ def test_send_to_a_node_supporting_responses assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned + assert_empty d.instance.responses # not attempt to receive responses, so it's empty assert_empty d.instance.exceptions end - def test_send_to_a_node_not_supporting_responses + test 'send_to_a_node_not_supporting_responses' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[flush_interval 1s]) + @d = d = create_driver(CONFIG + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") @@ -322,8 +317,7 @@ def test_send_to_a_node_not_supporting_responses {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } + target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) @@ -335,17 +329,22 @@ def test_send_to_a_node_not_supporting_responses assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned + assert_empty d.instance.responses # not attempt to receive responses, so it's empty assert_empty d.instance.exceptions end - def test_require_a_node_supporting_responses_to_respond_with_ack + test 'a node supporting responses' do target_input_driver = create_target_input_driver - d = create_driver(CONFIG + %[ - flush_interval 1s + @d = d = create_driver(CONFIG + %[ require_ack_response true ack_response_timeout 1s + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + ]) time = event_time("2011-01-02 13:14:15 UTC") @@ -354,12 +353,10 @@ def test_require_a_node_supporting_responses_to_respond_with_ack {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } - d.run(default_tag: 'test') do - records.each do |record| - d.feed(time, record) - end + target_input_driver.run(expect_records: 2) do + d.end_if{ d.instance.responses.length > 0 } + d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do + d.feed([[time, records[0]], [time,records[1]]]) end end @@ -368,87 +365,100 @@ def test_require_a_node_supporting_responses_to_respond_with_ack assert_equal ['test', time, records[1]], events[1] assert_equal 1, d.instance.responses.length - assert d.instance.responses[0].has_key?('ack') assert_empty d.instance.exceptions end - def test_require_a_node_not_supporting_responses_to_respond_with_ack - target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: true) + test 'a destination node not supporting responses by just ignoring' do + target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false) - d = create_driver(CONFIG + %[ - flush_interval 1s + @d = d = create_driver(CONFIG + %[ require_ack_response true ack_response_timeout 1s + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + ]) + node = d.instance.nodes.first + delayed_commit_timeout_value = nil + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - assert_raise Fluent::Plugin::ForwardOutput::ACKTimeoutError do - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } - d.run(default_tag: 'test', timeout: 2, wait_flush_completion: false) do - records.each do |record| - d.feed(time, record) - end - end + target_input_driver.end_if{ d.instance.rollback_count > 0 } + target_input_driver.end_if{ !node.available } + target_input_driver.run(expect_records: 2, timeout: 25) do + d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false) do + delayed_commit_timeout_value = d.instance.delayed_commit_timeout + d.feed([[time, records[0]], [time,records[1]]]) end end + assert_equal 1, delayed_commit_timeout_value + events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - node = d.instance.nodes.first - assert_equal false, node.available # node is regarded as unavailable when timeout + assert{ d.instance.rollback_count > 0 } - assert_empty d.instance.responses # send_data() raises exception, so response is missing - assert_equal 1, d.instance.exceptions.size + logs = d.instance.log.logs + assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } } + assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end - # bdf1f4f104c00a791aa94dc20087fe2011e1fd83 - def test_require_a_node_not_supporting_responses_2_to_respond_with_ack - # in_forward, that doesn't support ack feature, and disconnect immediately + test 'a destination node not supporting responses by disconnection' do target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: true) - d = create_driver(CONFIG + %[ - flush_interval 1s + @d = d = create_driver(CONFIG + %[ require_ack_response true ack_response_timeout 5s + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + ]) + node = d.instance.nodes.first + delayed_commit_timeout_value = nil + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - assert_raise Fluent::Plugin::ForwardOutput::ACKTimeoutError do - target_input_driver.run do - d.end_if{ d.instance.responses.length == 1 } - d.run(default_tag: 'test', timeout: 2, wait_flush_completion: false) do - records.each do |record| - d.feed(time, record) - end - end + target_input_driver.end_if{ d.instance.rollback_count > 0 } + target_input_driver.end_if{ !node.available } + target_input_driver.run(expect_records: 2, timeout: 25) do + d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false) do + delayed_commit_timeout_value = d.instance.delayed_commit_timeout + d.feed([[time, records[0]], [time,records[1]]]) end end + assert_equal 5, delayed_commit_timeout_value + events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] - assert_equal 0, d.instance.responses.size - assert_equal 1, d.instance.exceptions.size # send_data() fails and to be retried + assert{ d.instance.rollback_count > 0 } - node = d.instance.nodes.first - assert_equal false, node.available # node is regarded as unavailable when unexpected EOF + logs = d.instance.log.logs + assert{ logs.any?{|log| log.include?("failed to flush the buffer chunk, timeout to commit.") } } + assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end - def test_authentication_with_shared_key + test 'authentication_with_shared_key' do input_conf = TARGET_CONFIG + %[ self_hostname in.localhost @@ -473,7 +483,7 @@ def test_authentication_with_shared_key shared_key fluentd-sharedkey ] - d = create_driver(output_conf) + @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [ @@ -495,7 +505,7 @@ def test_authentication_with_shared_key assert_equal(['test', time, records[1]], events[1]) end - def test_authentication_with_user_auth + test 'authentication_with_user_auth' do input_conf = TARGET_CONFIG + %[ self_hostname in.localhost @@ -527,7 +537,7 @@ def test_authentication_with_user_auth password fluentd ] - d = create_driver(output_conf) + @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [ @@ -564,8 +574,8 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG }.configure(conf) end - def test_heartbeat_type_none - d = create_driver(CONFIG + "\nheartbeat_type none") + test 'heartbeat_type_none' do + @d = d = create_driver(CONFIG + "\nheartbeat_type none") node = d.instance.nodes.first assert_equal Fluent::Plugin::ForwardOutput::NoneHeartbeatNode, node.class @@ -578,23 +588,23 @@ def test_heartbeat_type_none assert_equal node.available, true end - def test_heartbeat_type_udp - d = create_driver(CONFIG + "\nheartbeat_type udp") + test 'heartbeat_type_udp' do + @d = d = create_driver(CONFIG + "\nheartbeat_type udp") d.instance.start usock = d.instance.instance_variable_get(:@usock) - timer = d.instance.instance_variable_get(:@timer) - hb = d.instance.instance_variable_get(:@hb) + servers = d.instance.instance_variable_get(:@_servers) + timers = d.instance.instance_variable_get(:@_timers) assert_equal UDPSocket, usock.class - assert_equal Fluent::Plugin::ForwardOutput::HeartbeatRequestTimer, timer.class - assert_equal Fluent::Plugin::ForwardOutput::HeartbeatHandler, hb.class + assert servers.find{|s| s.title == :out_forward_heartbeat_receiver } + assert timers.include?(:out_forward_heartbeat_request) mock(usock).send("\0", 0, Socket.pack_sockaddr_in(TARGET_PORT, '127.0.0.1')).once - timer.disable # call send_heartbeat at just once - timer.on_timer + # timer.disable # call send_heartbeat at just once + d.instance.send(:on_timer) end - def test_acts_as_secondary + test 'acts_as_secondary' do i = Fluent::Plugin::ForwardOutput.new conf = config_element( 'match', diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 5c912f0e7b..334947d91f 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -1460,6 +1460,7 @@ def waiting(seconds) @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.start @i.after_start + @i.log = Fluent::Test::TestLogger.new end test '#format is called for each event streams' do @@ -1740,8 +1741,9 @@ def waiting(seconds) assert{ chunks[0...3].all?{|c| !c.empty? } } - # rollback is in progress, but some may be flushed again after rollback - Timecop.freeze( Time.parse('2016-04-13 14:04:46 +0900') ) + # rollback is in progress, but some may be flushed again in retry state, after rollback + # retry.next_time is 14:04:49 + Timecop.freeze( Time.parse('2016-04-13 14:04:51 +0900') ) @i.enqueue_thread_wait @i.flush_thread_wakeup diff --git a/test/test_test_drivers.rb b/test/test_test_drivers.rb index 99e2dee81a..695f74e6c8 100644 --- a/test/test_test_drivers.rb +++ b/test/test_test_drivers.rb @@ -100,7 +100,10 @@ def start sub_test_case 'output plugin test driver' do test 'returns the block value as the return value of #run' do - d = Fluent::Test::Driver::Output.new(Class.new(Fluent::Plugin::Output)) do + d = Fluent::Test::Driver::Output.new(Fluent::Plugin::Output) do + def prefer_buffered_processing + false + end def process(tag, es) # drop end @@ -116,7 +119,7 @@ def process(tag, es) sub_test_case 'filter plugin test driver' do test 'returns the block value as the return value of #run' do - d = Fluent::Test::Driver::Filter.new(Class.new(Fluent::Plugin::Filter)) do + d = Fluent::Test::Driver::Filter.new(Fluent::Plugin::Filter) do def filter(tag, time, record) record end