diff --git a/shard.lock b/shard.lock index 00cf4ce..f6251c4 100644 --- a/shard.lock +++ b/shard.lock @@ -2,7 +2,7 @@ version: 2.0 shards: ameba: git: https://github.com/crystal-ameba/ameba.git - version: 1.6.1 + version: 1.6.4 amq-protocol: git: https://github.com/cloudamqp/amq-protocol.cr.git @@ -10,5 +10,5 @@ shards: amqp-client: git: https://github.com/cloudamqp/amqp-client.cr.git - version: 1.2.5 + version: 1.3.0 diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index b0036bf..2c3106b 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -31,6 +31,7 @@ end def with_http_server(idle_connection_timeout = 5, &) with_server do |server, amqp_url| http_server = AMQProxy::HTTPServer.new(server, "127.0.0.1", 15673) + spawn { http_server.listen } begin yield http_server, server, amqp_url ensure @@ -45,6 +46,7 @@ def verify_running_amqp! port = UPSTREAM_URL.port || 5762 port = 5671 if tls && UPSTREAM_URL.port.nil? TCPSocket.new(host, port, connect_timeout: 3.seconds).close + puts "AMQP running" rescue Socket::ConnectError STDERR.puts "[ERROR] Specs require a running rabbitmq server on #{host}:#{port}" exit 1 diff --git a/src/amqproxy.cr b/src/amqproxy.cr index e3b019a..9eb827a 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -1,2 +1,15 @@ require "./amqproxy/cli" +{% begin %} + {% + flags = [] of String + flags << "-Dpreview_mt" if flag?(:preview_mt) + flags << "-Dmt" if flag?(:mt) + flags << "-Dexecution_context" if flag?(:execution_context) + flags << "-Dtracing" if flag?(:tracing) + flags << "--release" if flag?(:release) + flags << "--static" if flag?(:static) + flags << "--debug" if flag?(:debug) + %} + puts "Built with #{Crystal::VERSION} #{Crystal::BUILD_COMMIT} {{flags.join(" ").id}}" +{% end %} AMQProxy::CLI.new.run(ARGV) diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..7d9cced 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -119,9 +119,9 @@ class AMQProxy::CLI server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) - HTTPServer.new(server, @listen_address, @http_port.to_i) + http_server = HTTPServer.new(server, @listen_address, @http_port.to_i) + spawn http_server.listen, name: "HTTP Server" server.listen(@listen_address, @listen_port.to_i) - shutdown # wait until all client connections are closed @@ -133,11 +133,12 @@ class AMQProxy::CLI @first_shutdown = true - def initiate_shutdown(_s : Signal) + def initiate_shutdown(signal : Signal) unless server = @server exit 0 end if @first_shutdown + Log.info { "Shutting down due to signal #{signal}" } @first_shutdown = false server.stop_accepting_clients else diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index ffdc8a6..73a955d 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -40,9 +40,13 @@ module AMQProxy end end + private def with_channel_map(&) + yield @channel_map + end + private def finish_publish(channel) buffer = @publish_buffers[channel] - if upstream_channel = @channel_map[channel] + if upstream_channel = with_channel_map &.[channel] upstream_channel.write(buffer.publish) upstream_channel.write(buffer.header) buffer.bodies.each do |body| @@ -61,6 +65,7 @@ module AMQProxy socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0 loop do frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + Log.trace { "Received frame: #{frame}" } @last_heartbeat = Time.monotonic case frame when AMQ::Protocol::Frame::Heartbeat # noop @@ -70,13 +75,15 @@ module AMQProxy write AMQ::Protocol::Frame::Connection::CloseOk.new return when AMQ::Protocol::Frame::Channel::Open - raise "Channel already opened" if @channel_map.has_key? frame.channel - upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel)) - @channel_map[frame.channel] = upstream_channel + with_channel_map do |channel_map| + raise "Channel already opened" if channel_map.has_key? frame.channel + upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel)) + channel_map[frame.channel] = upstream_channel + end write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel) when AMQ::Protocol::Frame::Channel::CloseOk # Server closed channel, CloseOk reply to server is already sent - @channel_map.delete(frame.channel) + with_channel_map &.delete(frame.channel) when AMQ::Protocol::Frame::Basic::Publish @publish_buffers[frame.channel] = PublishBuffer.new(frame) when AMQ::Protocol::Frame::Header @@ -92,7 +99,7 @@ module AMQProxy else src_channel = frame.channel begin - if upstream_channel = @channel_map[frame.channel] + if upstream_channel = with_channel_map &.[frame.channel]? upstream_channel.write(frame) else # Channel::Close is sent, waiting for CloseOk @@ -121,7 +128,7 @@ module AMQProxy end end rescue ex : IO::Error - Log.debug { "Disconnected #{ex.inspect}" } + Log.debug(exception: ex) { "Disconnected #{ex.inspect}" } else Log.debug { "Disconnected" } ensure @@ -132,6 +139,7 @@ module AMQProxy # Send frame to client, channel id should already be remapped by the caller def write(frame : AMQ::Protocol::Frame) @lock.synchronize do + Log.trace { "Sending frame: #{frame}" } case frame when AMQ::Protocol::Frame::BytesBody # Upstream might send large frames, split them to support lower client frame_max @@ -149,9 +157,9 @@ module AMQProxy end case frame when AMQ::Protocol::Frame::Channel::Close - @channel_map[frame.channel] = nil + with_channel_map &.[frame.channel] = nil when AMQ::Protocol::Frame::Channel::CloseOk - @channel_map.delete(frame.channel) + with_channel_map &.delete(frame.channel) when AMQ::Protocol::Frame::Connection::CloseOk @socket.close rescue nil end @@ -174,13 +182,15 @@ module AMQProxy end private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED") - @channel_map.each_value do |upstream_channel| - upstream_channel.try &.close(code, reason) - rescue Upstream::WriteError - Log.debug { "Upstream write error while closing client's channels" } - next # Nothing to do + with_channel_map do |channel_map| + channel_map.each_value do |upstream_channel| + upstream_channel.try &.close(code, reason) + rescue Upstream::WriteError + Log.debug { "Upstream write error while closing client's channels" } + next # Nothing to do + end + channel_map.clear end - @channel_map.clear end private def expect_more_frames?(frame) : Bool @@ -221,6 +231,7 @@ module AMQProxy socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice socket.flush socket.close + Log.debug { "Invalid protocol start: #{proto}" } raise IO::EOFError.new("Invalid protocol start") end diff --git a/src/amqproxy/http_server.cr b/src/amqproxy/http_server.cr index 5e6db22..cd97802 100644 --- a/src/amqproxy/http_server.cr +++ b/src/amqproxy/http_server.cr @@ -22,7 +22,6 @@ module AMQProxy end end bind_tcp - spawn @http.listen, name: "HTTP Server" Log.info { "HTTP server listening on #{@address}:#{@port}" } end @@ -31,6 +30,10 @@ module AMQProxy Log.info { "Bound to #{addr}" } end + def listen + @http.listen + end + def metrics(context) writer = PrometheusWriter.new(context.response, "amqproxy") writer.write({name: "identity_info", diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 92af3e9..320daa9 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -12,6 +12,8 @@ module AMQProxy @clients_lock = Mutex.new @clients = Array(Client).new + @channel_pools_lock = Mutex.new + def self.new(url : URI) tls = url.scheme == "amqps" host = url.host || "127.0.0.1" @@ -29,12 +31,18 @@ module AMQProxy Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" } end + private def with_channel_pools(&) + @channel_pools_lock.synchronize do + yield @channel_pools + end + end + def client_connections @clients.size end def upstream_connections - @channel_pools.each_value.sum &.connections + with_channel_pools &.each_value.sum(&.connections) end def listen(address, port) @@ -43,10 +51,11 @@ module AMQProxy def listen(@server : TCPServer) Log.info { "Proxy listening on #{server.local_address}" } + while socket = server.accept? begin - addr = socket.remote_address - spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}" + Log.debug { "Accepted new client from #{socket.remote_address} (#{socket.inspect})" } + handle_connection(socket) rescue IO::Error next end @@ -72,18 +81,22 @@ module AMQProxy end end - private def handle_connection(socket, remote_address) - c = Client.new(socket) - active_client(c) do - channel_pool = @channel_pools[c.credentials] - c.read_loop(channel_pool) + private def handle_connection(socket) + spawn(name: "Client #{socket.remote_address}") do + c = Client.new(socket) + channel_pool = with_channel_pools &.[c.credentials] + remote_address = socket.remote_address + Log.debug { "Client created for #{remote_address}" } + active_client(c) do + c.read_loop(channel_pool) + end + rescue IO::EOFError + # Client closed connection before/while negotiating + rescue ex # only raise from constructor, when negotating + Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" } + ensure + socket.close rescue nil end - rescue IO::EOFError - # Client closed connection before/while negotiating - rescue ex # only raise from constructor, when negotating - Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" } - ensure - socket.close rescue nil end private def active_client(client, &) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 2b840c8..31c6777 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -9,7 +9,6 @@ module AMQProxy Log = ::Log.for(self) @socket : IO @channels = Hash(UInt16, DownstreamChannel).new - @channels_lock = Mutex.new @channel_max : UInt16 @lock = Mutex.new @remote_address : String @@ -45,10 +44,10 @@ module AMQProxy end private def create_upstream_channel(downstream_channel : DownstreamChannel) - @channels_lock.synchronize do + with_channels do |channels| 1_u16.upto(@channel_max) do |i| - unless @channels.has_key?(i) - @channels[i] = downstream_channel + unless channels.has_key?(i) + channels[i] = downstream_channel return UpstreamChannel.new(self, i) end end @@ -56,12 +55,16 @@ module AMQProxy end end + private def with_channels(&) + yield @channels + end + def close_channel(id, code, reason) send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16) end def channels - @channels.size + with_channels &.size end # Frames from upstream (to client) @@ -86,15 +89,15 @@ module AMQProxy when AMQ::Protocol::Frame::Channel::OpenOk # assume it always succeeds when AMQ::Protocol::Frame::Channel::Close send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) - if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) } + if downstream_channel = with_channels &.delete(frame.channel) downstream_channel.write frame end when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close - if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) } + if downstream_channel = with_channels &.delete(frame.channel) downstream_channel.write(frame) end else - if downstream_channel = @channels_lock.synchronize { @channels[frame.channel]? } + if downstream_channel = with_channels &.[frame.channel]? downstream_channel.write(frame) else Log.debug { "Frame for unmapped channel from upstream: #{frame}" } @@ -114,21 +117,21 @@ module AMQProxy end private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") - @channels_lock.synchronize do - return if @channels.empty? - Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } - @channels.each_value do |downstream_channel| + with_channels do |channels| + return if channels.empty? + Log.debug { "Upstream connection closed, closing #{channels.size} client channels" } + channels.each_value do |downstream_channel| downstream_channel.close(code, reason) end - @channels.clear + channels.clear end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) Log.debug { "Sending broadcast frame to all client connections" } clients = Set(Client).new - @channels_lock.synchronize do - @channels.each_value do |downstream_channel| + with_channels do |channels| + channels.each_value do |downstream_channel| clients << downstream_channel.client end end @@ -144,9 +147,7 @@ module AMQProxy raise "Connection frames should not be sent through here: #{frame}" when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed - @channels_lock.synchronize do - @channels.delete(frame.channel) - end + with_channels &.delete(frame.channel) end send frame end