Skip to content

Commit

Permalink
CI: Fix unstable tests of out_forward (#4436)
Browse files Browse the repository at this point in the history
Backported from 5d18f35

* Avoid calling `instance_start` in duplicate.
* Avoid not calling `instance_shutdown`.

To fix the following error, which sometimes occur.
I'm not sure this actually fixes it, but, at least, we should fix the
points above.

    Error: test: Create new connection per send_data(ForwardOutputTest): ArgumentError: expected loop to be an instance of Coolio::Loop, not nil
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/io.rb:35:in `attach'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/io.rb:35:in `attach'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/socket.rb:39:in `attach'
    (eval):7:in `attach'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/server.rb:40:in `on_connection'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/listener.rb:65:in `on_readable'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/loop.rb:88:in `run_once'
    C:/hostedtoolcache/windows/Ruby/3.2.3/x64/lib/ruby/gems/3.2.0/gems/cool.io-1.8.0/lib/cool.io/loop.rb:88:in `run'
    D:/a/fluentd/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
    D:/a/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Takuro Ashie <[email protected]>
  • Loading branch information
daipom and ashie authored Mar 14, 2024
1 parent 00aec89 commit e668920
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1248,27 +1248,22 @@ def plugin_id_for_test?
target_input_driver = create_target_input_driver(conf: target_config)
output_conf = config
d = create_driver(output_conf)
d.instance_start

begin
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.twice
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.twice

target_input_driver.run(timeout: 15) do
d.run(shutdown: false) do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
target_input_driver.run(timeout: 15) do
d.run do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
end
ensure
d.instance_shutdown
end
end

Expand All @@ -1282,7 +1277,6 @@ def plugin_id_for_test?
port #{@target_port}
</server>
])
d.instance_start
assert_nothing_raised { d.run }
end

Expand All @@ -1294,33 +1288,28 @@ def plugin_id_for_test?
keepalive_timeout 2
]
d = create_driver(output_conf)
d.instance_start

begin
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.once
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.once

target_input_driver.run(timeout: 15) do
d.run(shutdown: false) do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
target_input_driver.run(timeout: 15) do
d.run do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
end
ensure
d.instance_shutdown
end
end

test 'create timer of purging obsolete sockets' do
output_conf = config + %[keepalive true]
d = create_driver(output_conf)
@d = d = create_driver(output_conf)

mock(d.instance).timer_execute(:out_forward_heartbeat_request, 1).once
mock(d.instance).timer_execute(:out_forward_keep_alived_socket_watcher, 5).once
Expand All @@ -1336,7 +1325,6 @@ def plugin_id_for_test?
keepalive_timeout 2
]
d = create_driver(output_conf)
d.instance_start

chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
Expand Down

0 comments on commit e668920

Please sign in to comment.