From 812f9bae92a0c0eab06e93109d8657cd815f41c5 Mon Sep 17 00:00:00 2001 From: bootjp Date: Wed, 14 Nov 2018 19:03:42 +0900 Subject: [PATCH 01/14] Add connection check at startup when out_forward. Signed-off-by: bootjp --- lib/fluent/plugin/out_forward.rb | 27 +++++++ test/plugin/test_out_forward.rb | 120 +++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index ca25092ccd..8570ecc429 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -77,6 +77,9 @@ class ConnectionClosedError < Error; end desc 'Ignore DNS resolution and errors at startup time.' config_param :ignore_network_errors_at_startup, :bool, default: false + desc 'Verify that a connection can be made with one of out_forward nodes at the time of startup.' + config_param :verify_connection_at_startup, :bool, default: false + desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip], default: :text @@ -253,6 +256,17 @@ def start @sock_ack_waiting = [] thread_create(:out_forward_receiving_ack, &method(:ack_reader)) end + + if @verify_connection_at_startup + @nodes.each do |node| + begin + node.verify_connection + rescue StandardError => e + log.fatal e.message + raise Fluent::UnrecoverableError, e.message + end + end + end end def close @@ -568,6 +582,19 @@ def standby? @standby end + def verify_connection + sock = @sender.create_transfer_socket(resolved_host, port, @hostname) + begin + ri = RequestInfo.new(@sender.security ? :helo : :established) + if ri.state != :established + establish_connection(sock, ri) + raise if ri.state != :established + end + ensure + sock.close + end + end + def establish_connection(sock, ri) while available? && ri.state != :established begin diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 7a14a5614b..ac372abd72 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -217,6 +217,18 @@ def read_ack_from_sock(sock, unpacker) assert_equal 2, d.instance.ack_response_timeout end + test 'verify_connection_at_startup is disabled in default' do + @d = d = create_driver(CONFIG) + assert_equal false, d.instance.verify_connection_at_startup + end + + test 'verify_connection_at_startup can be enabled' do + @d = d = create_driver(CONFIG + %[ + verify_connection_at_startup true + ]) + assert_equal true, d.instance.verify_connection_at_startup + end + test 'send tags in str (utf-8 strings)' do target_input_driver = create_target_input_driver @@ -774,4 +786,112 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG i.configure(conf) end end + + test 'verify_connection_at_startup test' do + @d = d = create_driver(CONFIG + %[ + verify_connection_at_startup true + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + + ]) + assert_raise Fluent::UnrecoverableError do + d.instance_start + end + d.instance_shutdown + end + + test 'authentication_with_shared_key_miss_match' do + input_conf = TARGET_CONFIG + %[ + + self_hostname in.localhost + shared_key fluentd-sharedkey + + ] + target_input_driver = create_target_input_driver(conf: input_conf) + + output_conf = %[ + send_timeout 30 + heartbeat_type transport + transport tls + tls_verify_hostname false + verify_connection_at_startup true + require_ack_response true + ack_response_timeout 5s + + self_hostname localhost + shared_key key_miss_match + + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + flush_thread_interval 31s + + + + host #{TARGET_HOST} + port #{TARGET_PORT} + + ] + @d = d = create_driver(output_conf) + + target_input_driver.run(expect_records: 1, timeout: 15) do + assert_raise Fluent::UnrecoverableError do + d.instance_start + end + d.instance_shutdown + end + end + + test 'authentication_with_shared_key_match' do + input_conf = TARGET_CONFIG + %[ + + self_hostname in.localhost + shared_key fluentd-sharedkey + + host 127.0.0.1 + + + ] + target_input_driver = create_target_input_driver(conf: input_conf) + + output_conf = %[ + send_timeout 51 + verify_connection_at_startup true + + self_hostname localhost + shared_key fluentd-sharedkey + + + name test + host #{TARGET_HOST} + port #{TARGET_PORT} + shared_key fluentd-sharedkey + + ] + @d = d = create_driver(output_conf) + + time = event_time("2011-01-02 13:14:15 UTC") + records = [ + {"a" => 1}, + {"a" => 2} + ] + + target_input_driver.run(expect_records: 2, timeout: 15) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end + end + end + + events = target_input_driver.events + assert{ events != [] } + assert_equal(['test', time, records[0]], events[0]) + assert_equal(['test', time, records[1]], events[1]) + end end From ef2e4b796a462252e9d0e8606dcf2776b9ec2a79 Mon Sep 17 00:00:00 2001 From: bootjp Date: Fri, 16 Nov 2018 10:24:09 +0900 Subject: [PATCH 02/14] Fix test description. Signed-off-by: bootjp --- test/plugin/test_out_forward.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index ac372abd72..9adb61b06d 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -787,7 +787,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG end end - test 'verify_connection_at_startup test' do + test 'verify_connection_at_startup_nodes_are_not_available' do @d = d = create_driver(CONFIG + %[ verify_connection_at_startup true @@ -803,7 +803,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG d.instance_shutdown end - test 'authentication_with_shared_key_miss_match' do + test 'verify_connection_at_startup_nodes_shared_key_miss_match' do input_conf = TARGET_CONFIG + %[ self_hostname in.localhost @@ -847,7 +847,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG end end - test 'authentication_with_shared_key_match' do + test 'verify_connection_at_startup_nodes_shared_key_match' do input_conf = TARGET_CONFIG + %[ self_hostname in.localhost From 77d8943eea0043e3302eb126f948a0556d42d8ab Mon Sep 17 00:00:00 2001 From: bootjp Date: Thu, 22 Nov 2018 12:57:18 +0900 Subject: [PATCH 03/14] Fix error message. Signed-off-by: bootjp --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 8570ecc429..780545d5a9 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -262,7 +262,7 @@ def start begin node.verify_connection rescue StandardError => e - log.fatal e.message + log.fatal "forward's connection setting error: #{e.message}" raise Fluent::UnrecoverableError, e.message end end From 8a7022dcc2d6f95cdc2b99f4637465823efe7dd5 Mon Sep 17 00:00:00 2001 From: bootjp Date: Thu, 22 Nov 2018 13:01:26 +0900 Subject: [PATCH 04/14] Fix style of test case. Signed-off-by: bootjp --- test/plugin/test_out_forward.rb | 87 +++++++++++++++++---------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 9adb61b06d..946f303fb2 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -219,14 +219,14 @@ def read_ack_from_sock(sock, unpacker) test 'verify_connection_at_startup is disabled in default' do @d = d = create_driver(CONFIG) - assert_equal false, d.instance.verify_connection_at_startup + assert_false d.instance.verify_connection_at_startup end test 'verify_connection_at_startup can be enabled' do @d = d = create_driver(CONFIG + %[ verify_connection_at_startup true ]) - assert_equal true, d.instance.verify_connection_at_startup + assert_true d.instance.verify_connection_at_startup end test 'send tags in str (utf-8 strings)' do @@ -787,7 +787,8 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG end end - test 'verify_connection_at_startup_nodes_are_not_available' do + sub_test_case 'verify_connection_at_startup' do + test 'nodes are not available' do @d = d = create_driver(CONFIG + %[ verify_connection_at_startup true @@ -797,57 +798,56 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG flush_at_shutdown false # suppress errors in d.instance_shutdown ]) - assert_raise Fluent::UnrecoverableError do - d.instance_start + assert_raise Fluent::UnrecoverableError do + d.instance_start + end + d.instance_shutdown end - d.instance_shutdown - end - test 'verify_connection_at_startup_nodes_shared_key_miss_match' do - input_conf = TARGET_CONFIG + %[ + test 'nodes_shared_key_miss_match' do + input_conf = TARGET_CONFIG + %[ self_hostname in.localhost shared_key fluentd-sharedkey ] - target_input_driver = create_target_input_driver(conf: input_conf) - - output_conf = %[ - send_timeout 30 - heartbeat_type transport - transport tls - tls_verify_hostname false - verify_connection_at_startup true - require_ack_response true - ack_response_timeout 5s - - self_hostname localhost - shared_key key_miss_match - - - flush_mode immediate - retry_type periodic - retry_wait 30s - flush_at_shutdown false # suppress errors in d.instance_shutdown - flush_thread_interval 31s - + target_input_driver = create_target_input_driver(conf: input_conf) + output_conf = %[ + send_timeout 30 + heartbeat_type transport + transport tls + tls_verify_hostname false + verify_connection_at_startup true + require_ack_response true + ack_response_timeout 5s + + self_hostname localhost + shared_key key_miss_match + + + flush_mode immediate + retry_type periodic + retry_wait 30s + flush_at_shutdown false # suppress errors in d.instance_shutdown + flush_thread_interval 31s + - - host #{TARGET_HOST} - port #{TARGET_PORT} - - ] - @d = d = create_driver(output_conf) + + host #{TARGET_HOST} + port #{TARGET_PORT} + + ] + @d = d = create_driver(output_conf) - target_input_driver.run(expect_records: 1, timeout: 15) do - assert_raise Fluent::UnrecoverableError do - d.instance_start + target_input_driver.run(expect_records: 1, timeout: 15) do + assert_raise Fluent::UnrecoverableError do + d.instance_start + end + d.instance_shutdown end - d.instance_shutdown end - end - test 'verify_connection_at_startup_nodes_shared_key_match' do + test 'nodes_shared_key_match' do input_conf = TARGET_CONFIG + %[ self_hostname in.localhost @@ -872,7 +872,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG port #{TARGET_PORT} shared_key fluentd-sharedkey - ] + ] @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") @@ -894,4 +894,5 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) end + end end From c4d0a88b7f6b2772be66f84d5b6590cd03ef5310 Mon Sep 17 00:00:00 2001 From: Tatsuki Sugiura Date: Thu, 22 Nov 2018 13:14:04 +0900 Subject: [PATCH 05/14] Keep `class` method to avoid error on Sigdump.dump. When stopping fluentd, sometime I get follwoing error message on stderr; ``` Unexpected error undefined method `class' for # /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:74:in `block in dump_object_count' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:73:in `each_object' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:73:in `dump_object_count' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:18:in `block in dump' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:136:in `open' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:136:in `_open_dump_path' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/sigdump-0.2.4/lib/sigdump.rb:14:in `dump' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/serverengine-2.0.7/lib/serverengine/server.rb:74:in `dump' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/serverengine-2.0.7/lib/serverengine/server.rb:112:in `block (2 levels) in install_signal_handlers' /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/serverengine-2.0.7/lib/serverengine/signal_thread.rb:96:in `main' ``` This cause Sigdump.dump fails with CleanroomExpander because it will call `class` method for all objects to gathering stats. This patch just keep `class` method to avoid error. Signed-off-by: Tatsuki Sugiura --- lib/fluent/plugin/filter_record_transformer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/filter_record_transformer.rb b/lib/fluent/plugin/filter_record_transformer.rb index c6a74b2fd4..02ba686a39 100644 --- a/lib/fluent/plugin/filter_record_transformer.rb +++ b/lib/fluent/plugin/filter_record_transformer.rb @@ -316,7 +316,7 @@ def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix end (Object.instance_methods).each do |m| - undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ + undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member|^class$/ end end end From 32483a7f2617742efc466bae3f03fb6afbf0190a Mon Sep 17 00:00:00 2001 From: Konstantin Grachev Date: Sun, 25 Nov 2018 16:24:28 +0300 Subject: [PATCH 06/14] [Bugfix] Nginx Parser: http_x_forwarded_for can contain multiple IP divided by comma Signed-off-by: Konstantin Grachev --- lib/fluent/plugin/parser_nginx.rb | 2 +- test/plugin/test_parser_nginx.rb | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/parser_nginx.rb b/lib/fluent/plugin/parser_nginx.rb index 9717e1bcbe..e9090ef06d 100644 --- a/lib/fluent/plugin/parser_nginx.rb +++ b/lib/fluent/plugin/parser_nginx.rb @@ -21,7 +21,7 @@ module Plugin class NginxParser < RegexpParser Plugin.register_parser("nginx", self) - config_set_default :expression, /^(?[^ ]*) (?[^ ]*) (?[^ ]*) \[(?