Skip to content

Commit

Permalink
Merge branch 'master' into support-mutual-auth-out-forward
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly authored Dec 10, 2018
2 parents 7526f97 + c037e41 commit 487a97e
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 41 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
# v1.3

## Release v1.3.1 - 2018/11/27

### Enhancements

* out_forward: Separate parameter names for certificate
https://github.com/fluent/fluentd/pull/2181
https://github.com/fluent/fluentd/pull/2190
* out_forward: Add `verify_connection_at_startup` parameter to check connection setting at startup phase
https://github.com/fluent/fluentd/pull/2184
* config: Check right slash position in regexp type
https://github.com/fluent/fluentd/pull/2176
* parser_nginx: Support multiple IPs in `http_x_forwarded_for` field
https://github.com/fluent/fluentd/pull/2171

### Bug fixes

* fluent-cat: Fix retry limit handling
https://github.com/fluent/fluentd/pull/2193
* record_accessor helper: Delete top level field with bracket style
https://github.com/fluent/fluentd/pull/2192
* filter_record_transformer: Keep `class` methond to avoid undefined method error
https://github.com/fluent/fluentd/pull/2186

## Release v1.3.0 - 2018/11/10

### New features
Expand Down
39 changes: 25 additions & 14 deletions lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
format = 'json'
message_key = 'message'
time_as_integer = false
retry_limit = 5

op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) {|i|
port = i
Expand Down Expand Up @@ -75,6 +76,10 @@
time_as_integer = true
}

op.on('--retry-limit N', "Specify the number of retry limit (default: #{retry_limit})", Integer) {|n|
retry_limit = n
}

singleton_class.module_eval do
define_method(:usage) do |msg|
puts op.to_s
Expand Down Expand Up @@ -107,6 +112,8 @@
class Writer
include MonitorMixin

RetryLimitError = Class.new(StandardError)

class TimerThread
def initialize(writer)
@writer = writer
Expand All @@ -130,7 +137,7 @@ def run
end
end

def initialize(tag, connector, time_as_integer: false)
def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
@tag = tag
@connector = connector
@socket = false
Expand All @@ -142,7 +149,7 @@ def initialize(tag, connector, time_as_integer: false)
@pending = []
@pending_limit = 1024 # TODO
@retry_wait = 1
@retry_limit = 5 # TODO
@retry_limit = retry_limit
@time_as_integer = time_as_integer

super()
Expand Down Expand Up @@ -236,21 +243,24 @@ def get_socket
end

def try_connect
now = Time.now.to_i

unless @error_history.empty?
# wait before re-connecting
wait = @retry_wait * (2 ** (@error_history.size-1))
if now <= @socket_time + wait
return false
begin
now = Time.now.to_i

unless @error_history.empty?
# wait before re-connecting
wait = 1 #@retry_wait * (2 ** (@error_history.size-1))
if now <= @socket_time + wait
sleep(wait)
try_connect
end
end
end

begin
@socket = @connector.call
@error_history.clear
return true

rescue RetryLimitError => ex
raise ex
rescue
$stderr.puts "connect failed: #{$!}"
@error_history << $!
Expand All @@ -263,9 +273,10 @@ def try_connect
}
@pending.clear
@error_history.clear
raise RetryLimitError, "exceed retry limit"
else
retry
end

return false
end
end

Expand All @@ -285,7 +296,7 @@ def abort_message(time, record)
}
end

w = Writer.new(tag, connector, time_as_integer: time_as_integer)
w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit)
w.start

case format
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/filter_record_transformer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix
end

(Object.instance_methods).each do |m|
undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member|^class$/
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ def start
super

if @pos_file
pos_file_dir = File.dirname(@pos_file)
FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
Expand Down Expand Up @@ -946,7 +948,7 @@ def initialize(file, file_mutex, seek, pos, inode)
@file = file
@file_mutex = file_mutex
@seek = seek
@pos = pos
@pos = pos
@inode = inode
end

Expand Down
32 changes: 30 additions & 2 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class ConnectionClosedError < Error; end
desc 'Ignore DNS resolution and errors at startup time.'
config_param :ignore_network_errors_at_startup, :bool, default: false

desc 'Verify that a connection can be made with one of out_forward nodes at the time of startup.'
config_param :verify_connection_at_startup, :bool, default: false

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Expand All @@ -92,7 +95,8 @@ class ConnectionClosedError < Error; end
config_param :tls_verify_hostname, :bool, default: true
desc 'The additional CA certificate path for TLS.'
config_param :tls_ca_cert_path, :array, value_type: :string, default: nil
config_param :tls_cert_path, :array, value_type: :string, default: nil, deprecated: "Use tls_ca_cert_path instead"
desc 'The additional certificate path for TLS.'
config_param :tls_cert_path, :array, value_type: :string, default: nil
desc 'The client certificate path for TLS.'
config_param :tls_client_cert_path, :string, default: nil
desc 'The client private key path for TLS.'
Expand Down Expand Up @@ -173,7 +177,7 @@ def configure(conf)
end

if @transport == :tls
# for backward compatibility
# socket helper adds CA cert or signed certificate to same cert store internally so unify it in this place.
if @tls_cert_path && !@tls_cert_path.empty?
@tls_ca_cert_path = @tls_cert_path
end
Expand Down Expand Up @@ -264,6 +268,17 @@ def start
@sock_ack_waiting = []
thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

if @verify_connection_at_startup
@nodes.each do |node|
begin
node.verify_connection
rescue StandardError => e
log.fatal "forward's connection setting error: #{e.message}"
raise Fluent::UnrecoverableError, e.message
end
end
end
end

def close
Expand Down Expand Up @@ -582,6 +597,19 @@ def standby?
@standby
end

def verify_connection
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
end
ensure
sock.close
end
end

def establish_connection(sock, ri)
while available? && ri.state != :established
begin
Expand Down
26 changes: 13 additions & 13 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1222,29 +1222,29 @@ def update_retry_state(chunk_id, using_secondary, error = nil)

# @retry exists

if error
if @retry.limit?
if @retry.limit?
if error
records = @buffer.queued_records
msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
log.error msg, retry_times: @retry.steps, records: records, error: error
log.error_backtrace error.backtrace
elsif using_secondary
msg = "failed to flush the buffer with secondary output."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
else
msg = "failed to flush the buffer."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
end

if @retry.limit?
@buffer.clear_queue!
log.debug "buffer queue cleared"
@retry = nil
else
@retry.step
if error
if using_secondary
msg = "failed to flush the buffer with secondary output."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
else
msg = "failed to flush the buffer."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/parser_nginx.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Plugin
class NginxParser < RegexpParser
Plugin.register_parser("nginx", self)

config_set_default :expression, /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)"(?:\s+(?<http_x_forwarded_for>[^ ]+))?)?$/
config_set_default :expression, /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)"(?:\s+\"?(?<http_x_forwarded_for>[^\"]*)\"?)?)?$/
config_set_default :time_format, "%d/%b/%Y:%H:%M:%S %z"
end
end
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/plugin_helper/record_accessor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ def initialize(param)
if @keys.is_a?(Array)
@last_key = @keys.last
@dig_keys = @keys[0..-2]
mcall = method(:call_dig)
mdelete = method(:delete_nest)
if @dig_keys.empty?
@keys = @keys.first
mcall = method(:call_index)
mdelete = method(:delete_top)
else
mcall = method(:call_dig)
mdelete = method(:delete_nest)
end
else
# Call [] for single key to reduce dig overhead
mcall = method(:call_index)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

module Fluent

VERSION = '1.3.0'
VERSION = '1.3.1'

end
19 changes: 19 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,25 @@ def count_timer_object
end
end

def test_pos_file_dir_creation
config = config_element("", "", {
"tag" => "tail",
"path" => "#{TMP_DIR}/*.txt",
"format" => "none",
"pos_file" => "#{TMP_DIR}/pos/tail.pos",
"read_from_head" => true,
"refresh_interval" => 1
})
d = create_driver(config, false)
d.run(expect_emits: 1, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test3\n" }
end
assert_path_exist("#{TMP_DIR}/pos/tail.pos")
cleanup_directory(TMP_DIR)

d.instance_shutdown
end

def test_z_refresh_watchers
plugin = create_driver(EX_CONFIG, false).instance
sio = StringIO.new
Expand Down
Loading

0 comments on commit 487a97e

Please sign in to comment.