From d2f93cb61a7306c302f503c074eb68861f346a80 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 3 Aug 2022 12:20:06 +0900 Subject: [PATCH 1/7] http_server: Support Async 2.0 gem Signed-off-by: Takuro Ashie Signed-off-by: Shizuo Fujita --- fluentd.gemspec | 1 - lib/fluent/plugin_helper/http_server/server.rb | 14 ++++++++------ test/plugin_helper/test_http_server_helper.rb | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index ff225597d3..195cca94d2 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -63,7 +63,6 @@ Gem::Specification.new do |gem| gem.add_development_dependency("test-unit", ["~> 3.3"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0"]) gem.add_development_dependency("oj", [">= 2.14", "< 4"]) - gem.add_development_dependency("async", "~> 1.23") gem.add_development_dependency("async-http", ">= 0.50.0") gem.add_development_dependency("aws-sigv4", ["~> 1.8"]) gem.add_development_dependency("aws-sdk-core", ["~> 3.191"]) diff --git a/lib/fluent/plugin_helper/http_server/server.rb b/lib/fluent/plugin_helper/http_server/server.rb index a916db9279..f99124a878 100644 --- a/lib/fluent/plugin_helper/http_server/server.rb +++ b/lib/fluent/plugin_helper/http_server/server.rb @@ -39,7 +39,8 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil) scheme = tls_context ? 'https' : 'http' @uri = URI("#{scheme}://#{@addr}:#{@port}").to_s @router = Router.new(default_app) - @reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(@logger)) + @server_task = nil + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) opts = if tls_context { ssl_context: tls_context } @@ -55,23 +56,24 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil) def start(notify = nil) @logger.debug("Start async HTTP server listening #{@uri}") - task = @reactor.run do - @server.run + Async do |task| + @server_task = task.async do + @server.run + end if notify notify.push(:ready) end end - task.stop @logger.debug('Finished HTTP server') end def stop @logger.debug('closing HTTP server') - if @reactor - @reactor.stop + if @server_task + @server_task.stop end end diff --git a/test/plugin_helper/test_http_server_helper.rb b/test/plugin_helper/test_http_server_helper.rb index 1db426c76a..12b97b6d38 100644 --- a/test/plugin_helper/test_http_server_helper.rb +++ b/test/plugin_helper/test_http_server_helper.rb @@ -127,12 +127,12 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr end client = Async::HTTP::Client.new(Async::HTTP::Endpoint.parse("https://#{addr}:#{port}", ssl_context: context)) - reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)) + Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER) resp = nil error = nil - reactor.run do + Async do |task| begin response = yield(client) rescue => e # Async::Reactor rescue all error. handle it by myself From 6c4c06e98b5c074be5105f4f39295b1e95995450 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 9 Aug 2022 10:18:28 +0900 Subject: [PATCH 2/7] Still stay on async-1.x Because dependent io-event (v1.0.9) can't build on Windows. Signed-off-by: Takuro Ashie Signed-off-by: Shizuo Fujita --- fluentd.gemspec | 1 + 1 file changed, 1 insertion(+) diff --git a/fluentd.gemspec b/fluentd.gemspec index 195cca94d2..ff225597d3 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -63,6 +63,7 @@ Gem::Specification.new do |gem| gem.add_development_dependency("test-unit", ["~> 3.3"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0"]) gem.add_development_dependency("oj", [">= 2.14", "< 4"]) + gem.add_development_dependency("async", "~> 1.23") gem.add_development_dependency("async-http", ">= 0.50.0") gem.add_development_dependency("aws-sigv4", ["~> 1.8"]) gem.add_development_dependency("aws-sdk-core", ["~> 3.191"]) From 343a830f9761e5a63efc59531a08660c84ed59c1 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 15 Feb 2023 08:54:36 +0900 Subject: [PATCH 3/7] test_http_server_helper: Remove a needless argument Signed-off-by: Takuro Ashie Signed-off-by: Shizuo Fujita --- test/plugin_helper/test_http_server_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin_helper/test_http_server_helper.rb b/test/plugin_helper/test_http_server_helper.rb index 12b97b6d38..f7cdf20ec6 100644 --- a/test/plugin_helper/test_http_server_helper.rb +++ b/test/plugin_helper/test_http_server_helper.rb @@ -132,7 +132,7 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr resp = nil error = nil - Async do |task| + Async do begin response = yield(client) rescue => e # Async::Reactor rescue all error. handle it by myself From bc4e4f561a7dd3b535c5f389b0d28bc8ffa159ac Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 15 Feb 2023 09:26:54 +0900 Subject: [PATCH 4/7] http_server: Replace Console.logger of all related threads Signed-off-by: Takuro Ashie Signed-off-by: Shizuo Fujita --- lib/fluent/plugin_helper/http_server/server.rb | 3 +++ test/plugin_helper/test_http_server_helper.rb | 1 + 2 files changed, 4 insertions(+) diff --git a/lib/fluent/plugin_helper/http_server/server.rb b/lib/fluent/plugin_helper/http_server/server.rb index f99124a878..a7b561e6be 100644 --- a/lib/fluent/plugin_helper/http_server/server.rb +++ b/lib/fluent/plugin_helper/http_server/server.rb @@ -55,10 +55,13 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil) end def start(notify = nil) + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) @logger.debug("Start async HTTP server listening #{@uri}") Async do |task| + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) @server_task = task.async do + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) @server.run end if notify diff --git a/test/plugin_helper/test_http_server_helper.rb b/test/plugin_helper/test_http_server_helper.rb index f7cdf20ec6..905526c207 100644 --- a/test/plugin_helper/test_http_server_helper.rb +++ b/test/plugin_helper/test_http_server_helper.rb @@ -133,6 +133,7 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr error = nil Async do + Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER) begin response = yield(client) rescue => e # Async::Reactor rescue all error. handle it by myself From 5e5ff62a24ffb71fa098db51b1e612c83c8e6314 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Wed, 28 Aug 2024 16:11:18 +0900 Subject: [PATCH 5/7] Revert "Still stay on async-1.x" This reverts commit 895fa6b6ea44aa6d4c8d5b6b3b0155bbf4cb2076. Signed-off-by: Shizuo Fujita --- fluentd.gemspec | 1 - 1 file changed, 1 deletion(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index ff225597d3..195cca94d2 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -63,7 +63,6 @@ Gem::Specification.new do |gem| gem.add_development_dependency("test-unit", ["~> 3.3"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0"]) gem.add_development_dependency("oj", [">= 2.14", "< 4"]) - gem.add_development_dependency("async", "~> 1.23") gem.add_development_dependency("async-http", ">= 0.50.0") gem.add_development_dependency("aws-sigv4", ["~> 1.8"]) gem.add_development_dependency("aws-sdk-core", ["~> 3.191"]) From 50667dd2f55d6c3640fe5000321ee5ecdc87cbad Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Wed, 21 Aug 2024 17:30:43 +0900 Subject: [PATCH 6/7] test_http_server_helper: Fix test Signed-off-by: Shizuo Fujita --- test/plugin_helper/test_http_server_helper.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/plugin_helper/test_http_server_helper.rb b/test/plugin_helper/test_http_server_helper.rb index 905526c207..24312a9843 100644 --- a/test/plugin_helper/test_http_server_helper.rb +++ b/test/plugin_helper/test_http_server_helper.rb @@ -132,7 +132,7 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr resp = nil error = nil - Async do + Sync do Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER) begin response = yield(client) @@ -141,7 +141,7 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr end if response - resp = Response.new(response.status.to_s, response.body.read, response.headers) + resp = Response.new(response.status.to_s, response.read, response.headers) end end From 483be9528669cfd21224fed62f12ec86fedc9269 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Wed, 28 Aug 2024 18:07:30 +0900 Subject: [PATCH 7/7] http_server: Stop http server safely Signed-off-by: Shizuo Fujita --- lib/fluent/plugin_helper/http_server/server.rb | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin_helper/http_server/server.rb b/lib/fluent/plugin_helper/http_server/server.rb index a7b561e6be..50df177e83 100644 --- a/lib/fluent/plugin_helper/http_server/server.rb +++ b/lib/fluent/plugin_helper/http_server/server.rb @@ -67,6 +67,12 @@ def start(notify = nil) if notify notify.push(:ready) end + + if async_v2? + @server_task_queue = ::Thread::Queue.new + @server_task_queue.pop + @server_task&.stop + end end @logger.debug('Finished HTTP server') @@ -74,9 +80,10 @@ def start(notify = nil) def stop @logger.debug('closing HTTP server') - - if @server_task - @server_task.stop + if async_v2? + @server_task_queue&.push(:stop) + else + @server_task&.stop end end @@ -93,6 +100,10 @@ def stop @router.mount(name, path, app || block) end end + + private def async_v2? + Gem::Version.new(Async::VERSION) >= Gem::Version.new('2.0') + end end end end