Skip to content

Commit

Permalink
Merge pull request #1356 from fluent/introduce-socket-plugin-helper
Browse files Browse the repository at this point in the history
Introduce socket plugin helper
  • Loading branch information
tagomoris authored Dec 9, 2016
2 parents 66fb153 + c627ef5 commit 3613b32
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 382 deletions.
368 changes: 195 additions & 173 deletions lib/fluent/plugin/out_forward.rb

Large diffs are not rendered by default.

117 changes: 71 additions & 46 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Output < Base

CHUNKING_FIELD_WARN_NUM = 4

PROCESS_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC

config_param :time_as_integer, :bool, default: false

# `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
Expand Down Expand Up @@ -138,7 +140,7 @@ def prefer_delayed_commit
end

# Internal states
FlushThreadState = Struct.new(:thread, :next_time)
FlushThreadState = Struct.new(:thread, :next_clock)
DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
def expired?
time + timeout < Time.now
Expand Down Expand Up @@ -898,9 +900,9 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
@retry_mutex.synchronize do
if @retry # success to flush chunks in retries
if secondary
log.warn "retry succeeded by secondary.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id)
else
log.warn "retry succeeded.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id)
end
@retry = nil
end
Expand All @@ -918,6 +920,8 @@ def rollback_write(chunk_id)
# in many cases, false can be just ignored
if @buffer.takeback_chunk(chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
true
else
false
Expand All @@ -930,7 +934,9 @@ def try_rollback_write
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
log.warn "failed to flush the buffer chunk, timeout to commit.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
end
end
end
Expand All @@ -943,7 +949,9 @@ def try_rollback_all
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
log.info "delayed commit for buffer chunks was cancelled in shutdown", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id)
log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
end
end
end
Expand Down Expand Up @@ -997,43 +1005,60 @@ def try_flush
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
if output.delayed_commit
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
end
end
@buffer.takeback_chunk(chunk.unique_id)

@retry_mutex.synchronize do
if @retry
@counters_monitor.synchronize{ @num_errors += 1 }
if @retry.limit?
records = @buffer.queued_records
log.error "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.", plugin_id: plugin_id, retry_times: @retry.steps, records: records, error: e
log.error_backtrace e.backtrace
@buffer.clear_queue!
log.debug "buffer queue cleared", plugin_id: plugin_id
@retry = nil
else
@retry.step
msg = if using_secondary
"failed to flush the buffer with secondary output."
else
"failed to flush the buffer."
end
log.warn msg, plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
log.warn_backtrace e.backtrace
end
update_retry_state(chunk.unique_id, using_secondary, e)

raise if @under_plugin_development && !@retry_for_error_chunk
end
end

def update_retry_state(chunk_id, using_secondary, error = nil)
@retry_mutex.synchronize do
@counters_monitor.synchronize{ @num_errors += 1 }
chunk_id_hex = dump_unique_id_hex(chunk_id)

unless @retry
@retry = retry_state(@buffer_config.retry_randomize)
if error
log.warn "failed to flush the buffer.", retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
return
end

# @retry exists

if error
if @retry.limit?
records = @buffer.queued_records
msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
log.error msg, retry_times: @retry.steps, records: records, error: error
log.error_backtrace error.backtrace
elsif using_secondary
msg = "failed to flush the buffer with secondary output."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
else
@retry = retry_state(@buffer_config.retry_randomize)
@counters_monitor.synchronize{ @num_errors += 1 }
log.warn "failed to flush the buffer.", plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
log.warn_backtrace e.backtrace
msg = "failed to flush the buffer."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
end

raise if @under_plugin_development && !@retry_for_error_chunk
if @retry.limit?
@buffer.clear_queue!
log.debug "buffer queue cleared"
@retry = nil
else
@retry.step
end
end
end

Expand All @@ -1060,7 +1085,7 @@ def submit_flush_once
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_time = 0
state.next_clock = 0
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
Expand Down Expand Up @@ -1102,7 +1127,7 @@ def enqueue_thread_wait
# only for tests of output plugin
def flush_thread_wakeup
@output_flush_threads.each do |state|
state.next_time = 0
state.next_clock = 0
state.thread.run
end
end
Expand Down Expand Up @@ -1156,7 +1181,7 @@ def enqueue_thread_run
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e
log.error "unexpected error while checking flushed chunks. ignored.", error: e
log.error_backtrace
ensure
@output_enqueue_thread_waiting = false
Expand All @@ -1166,7 +1191,7 @@ def enqueue_thread_run
end
rescue => e
# normal errors are rescued by inner begin-rescue clause.
log.error "error on enqueue thread", plugin_id: plugin_id, error: e
log.error "error on enqueue thread", error: e
log.error_backtrace
raise
end
Expand All @@ -1175,9 +1200,7 @@ def enqueue_thread_run
def flush_thread_run(state)
flush_thread_interval = @buffer_config.flush_thread_interval

# If the given clock_id is not supported, Errno::EINVAL is raised.
clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval
state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + flush_thread_interval

while !self.after_started? && !self.stopped?
sleep 0.5
Expand All @@ -1187,16 +1210,18 @@ def flush_thread_run(state)
begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
time = Process.clock_gettime(clock_id)
interval = state.next_time - time
current_clock = Process.clock_gettime(PROCESS_CLOCK_ID)
interval = state.next_clock - current_clock

if state.next_time <= time
if state.next_clock <= current_clock && (!@retry || @retry_mutex.synchronize{ @retry.next_time } <= Time.now)
try_flush
# next_flush_interval uses flush_thread_interval or flush_thread_burst_interval (or retrying)

# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected (because @retry still exists)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_time = Process.clock_gettime(clock_id) + interval
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + interval
end

if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
Expand All @@ -1210,7 +1235,7 @@ def flush_thread_run(state)
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", plugin_id: plugin_id, error: e
log.error "error on output thread", error: e
log.error_backtrace
raise
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/inject'
require 'fluent/plugin_helper/extract'
# require 'fluent/plugin_helper/socket'
require 'fluent/plugin_helper/socket'
require 'fluent/plugin_helper/server'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper/inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def configure(conf)
if @_inject_hostname_key
@_inject_hostname = @inject_config.hostname
unless @_inject_hostname
@_inject_hostname = Socket.gethostname
@_inject_hostname = ::Socket.gethostname
log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname
end
end
Expand Down
41 changes: 24 additions & 17 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,24 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared:
# sock.remote_port
# # ...
# end
def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp

raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

if proto == :tcp || proto == :tls # default linger_timeout only for server
socket_options[:linger_timeout] ||= 0
end

socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
unless socket
socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
end

if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
Expand All @@ -140,9 +144,15 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backl
raise "not implemented yet"
when :udp
raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
sock = server_create_udp_socket(shared, bind, port)
socket_option_setter.call(sock)
server = EventHandler::UDPServer.new(sock, max_bytes, flags, @log, @under_plugin_development, &callback)
if socket
sock = socket
close_socket = false
else
sock = server_create_udp_socket(shared, bind, port)
socket_option_setter.call(sock)
close_socket = true
end
server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
when :unix
raise "not implemented yet"
else
Expand Down Expand Up @@ -267,10 +277,11 @@ def server_create_tls_socket(shared, bind, port)
end

class CallbackSocket
def initialize(server_type, sock, enabled_events = [])
def initialize(server_type, sock, enabled_events = [], close_socket: true)
@server_type = server_type
@sock = sock
@enabled_events = enabled_events
@close_socket = close_socket
end

def remote_addr
Expand All @@ -294,12 +305,7 @@ def write(data)
end

def close
@sock.close
# close cool.io socket in another thread, not to make deadlock
# for flushing @_write_buffer when conn.close is called in callback
# ::Thread.new{
# @sock.close
# }
@sock.close if @close_socket
end

def data(&callback)
Expand Down Expand Up @@ -334,8 +340,8 @@ def write(data)
end

class UDPCallbackSocket < CallbackSocket
def initialize(sock, peeraddr)
super("udp", sock, [])
def initialize(sock, peeraddr, **kwargs)
super("udp", sock, [], **kwargs)
@peeraddr = peeraddr
end

Expand All @@ -358,14 +364,15 @@ def write(data)

module EventHandler
class UDPServer < Coolio::IO
def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback)
def initialize(sock, max_bytes, flags, close_socket, log, under_plugin_development, &callback)
raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket)

super(sock)

@sock = sock
@max_bytes = max_bytes
@flags = flags
@close_socket = close_socket
@log = log
@under_plugin_development = under_plugin_development
@callback = callback
Expand Down Expand Up @@ -398,7 +405,7 @@ def on_readable_with_sock
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET
return
end
@callback.call(data, UDPCallbackSocket.new(@sock, addr))
@callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket))
rescue => e
@log.error "unexpected error in processing UDP data", error: e
@log.error_backtrace
Expand Down
Loading

0 comments on commit 3613b32

Please sign in to comment.