Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions src/lavinmqperf.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "log"
require "./lavinmq/version"
require "./lavinmqperf/*"
require "./lavinmqperf/amqp/*"
require "./lavinmqperf/mqtt/*"
require "./stdlib/resource"

{% unless flag?(:release) %}
Expand All @@ -11,17 +12,28 @@ Signal::SEGV.reset # Let the OS generate a coredump
Log.setup_from_env

module LavinMQPerf
mode = ARGV.shift?
case mode
when "throughput" then Throughput.new.run
when "bind-churn" then BindChurn.new.run
when "queue-churn" then QueueChurn.new.run
when "connection-churn" then ConnectionChurn.new.run
when "channel-churn" then ChannelChurn.new.run
when "consumer-churn" then ConsumerChurn.new.run
when "connection-count" then ConnectionCount.new.run
when "queue-count" then QueueCount.new.run
when /^.+$/ then Perf.new.run([mode.not_nil!])
else abort Perf.new.banner
protocol = ARGV.shift? || "amqp"
case protocol
when "amqp"
mode = ARGV.shift?
case mode
when "throughput" then AMQP::Throughput.new.run
when "bind-churn" then AMQP::BindChurn.new.run
when "queue-churn" then AMQP::QueueChurn.new.run
when "connection-churn" then AMQP::ConnectionChurn.new.run
when "channel-churn" then AMQP::ChannelChurn.new.run
when "consumer-churn" then AMQP::ConsumerChurn.new.run
when "connection-count" then AMQP::ConnectionCount.new.run
when "queue-count" then AMQP::QueueCount.new.run
when /^.+$/ then Perf.new.run([mode.not_nil!])
else abort Perf.new.amqp_banner
end
when "mqtt"
mode = ARGV.shift?
case mode
when "throughput" then MQTT::Throughput.new.run
else abort Perf.new.mqtt_banner
end
else abort Perf.new.amqp_banner
end
end
31 changes: 31 additions & 0 deletions src/lavinmqperf/amqp/bind_churn.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require "amqp-client"
require "benchmark"
require "../perf"

module LavinMQPerf
module AMQP
class BindChurn < Perf
def run
super

r = Random::DEFAULT
::AMQP::Client.start(@uri) do |c|
ch = c.channel
temp_q = ch.queue
durable_q = ch.queue("bind-churn-durable-#{r.hex(8)}")

Benchmark.ips do |x|
x.report("bind non-durable queue") do
temp_q.bind "amq.direct", r.hex(16)
end
x.report("bind durable queue") do
durable_q.bind "amq.direct", r.hex(10)
end
end
ensure
durable_q.delete if durable_q
end
end
end
end
end
21 changes: 21 additions & 0 deletions src/lavinmqperf/amqp/channel_churn.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require "amqp-client"
require "benchmark"
require "../perf"

module LavinMQPerf
module AMQP
class ChannelChurn < Perf
def run
super
c = ::AMQP::Client.new(@uri)
conn = c.connect
Benchmark.ips do |x|
x.report("open-close channel") do
ch = conn.channel
ch.close
end
end
end
end
end
end
21 changes: 21 additions & 0 deletions src/lavinmqperf/amqp/connection_churn.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require "amqp-client"
require "benchmark"
require "../perf"

module LavinMQPerf
module AMQP
class ConnectionChurn < Perf
def run
super
c = ::AMQP::Client.new(@uri)
Benchmark.ips do |x|
x.report("open-close connection and channel") do
conn = c.connect
conn.channel
conn.close
end
end
end
end
end
end
84 changes: 84 additions & 0 deletions src/lavinmqperf/amqp/connection_count.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
require "amqp-client"
require "../perf"

module LavinMQPerf
module AMQP
class ConnectionCount < Perf
alias BasicConsumeFrame = ::AMQP::Client::Frame::Basic::Consume
@connections = 100
@channels = 1
@consumers = 0
@queue = "connection-count"
@random_localhost = false
@done = Channel(Int32).new(100)

def initialize
super
@parser.on("-x count", "--count=number", "Number of connections (default 100)") do |v|
@connections = v.to_i
end
@parser.on("-c channels", "--channels=number", "Number of channels per connection (default 1)") do |v|
@channels = v.to_i
end
@parser.on("-C consumers", "--consumers=number", "Number of consumers per channel (default 0)") do |v|
@consumers = v.to_i
end
@parser.on("-u queue", "--queue=name", "Queue name (default #{@queue})") do |v|
@queue = v
end
@parser.on("-l", "--localhost", "Connect to random localhost 127.0.0.0/16 address") do
@random_localhost = true
end
@parser.on("-k IDLE:COUNT:INTERVAL", "--keepalive=IDLE:COUNT:INTERVAL", "TCP keepalive values") do |v|
@uri.query_params["tcp_keepalive"] = v
end
puts "FD limit: #{System.maximize_fd_limit}"
end

private def connect(i)
c = client.connect
@channels.times do |j|
ch = c.channel
@consumers.times do |k|
ch.queue(@queue) if j == k == 0
# Send raw frame, no wait, no fiber
c.write BasicConsumeFrame.new(ch.id, 0_u16, "", "", false, true, false, true, ::AMQP::Client::Arguments.new)
end
end
@done.send i
end

def run
super
count = 0
loop do
@connections.times.each_slice(100) do |slice|
start = Time.monotonic
slice.each do |i|
spawn connect(i)
end
slice.each do |_i|
@done.receive
print '.'
end
stop = Time.monotonic
puts " #{(stop - start).total_milliseconds.round}ms"
end
puts
print "#{count += @connections} connections "
print "#{count * @channels} channels "
print "#{count * @channels * @consumers} consumers. "
puts "Using #{rss.humanize_bytes} memory."
puts "Press enter to do add #{@connections} connections or ctrl-c to abort"
gets
end
end

private def client : ::AMQP::Client
client = @client ||= ::AMQP::Client.new(@uri)
client.host = "127.0.#{Random.rand(UInt8)}.#{Random.rand(UInt8)}" if @random_localhost
client
end
end
end
end
22 changes: 22 additions & 0 deletions src/lavinmqperf/amqp/consumer_churn.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require "amqp-client"
require "benchmark"
require "../perf"

module LavinMQPerf
module AMQP
class ConsumerChurn < Perf
def run
super
c = ::AMQP::Client.new(@uri).connect
ch = c.channel
q = ch.queue_declare "", auto_delete: false
Benchmark.ips do |x|
x.report("open-close consumer") do
tag = ch.basic_consume(q[:queue_name]) { }
ch.basic_cancel(tag, no_wait: true)
end
end
end
end
end
end
31 changes: 31 additions & 0 deletions src/lavinmqperf/amqp/queue_churn.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require "amqp-client"
require "benchmark"
require "../perf"

module LavinMQPerf
module AMQP
class QueueChurn < Perf
def run
super

r = Random::DEFAULT
::AMQP::Client.start(@uri) do |c|
ch = c.channel

Benchmark.ips do |x|
x.report("create/delete transient queue") do
q = ch.queue
ensure
q.delete if q
end
x.report("create/delete durable queue") do
q = ch.queue("queue-churn-durable-#{r.hex(8)}")
ensure
q.delete if q
end
end
end
end
end
end
end
40 changes: 40 additions & 0 deletions src/lavinmqperf/amqp/queue_count.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require "amqp-client"
require "../perf"

module LavinMQPerf
module AMQP
class QueueCount < Perf
@queues = 100

def initialize
super
@parser.on("-q queues", "--queues=number", "Number of queues (default 100)") do |v|
@queues = v.to_i
end
end

def run
super
count = 0
c = client.connect
ch = c.channel
loop do
@queues.times.each_slice(100) do |slice|
slice.each do
ch.queue("lavinmqperf-queue-#{Random::DEFAULT.hex(8)}", durable: false, auto_delete: true, exclusive: true)
end
end
puts
print "#{count += @queues} queues "
puts "Using #{rss.humanize_bytes} memory."
puts "Press enter to do add #{@queues} queues or ctrl-c to abort"
gets
end
end

private def client : ::AMQP::Client
@client ||= ::AMQP::Client.new(@uri)
end
end
end
end
Loading
Loading