diff --git a/fluentd.gemspec b/fluentd.gemspec index ff225597d3..195cca94d2 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -63,7 +63,6 @@ Gem::Specification.new do |gem| gem.add_development_dependency("test-unit", ["~> 3.3"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0"]) gem.add_development_dependency("oj", [">= 2.14", "< 4"]) - gem.add_development_dependency("async", "~> 1.23") gem.add_development_dependency("async-http", ">= 0.50.0") gem.add_development_dependency("aws-sigv4", ["~> 1.8"]) gem.add_development_dependency("aws-sdk-core", ["~> 3.191"]) diff --git a/lib/fluent/plugin_helper/http_server/server.rb b/lib/fluent/plugin_helper/http_server/server.rb index a916db9279..50df177e83 100644 --- a/lib/fluent/plugin_helper/http_server/server.rb +++ b/lib/fluent/plugin_helper/http_server/server.rb @@ -39,7 +39,8 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil) scheme = tls_context ? 'https' : 'http' @uri = URI("#{scheme}://#{@addr}:#{@port}").to_s @router = Router.new(default_app) - @reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(@logger)) + @server_task = nil + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) opts = if tls_context { ssl_context: tls_context } @@ -54,24 +55,35 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil) end def start(notify = nil) + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) @logger.debug("Start async HTTP server listening #{@uri}") - task = @reactor.run do - @server.run + Async do |task| + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) + @server_task = task.async do + Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger) + @server.run + end if notify notify.push(:ready) end + + if async_v2? + @server_task_queue = ::Thread::Queue.new + @server_task_queue.pop + @server_task&.stop + end end - task.stop @logger.debug('Finished HTTP server') end def stop @logger.debug('closing HTTP server') - - if @reactor - @reactor.stop + if async_v2? + @server_task_queue&.push(:stop) + else + @server_task&.stop end end @@ -88,6 +100,10 @@ def stop @router.mount(name, path, app || block) end end + + private def async_v2? + Gem::Version.new(Async::VERSION) >= Gem::Version.new('2.0') + end end end end diff --git a/test/plugin_helper/test_http_server_helper.rb b/test/plugin_helper/test_http_server_helper.rb index 1db426c76a..24312a9843 100644 --- a/test/plugin_helper/test_http_server_helper.rb +++ b/test/plugin_helper/test_http_server_helper.rb @@ -127,12 +127,13 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr end client = Async::HTTP::Client.new(Async::HTTP::Endpoint.parse("https://#{addr}:#{port}", ssl_context: context)) - reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)) + Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER) resp = nil error = nil - reactor.run do + Sync do + Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER) begin response = yield(client) rescue => e # Async::Reactor rescue all error. handle it by myself @@ -140,7 +141,7 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr end if response - resp = Response.new(response.status.to_s, response.body.read, response.headers) + resp = Response.new(response.status.to_s, response.read, response.headers) end end