Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_server: Ready to support Async 2.0 gem #4619

Merged
merged 7 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ Gem::Specification.new do |gem|
gem.add_development_dependency("test-unit", ["~> 3.3"])
gem.add_development_dependency("test-unit-rr", ["~> 1.0"])
gem.add_development_dependency("oj", [">= 2.14", "< 4"])
gem.add_development_dependency("async", "~> 1.23")
gem.add_development_dependency("async-http", ">= 0.50.0")
gem.add_development_dependency("aws-sigv4", ["~> 1.8"])
gem.add_development_dependency("aws-sdk-core", ["~> 3.191"])
Expand Down
30 changes: 23 additions & 7 deletions lib/fluent/plugin_helper/http_server/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil)
scheme = tls_context ? 'https' : 'http'
@uri = URI("#{scheme}://#{@addr}:#{@port}").to_s
@router = Router.new(default_app)
@reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(@logger))
@server_task = nil
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)

opts = if tls_context
{ ssl_context: tls_context }
Expand All @@ -54,24 +55,35 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil)
end

def start(notify = nil)
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@logger.debug("Start async HTTP server listening #{@uri}")
task = @reactor.run do
@server.run

Async do |task|
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@server_task = task.async do
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@server.run
end
if notify
notify.push(:ready)
end

if async_v2?
@server_task_queue = ::Thread::Queue.new
@server_task_queue.pop
@server_task&.stop
end
end

task.stop
@logger.debug('Finished HTTP server')
end

def stop
@logger.debug('closing HTTP server')

if @reactor
@reactor.stop
if async_v2?
@server_task_queue&.push(:stop)
else
@server_task&.stop
end
end

Expand All @@ -88,6 +100,10 @@ def stop
@router.mount(name, path, app || block)
end
end

private def async_v2?
Gem::Version.new(Async::VERSION) >= Gem::Version.new('2.0')
end
end
end
end
Expand Down
7 changes: 4 additions & 3 deletions test/plugin_helper/test_http_server_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr
end

client = Async::HTTP::Client.new(Async::HTTP::Endpoint.parse("https://#{addr}:#{port}", ssl_context: context))
reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER))
Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)

resp = nil
error = nil

reactor.run do
Sync do
Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)
begin
response = yield(client)
rescue => e # Async::Reactor rescue all error. handle it by myself
error = e
end

if response
resp = Response.new(response.status.to_s, response.body.read, response.headers)
resp = Response.new(response.status.to_s, response.read, response.headers)
end
end

Expand Down