Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add authentication feature to in_forward #813

Closed
wants to merge 81 commits into from
Closed

Conversation

okkez
Copy link
Contributor

@okkez okkez commented Feb 25, 2016

I've added authentication feature to in_forward based on v0.14 branch.
in_forward test is green but out_forward test is red for now.

@tagomoris
Copy link
Member

Is it still WIP?

@cosmo0920
Copy link
Contributor

Yes, this PR is still WIP.

@kou
Copy link
Contributor

kou commented Mar 12, 2016

The following change may help you:

diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 12693c7..ad7474c 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -336,7 +336,11 @@ module Fluent
             # To avoid a decrease of troughput, it is necessary to prepare a list of chunks that wait for responses
             # and process them asynchronously.
             if IO.select([sock], nil, nil, @ack_response_timeout)
-              raw_data = sock.recv(1024)
+              begin
+                raw_data = sock.recv(1024)
+              rescue Errno::ECONNRESET
+                raw_data = ""
+              end

               # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
               # If this happens we assume the data wasn't delivered and retry it.
diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb
index 089005f..d5e3b72 100644
--- a/lib/fluent/test/base.rb
+++ b/lib/fluent/test/base.rb
@@ -56,7 +56,11 @@ module Fluent
         else
           @instance = klass
         end
-        @instance.router = Engine.root_agent.event_router
+        @instance.router = Object.new
+        router = @instance.router
+        def router.method_missing(name, *args, **kw_args, &block)
+          Engine.root_agent.event_router.__send__(name, *args, **kw_args, &block)
+        end
         @instance.log = TestLogger.new
         Engine.root_agent.instance_variable_set(:@log, @instance.log)

diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 909d5bb..b2781e4 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -259,7 +259,7 @@ class ForwardOutputTest < Test::Unit::TestCase
   end

   def test_require_a_node_supporting_responses_to_respond_with_ack
-    target_input_driver = create_target_input_driver(true)
+    target_input_driver = create_target_input_driver

     d = create_driver(CONFIG + %[
       flush_interval 1s
@@ -296,7 +296,7 @@ class ForwardOutputTest < Test::Unit::TestCase

   def test_require_a_node_not_supporting_responses_to_respond_with_ack
     # in_forward, that doesn't support ack feature, and keep connection alive
-    target_input_driver = create_target_input_driver
+    target_input_driver = create_target_input_driver(->(options){ nil })

     d = create_driver(CONFIG + %[
       flush_interval 1s
@@ -337,7 +337,7 @@ class ForwardOutputTest < Test::Unit::TestCase
   # bdf1f4f104c00a791aa94dc20087fe2011e1fd83
   def test_require_a_node_not_supporting_responses_2_to_respond_with_ack
     # in_forward, that doesn't support ack feature, and disconnect immediately
-    target_input_driver = create_target_input_driver(false, true)
+    target_input_driver = create_target_input_driver(->(options){ nil }, true)

     d = create_driver(CONFIG + %[
       flush_interval 1s

For lib/fluent/plugin/out_forward.rb change:

sock.recv may raise Errno::ECONNRESET. See the following case:

require "socket"

server = TCPServer.new("127.0.0.1", 0)
client = TCPSocket.new("127.0.0.1", server.addr[1])

server_client = server.accept

client.write("x")
server_client.close

client.recv(1024) # => Errno::ECONNRESET

If sock.recv is raised an exception, the node isn't disabled.

NOTE: ForwardOutputConnectionClosedError in out_forward.rb doesn't exist.

For lib/fluent/test/base.rb change:

We can't use multiple test drivers in a test because we can't hook router's emit_stream multiple times by the following code:

        instance.router.define_singleton_method(:emit_stream) {|tag,es|
          m.call(tag, es)
        }

If we use the singleton method strategy, we should create a new router for each test driver.

okkez@137b9b8 may be a workaround for the limitation. But it may not be a good strategy.

For test/plugin/test_out_forward.rb:

We should follow create_target_test_driver API change if we change the API.

@okkez okkez changed the title [WIP] Add authentication feature to in_forward Add authentication feature to in_forward Mar 14, 2016
@kou
Copy link
Contributor

kou commented Mar 15, 2016

I forgot to mention about my change is just a proof of concept. We should rethink about whether is the implementation reasonable or not. For example, the following code is tricky:

+        @instance.router = Object.new
+        router = @instance.router
+        def router.method_missing(name, *args, **kw_args, &block)
+          Engine.root_agent.event_router.__send__(name, *args, **kw_args, &block)
+        end

We should create proxy class and instantiate it. Or create a new router for each driver.

@tagomoris
Copy link
Member

CI in Windows environment is failing. This is caused by read_nonblock and some other operations like that. But read_nonblock in Windows environment is supported at ruby 2.3, so that CI (w/ ruby 2.1 and 2.2) are failing.

Could you fix tests in any way below?:

  • fix tests without read_nonblock
  • skip related tests in Windows environment (it's better to have ruby version check plus environment check)

@okkez
Copy link
Contributor Author

okkez commented Apr 6, 2016

OK, I will fix in a few days.

@tagomoris
Copy link
Member

Ah, it's my mistake that: read_nonblock in Windows was officially supported at Ruby 2.3, but it has existed in past ruby versions. So we cannot check it by respond_to?(:read_nonblock) even in older versions & Windows.

@tagomoris
Copy link
Member

@okkez ping

@okkez
Copy link
Contributor Author

okkez commented Apr 26, 2016

How about skip if Fluent.windows? && RUBY_VERSION < "2.3.0"?

@okkez
Copy link
Contributor Author

okkez commented Apr 28, 2016

Rebased

# '' : socket is disconnected without any data
# nil: socket read timeout
def read_data(io, timeout)
skip if Fluent.windows? && RUBY_VERSION < "2.3.0"
Copy link
Contributor

@cosmo0920 cosmo0920 May 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line causes errors in AppVeyor: e.g. https://ci.appveyor.com/project/fluent/fluentd/build/656/job/o4qg0yn2w44a32a6#L1485
Perhaps, it should be omit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks correct > omit instead of skip

@tagomoris
Copy link
Member

@okkez Could you rebase once again and fix to use omit?
I think it's time to merge this fix.

@okkez
Copy link
Contributor Author

okkez commented Jun 14, 2016

OK, I will rebase tomorrow.

@okkez okkez force-pushed the add-auth branch 2 times, most recently from 59c5444 to 389b7a4 Compare June 15, 2016 05:56
@okkez
Copy link
Contributor Author

okkez commented Jun 16, 2016

Rebased.
Passed TravisCI.

@okkez
Copy link
Contributor Author

okkez commented Jun 16, 2016

Failed CI on Appveyor, why?

@tagomoris
Copy link
Member

Some failures and warnings are actually occurring.

@okkez
Copy link
Contributor Author

okkez commented Jun 16, 2016

1F1E log.

    test_time:                      C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:238: warning: instance variable @keepalive not initialized 
C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:576: warning: instance variable @writing not initialized 
C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:238: warning: instance variable @keepalive not initialized 
C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:576: warning: instance variable @writing not initialized 
C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:238: warning: instance variable @keepalive not initialized 
C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:576: warning: instance variable @writing not initialized 
F 
=============================================================================== 
Failure: test_time(ForwardInputTest::Message) 
C:/projects/fluentd/test/plugin/test_in_forward.rb:97:in `test_time' 
      94:           send_data packer.write([tag, 0, record]).to_s 
      95:         } 
      96:       end 
  =>  97:       assert_equal(records, d.emits.sort_by {|a| a[0] }) 
      98:     end 
      99:  
     100:     def test_plain 
<[["tag1", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>1}],
 ["tag2", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>2}]]> expected but was 
<[["tag1", #<Fluent::EventTime:0x4c67aa8 @nsec=0, @sec=1293974055>, {"a"=>1}],
 ["tag1", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>1}],
 ["tag2", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>2}]]> 

diff: 
+ [["tag1", #<Fluent::EventTime:0x4c67aa8 @nsec=0, @sec=1293974055>, {"a"=>1}], 
? [["tag1", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>1}], 
?                                                                               
   ["tag2", #<Fluent::EventTime:0x4c6ff68 @nsec=0, @sec=1293974055>, {"a"=>2}]] 
=============================================================================== 
: (1.203076) 
  test_require_a_node_supporting_responses_to_respond_with_ack:C:/projects/fluentd/lib/fluent/plugin/in_forward.rb:238: warning: instance variable @keepalive not initialized 
E 
===============================================================================
Error: test_require_a_node_supporting_responses_to_respond_with_ack(ForwardOutputTest): Fluent::ForwardOutputConnectionClosedError: node 127.0.0.1:1113 closed connection
C:/projects/fluentd/lib/fluent/plugin/out_forward.rb:360:in `send_data'
C:/projects/fluentd/test/plugin/test_out_forward.rb:41:in `send_data'
C:/projects/fluentd/lib/fluent/plugin/out_forward.rb:219:in `block in write_objects'
C:/projects/fluentd/lib/fluent/plugin/out_forward.rb:213:in `times'
C:/projects/fluentd/lib/fluent/plugin/out_forward.rb:213:in `write_objects'
C:/projects/fluentd/lib/fluent/compat/output.rb:458:in `write'
C:/projects/fluentd/lib/fluent/compat/output.rb:119:in `write'
C:/projects/fluentd/lib/fluent/test/output_test.rb:94:in `block in run'
C:/projects/fluentd/lib/fluent/test/input_test.rb:123:in `call'
C:/projects/fluentd/lib/fluent/test/input_test.rb:123:in `block in run'
C:/projects/fluentd/lib/fluent/test/base.rb:81:in `run' 
C:/projects/fluentd/lib/fluent/test/input_test.rb:122:in `run'
C:/projects/fluentd/lib/fluent/test/output_test.rb:76:in `run'
C:/projects/fluentd/test/plugin/test_out_forward.rb:282:in `block in test_require_a_node_supporting_responses_to_respond_with_ack'
     279:     end
     280: 
     281:     target_input_driver.run do
  => 282:       d.run do
     283:         records.each do |record|
     284:           d.emit record, time
     285:         end
C:/projects/fluentd/lib/fluent/test/input_test.rb:123:in `call'
C:/projects/fluentd/lib/fluent/test/input_test.rb:123:in `block in run'
C:/projects/fluentd/lib/fluent/test/base.rb:81:in `run'
C:/projects/fluentd/lib/fluent/test/input_test.rb:122:in `run'
C:/projects/fluentd/test/plugin/test_out_forward.rb:281:in `test_require_a_node_supporting_responses_to_respond_with_ack'
===============================================================================

@okkez
Copy link
Contributor Author

okkez commented Jun 17, 2016

On my local (Windows7 32bit with Ruby 2.2.3), I could not reproduce 1F (test_time).
On the other hand, I can reproduce 1E (test_require_a_node_supporting_responses_to_respond_with_ack).

@okkez
Copy link
Contributor Author

okkez commented Jun 17, 2016

Pass all tests on my local Windows7 32bit.

@@ -50,8 +50,67 @@ def initialize
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: false

# TODO check recent fluent-plugin-secure-forward updates
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this comment.

@tagomoris
Copy link
Member

@okkez I revised your change as #1136, with rebasing on current master branch, and with fixes of tests mainly for Windows environment.
I'm sorry for not merging this pull request. Thank you for great contribution!

@tagomoris tagomoris closed this Aug 23, 2016
@okkez okkez deleted the add-auth branch August 24, 2016 00:47
@okkez
Copy link
Contributor Author

okkez commented Aug 24, 2016

Thank you for revising my PR 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants