From 141538ced27226ae4681f4bc1e762407cbf0c11b Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 2 Jun 2021 16:59:10 +0900 Subject: [PATCH] out_forward: Fix a race condition on handshake Fix the following failed test: https://github.com/fluent/fluentd/runs/2725344894 ``` 2021-06-02T06:50:53.7067710Z Failure: test: Node with security is thread-safe on multi threads(ForwardOutputTest): 2021-06-02T06:50:53.7069440Z '2021-06-02 06:50:53 +0000 [warn]: connection refused to test: invalid format for PONG message' happens. 2021-06-02T06:50:53.7070450Z expected but was 2021-06-02T06:50:53.7071110Z 2021-06-02T06:50:53.7072550Z /Users/runner/work/fluentd/fluentd/test/plugin/test_out_forward.rb:1017:in `block in ' 2021-06-02T06:50:53.7074360Z 1014: end 2021-06-02T06:50:53.7075180Z 1015: 2021-06-02T06:50:53.7075860Z 1016: logs = d.logs 2021-06-02T06:50:53.7078420Z => 1017: assert_false(logs.any? { |log| log.include?("invalid format for PONG message") || log.include?("shared key mismatch") }, "'#{logs.last.strip}' happens") 2021-06-02T06:50:53.7079580Z 1018: end 2021-06-02T06:50:53.7080160Z 1019: 2021-06-02T06:50:53.7081050Z 1020: def create_target_input_driver(response_stub: nil, disconnect: false, conf: target_config) ``` Signed-off-by: Takuro Ashie --- lib/fluent/plugin/out_forward.rb | 5 ++--- test/plugin/test_out_forward.rb | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index c4e0ccc418..7d8fc67424 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -565,8 +565,6 @@ def initialize(sender, server, failure:, connection_manager:, ack_handler:) username: server.username || '', ) - @unpacker = Fluent::MessagePackFactory.msgpack_unpacker - @resolved_host = nil @resolved_time = 0 @resolved_once = false @@ -613,7 +611,8 @@ def establish_connection(sock, ri) sleep @sender.read_interval next end - @unpacker.feed_each(buf) do |data| + unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + unpacker.feed_each(buf) do |data| if @handshake.invoke(sock, ri, data) == :established @log.debug "connection established", host: @host, port: @port end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 16fd2873be..0b0eb53869 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1014,7 +1014,9 @@ def try_write(chunk) end logs = d.logs - assert_false(logs.any? { |log| log.include?("invalid format for PONG message") || log.include?("shared key mismatch") }, "'#{logs.last.strip}' happens") + assert_false(logs.any? { |log| + log.include?("invalid format for PONG message") || log.include?("shared key mismatch") + }, "Actual log:\n#{logs.join}") end def create_target_input_driver(response_stub: nil, disconnect: false, conf: target_config)