diff --git a/CHANGELOG.md b/CHANGELOG.md index a9ae6630d4..964ef10bb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # v1.3 +## Release v1.3.1 - 2018/11/27 + +### Enhancements + +* out_forward: Separate parameter names for certificate + https://github.com/fluent/fluentd/pull/2181 + https://github.com/fluent/fluentd/pull/2190 +* out_forward: Add `verify_connection_at_startup` parameter to check connection setting at startup phase + https://github.com/fluent/fluentd/pull/2184 +* config: Check right slash position in regexp type + https://github.com/fluent/fluentd/pull/2176 +* parser_nginx: Support multiple IPs in `http_x_forwarded_for` field + https://github.com/fluent/fluentd/pull/2171 + +### Bug fixes + +* fluent-cat: Fix retry limit handling + https://github.com/fluent/fluentd/pull/2193 +* record_accessor helper: Delete top level field with bracket style + https://github.com/fluent/fluentd/pull/2192 +* filter_record_transformer: Keep `class` methond to avoid undefined method error + https://github.com/fluent/fluentd/pull/2186 + ## Release v1.3.0 - 2018/11/10 ### New features diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index 935db5dfd1..f218ac1776 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -34,6 +34,7 @@ format = 'json' message_key = 'message' time_as_integer = false +retry_limit = 5 op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) {|i| port = i @@ -75,6 +76,10 @@ time_as_integer = true } +op.on('--retry-limit N', "Specify the number of retry limit (default: #{retry_limit})", Integer) {|n| + retry_limit = n +} + singleton_class.module_eval do define_method(:usage) do |msg| puts op.to_s @@ -107,6 +112,8 @@ class Writer include MonitorMixin + RetryLimitError = Class.new(StandardError) + class TimerThread def initialize(writer) @writer = writer @@ -130,7 +137,7 @@ def run end end - def initialize(tag, connector, time_as_integer: false) + def initialize(tag, connector, time_as_integer: false, retry_limit: 5) @tag = tag @connector = connector @socket = false @@ -142,7 +149,7 @@ def initialize(tag, connector, time_as_integer: false) @pending = [] @pending_limit = 1024 # TODO @retry_wait = 1 - @retry_limit = 5 # TODO + @retry_limit = retry_limit @time_as_integer = time_as_integer super() @@ -236,21 +243,24 @@ def get_socket end def try_connect - now = Time.now.to_i - - unless @error_history.empty? - # wait before re-connecting - wait = @retry_wait * (2 ** (@error_history.size-1)) - if now <= @socket_time + wait - return false + begin + now = Time.now.to_i + + unless @error_history.empty? + # wait before re-connecting + wait = 1 #@retry_wait * (2 ** (@error_history.size-1)) + if now <= @socket_time + wait + sleep(wait) + try_connect + end end - end - begin @socket = @connector.call @error_history.clear return true + rescue RetryLimitError => ex + raise ex rescue $stderr.puts "connect failed: #{$!}" @error_history << $! @@ -263,9 +273,10 @@ def try_connect } @pending.clear @error_history.clear + raise RetryLimitError, "exceed retry limit" + else + retry end - - return false end end @@ -285,7 +296,7 @@ def abort_message(time, record) } end -w = Writer.new(tag, connector, time_as_integer: time_as_integer) +w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit) w.start case format diff --git a/lib/fluent/plugin/filter_record_transformer.rb b/lib/fluent/plugin/filter_record_transformer.rb index c6a74b2fd4..02ba686a39 100644 --- a/lib/fluent/plugin/filter_record_transformer.rb +++ b/lib/fluent/plugin/filter_record_transformer.rb @@ -316,7 +316,7 @@ def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix end (Object.instance_methods).each do |m| - undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ + undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member|^class$/ end end end diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index dbe628bea7..67bbec48de 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -185,6 +185,8 @@ def start super if @pos_file + pos_file_dir = File.dirname(@pos_file) + FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir) @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) @@ -946,7 +948,7 @@ def initialize(file, file_mutex, seek, pos, inode) @file = file @file_mutex = file_mutex @seek = seek - @pos = pos + @pos = pos @inode = inode end diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 6263fc3acb..782d487dfa 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -77,6 +77,9 @@ class ConnectionClosedError < Error; end desc 'Ignore DNS resolution and errors at startup time.' config_param :ignore_network_errors_at_startup, :bool, default: false + desc 'Verify that a connection can be made with one of out_forward nodes at the time of startup.' + config_param :verify_connection_at_startup, :bool, default: false + desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip], default: :text @@ -92,7 +95,8 @@ class ConnectionClosedError < Error; end config_param :tls_verify_hostname, :bool, default: true desc 'The additional CA certificate path for TLS.' config_param :tls_ca_cert_path, :array, value_type: :string, default: nil - config_param :tls_cert_path, :array, value_type: :string, default: nil, deprecated: "Use tls_ca_cert_path instead" + desc 'The additional certificate path for TLS.' + config_param :tls_cert_path, :array, value_type: :string, default: nil desc 'The client certificate path for TLS.' config_param :tls_client_cert_path, :string, default: nil desc 'The client private key path for TLS.' @@ -173,7 +177,7 @@ def configure(conf) end if @transport == :tls - # for backward compatibility + # socket helper adds CA cert or signed certificate to same cert store internally so unify it in this place. if @tls_cert_path && !@tls_cert_path.empty? @tls_ca_cert_path = @tls_cert_path end @@ -264,6 +268,17 @@ def start @sock_ack_waiting = [] thread_create(:out_forward_receiving_ack, &method(:ack_reader)) end + + if @verify_connection_at_startup + @nodes.each do |node| + begin + node.verify_connection + rescue StandardError => e + log.fatal "forward's connection setting error: #{e.message}" + raise Fluent::UnrecoverableError, e.message + end + end + end end def close @@ -582,6 +597,19 @@ def standby? @standby end + def verify_connection + sock = @sender.create_transfer_socket(resolved_host, port, @hostname) + begin + ri = RequestInfo.new(@sender.security ? :helo : :established) + if ri.state != :established + establish_connection(sock, ri) + raise if ri.state != :established + end + ensure + sock.close + end + end + def establish_connection(sock, ri) while available? && ri.state != :established begin diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 66077254c9..fad8c2fe9e 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1222,29 +1222,29 @@ def update_retry_state(chunk_id, using_secondary, error = nil) # @retry exists - if error - if @retry.limit? + if @retry.limit? + if error records = @buffer.queued_records msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue." log.error msg, retry_times: @retry.steps, records: records, error: error log.error_backtrace error.backtrace - elsif using_secondary - msg = "failed to flush the buffer with secondary output." - log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error - log.warn_backtrace error.backtrace - else - msg = "failed to flush the buffer." - log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error - log.warn_backtrace error.backtrace end - end - - if @retry.limit? @buffer.clear_queue! log.debug "buffer queue cleared" @retry = nil else @retry.step + if error + if using_secondary + msg = "failed to flush the buffer with secondary output." + log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error + log.warn_backtrace error.backtrace + else + msg = "failed to flush the buffer." + log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error + log.warn_backtrace error.backtrace + end + end end end end diff --git a/lib/fluent/plugin/parser_nginx.rb b/lib/fluent/plugin/parser_nginx.rb index 9717e1bcbe..e9090ef06d 100644 --- a/lib/fluent/plugin/parser_nginx.rb +++ b/lib/fluent/plugin/parser_nginx.rb @@ -21,7 +21,7 @@ module Plugin class NginxParser < RegexpParser Plugin.register_parser("nginx", self) - config_set_default :expression, /^(?[^ ]*) (?[^ ]*) (?[^ ]*) \[(?