
Add mqtt to lavinmqperf #983

Merged
kickster97 merged 2 commits into main from mqtt-perf
Jun 23, 2025

Conversation

@kickster97
Member

@kickster97 kickster97 commented Mar 12, 2025

WHAT is this pull request doing?

This PR abstracts LavinMQPerf into AMQP and MQTT modules.
lavinmqperf can now be run for both AMQP and MQTT by specifying the protocol on the CLI, like this:

lavinmqperf [protocol] [throughput | bind-churn | queue-churn | connection-churn | connection-count | queue-count] [arguments]

e.g.

lavinmqperf mqtt throughput

It adds the throughput test for the MQTT module, largely similar to the AMQP throughput test.

MQTT-specific features:

  • QoS levels
  • Retain flag for messages
  • Clean session option

addresses #971

HOW can this pull request be tested?

lavinmqperf mqtt throughput 

@kickster97 kickster97 marked this pull request as ready for review April 7, 2025 09:06
@kickster97 kickster97 requested a review from a team as a code owner April 7, 2025 09:06
@snichme
Member

snichme commented Apr 8, 2025

Is there a way to run the throughput test using non-durable queues?

@carlhoerberg
Member

carlhoerberg commented Apr 14, 2025

😱

✗ bin/lavinmqperf mqtt throughput -y 300 -x 100
Publish rate: 0 msgs/s Consume rate: 25063 msgs/s
Publish rate: 0 msgs/s Consume rate: 25941 msgs/s
Publish rate: 0 msgs/s Consume rate: 27455 msgs/s
Publish rate: 0 msgs/s Consume rate: 31092 msgs/s
Publish rate: 0 msgs/s Consume rate: 2382 msgs/s
Publish rate: 0 msgs/s Consume rate: 397 msgs/s
Publish rate: 0 msgs/s Consume rate: 3165 msgs/s
Publish rate: 0 msgs/s Consume rate: 1033 msgs/s
Publish rate: 0 msgs/s Consume rate: 3452 msgs/s
Publish rate: 0 msgs/s Consume rate: 10074 msgs/s
Publish rate: 0 msgs/s Consume rate: 12846 msgs/s
Publish rate: 0 msgs/s Consume rate: 15797 msgs/s
Publish rate: 0 msgs/s Consume rate: 16273 msgs/s
Invalid memory access (signal 11) at address 0x1060
[0x55e33f940eb9] ?? +94434512604857 in bin/lavinmqperf
[0x55e33f94058b] ?? +94434512602507 in bin/lavinmqperf
[0x7f1790141050] ?? +139739178209360 in /lib64/libc.so.6
[0x55e33f919810] ?? +94434512443408 in bin/lavinmqperf
[0x55e33f941eee] ?? +94434512609006 in bin/lavinmqperf
[0x55e33f943e6c] ?? +94434512617068 in bin/lavinmqperf
[0x55e33f943f6e] ?? +94434512617326 in bin/lavinmqperf
[0x55e33fa6785e] GC_inner_start_routine +94 in bin/lavinmqperf
[0x55e33fa58b98] GC_call_with_stack_base +24 in bin/lavinmqperf
[0x7f1790197ba8] ?? +139739178564520 in /lib64/libc.so.6
[0x7f179021bb8c] ?? +139739179105164 in /lib64/libc.so.6
[0x0] ???

Didn't get a coredump unfortunately; will try to reproduce again.

built with -Dpreview_mt --release

@carlhoerberg
Member

Ah, we are not getting a core dump because we don't reset the SEGV signal handler in lavinmqperf.

@carlhoerberg
Member

Maybe this is a Crystal 1.16.0 issue; this is the coredump backtrace:

#0  0x0000556fffdc834d in wake_at () at /usr/share/crystal/src/crystal/event_loop/polling/event.cr:31
#1  heap_compare () at /usr/share/crystal/src/crystal/event_loop/polling/event.cr:64
#2  meld () at /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:92
#3  merge_pairs () at /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:130
#4  shift? () at /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:38
#5  run () at /usr/share/crystal/src/crystal/event_loop/timers.cr:35
#6  reschedule () at /usr/share/crystal/src/crystal/scheduler.cr:144
#7  0x0000556fffdf09ee in reschedule () at /usr/share/crystal/src/crystal/scheduler.cr:62
#8  suspend () at /usr/share/crystal/src/fiber.cr:351
#9  run_loop () at /usr/share/crystal/src/crystal/scheduler.cr:150
#10 0x0000556fffdf296c in -> () at /usr/share/crystal/src/crystal/scheduler.cr:150
#11 0x0000556fffdf2a6e in start () at /usr/share/crystal/src/crystal/system/thread.cr:231
#12 thread_proc () at /usr/share/crystal/src/crystal/system/unix/pthread.cr:47
#13 -> () at /usr/share/crystal/src/crystal/system/unix/pthread.cr:22
#14 0x0000556ffff1689e in GC_inner_start_routine ()
#15 0x0000556ffff07bd8 in GC_call_with_stack_base ()
#16 0x00007f8e864f7ba8 in start_thread () from /lib64/libc.so.6
#17 0x00007f8e8657bb8c in __clone3 () from /lib64/libc.so.6

@carlhoerberg
Member

With debug symbols compiled in:

#0  0x00005645d3f6ed2f in merge_pairs (self=0x7fe033d0fce0, node=...) at /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:128
        tail = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fdf607ff7b8}}
        a = {type_id = 869402120, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0xf700000001}}
        b = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fde927ff5d8}}
        root = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fdf607ff7b8}}
#1  0x00005645d3f6ebfd in shift? (self=0x7fe033d0fce0) at /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:38
        node = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fe02b1ffdd0}}
#2  0x00005645d3f7fe53 in run (self=0x7fe033d0de70, blocking=true) at /usr/share/crystal/src/crystal/event_loop/timers.cr:35
        fiber = 0x7fe02a190a80
        blocking = true
        buffer = {{events = 1, data = {ptr = 0xb, fd = 11, u32 = 11, u64 = 11}}, {events = 5, data = {ptr = 0xfc00000001, fd = 1, u32 = 1,
              u64 = 1082331758593}}, {events = 1, data = {ptr = 0xa, fd = 10, u32 = 10, u64 = 10}}, {events = 4, data = {ptr = 0x18c00000001, fd = 1,
              u32 = 1, u64 = 1700807049217}}, {events = 4, data = {ptr = 0x17400000001, fd = 1, u32 = 1, u64 = 1597727834113}}, {events = 1, data = {
              ptr = 0xa, fd = 10, u32 = 10, u64 = 10}}, {events = 4, data = {ptr = 0x14400000001, fd = 1, u32 = 1, u64 = 1391569403905}}, {events = 5,
            data = {ptr = 0x18500000001, fd = 1, u32 = 1, u64 = 1670742278145}}, {events = 5, data = {ptr = 0x18c00000001, fd = 1, u32 = 1,
              u64 = 1700807049217}}, {events = 5, data = {ptr = 0x19200000001, fd = 1, u32 = 1, u64 = 1726576852993}}, {events = 5, data = {
              ptr = 0x19500000001, fd = 1, u32 = 1, u64 = 1739461754881}}, {events = 5, data = {ptr = 0x16000000002, fd = 2, u32 = 2, u64 = 1511828488194}}, {
            events = 5, data = {ptr = 0x16700000002, fd = 2, u32 = 2, u64 = 1541893259266}}, {events = 5, data = {ptr = 0x17b00000002, fd = 2, u32 = 2,
              u64 = 1627792605186}}, {events = 5, data = {ptr = 0x18200000002, fd = 2, u32 = 2, u64 = 1657857376258}}, {events = 5, data = {
              ptr = 0x14e00000003, fd = 3, u32 = 3, u64 = 1434519076867}}, {events = 5, data = {ptr = 0x19b00000001, fd = 1, u32 = 1, u64 = 1765231558657}}, {
            events = 5, data = {ptr = 0x19c00000001, fd = 1, u32 = 1, u64 = 1769526525953}}, {events = 5, data = {ptr = 0x19d00000001, fd = 1, u32 = 1,
              u64 = 1773821493249}}, {events = 5, data = {ptr = 0x19e00000001, fd = 1, u32 = 1, u64 = 1778116460545}}, {events = 5, data = {
              ptr = 0x19f00000001, fd = 1, u32 = 1, u64 = 1782411427841}}, {events = 5, data = {ptr = 0x1a000000001, fd = 1, u32 = 1, u64 = 1786706395137}}, {
            events = 5, data = {ptr = 0x1a100000001, fd = 1, u32 = 1, u64 = 1791001362433}}, {events = 5, data = {ptr = 0x1a200000001, fd = 1, u32 = 1,
              u64 = 1795296329729}}, {events = 5, data = {ptr = 0x1a300000001, fd = 1, u32 = 1, u64 = 1799591297025}}, {events = 8221, data = {
              ptr = 0x5a00000000, fd = 0, u32 = 0, u64 = 386547056640}}, {events = 8221, data = {ptr = 0x5d00000000, fd = 0, u32 = 0, u64 = 399431958528}}, {
            events = 8221, data = {ptr = 0x6100000000, fd = 0, u32 = 0, u64 = 416611827712}}, {events = 8221, data = {ptr = 0x6200000000, fd = 0, u32 = 0,
              u64 = 420906795008}}, {events = 8221, data = {ptr = 0x6900000000, fd = 0, u32 = 0, u64 = 450971566080}}, {events = 8221, data = {
              ptr = 0x6d00000000, fd = 0, u32 = 0, u64 = 468151435264}}, {events = 8221, data = {ptr = 0x7100000000, fd = 0, u32 = 0, u64 = 485331304448}}, {
            events = 8221, data = {ptr = 0x7400000000, fd = 0, u32 = 0, u64 = 498216206336}}, {events = 8221, data = {ptr = 0x7900000000, fd = 0, u32 = 0,
              u64 = 519691042816}}, {events = 8221, data = {ptr = 0x7c00000000, fd = 0, u32 = 0, u64 = 532575944704}}, {events = 8221, data = {
              ptr = 0x8000000000, fd = 0, u32 = 0, u64 = 549755813888}}, {events = 8221, data = {ptr = 0x8400000000, fd = 0, u32 = 0, u64 = 566935683072}}, {
            events = 8221, data = {ptr = 0x8a00000000, fd = 0, u32 = 0, u64 = 592705486848}}, {events = 8221, data = {ptr = 0x8d00000000, fd = 0, u32 = 0,
              u64 = 605590388736}}, {events = 8221, data = {ptr = 0x9200000000, fd = 0, u32 = 0, u64 = 627065225216}}, {events = 8221, data = {
              ptr = 0x9400000000, fd = 0, u32 = 0, u64 = 635655159808}}, {events = 8221, data = {ptr = 0x9800000000, fd = 0, u32 = 0, u64 = 652835028992}}, {
            events = 8221, data = {ptr = 0x9e00000000, fd = 0, u32 = 0, u64 = 678604832768}}, {events = 8221, data = {ptr = 0x9a00000000, fd = 0, u32 = 0,
              u64 = 661424963584}}, {events = 8221, data = {ptr = 0xa300000000, fd = 0, u32 = 0, u64 = 700079669248}}, {events = 8221, data = {
              ptr = 0xa900000000, fd = 0, u32 = 0, u64 = 725849473024}}, {events = 8221, data = {ptr = 0xad00000000, fd = 0, u32 = 0, u64 = 743029342208}}, {
            events = 8221, data = {ptr = 0xaa00000000, fd = 0, u32 = 0, u64 = 730144440320}}, {events = 8221, data = {ptr = 0xb400000000, fd = 0, u32 = 0,
              u64 = 773094113280}}, {events = 8221, data = {ptr = 0xb900000000, fd = 0, u32 = 0, u64 = 794568949760}}, {events = 8221, data = {
              ptr = 0xbd00000000, fd = 0, u32 = 0, u64 = 811748818944}}, {events = 8221, data = {ptr = 0xc000000000, fd = 0, u32 = 0, u64 = 824633720832}}, {
            events = 8221, data = {ptr = 0xc600000000, fd = 0, u32 = 0, u64 = 850403524608}}, {events = 8221, data = {ptr = 0xc500000000, fd = 0, u32 = 0,
              u64 = 846108557312}}, {events = 8221, data = {ptr = 0xcc00000000, fd = 0, u32 = 0, u64 = 876173328384}}, {events = 8221, data = {
              ptr = 0xd000000000, fd = 0, u32 = 0, u64 = 893353197568}}, {events = 8221, data = {ptr = 0xd400000000, fd = 0, u32 = 0, u64 = 910533066752}}, {
            events = 8221, data = {ptr = 0xd800000000, fd = 0, u32 = 0, u64 = 927712935936}}, {events = 8221, data = {ptr = 0xdc00000000, fd = 0, u32 = 0,
              u64 = 944892805120}}, {events = 8221, data = {ptr = 0xde00000000, fd = 0, u32 = 0, u64 = 953482739712}}, {events = 8221, data = {
              ptr = 0xe100000000, fd = 0, u32 = 0, u64 = 966367641600}}, {events = 8221, data = {ptr = 0xe500000000, fd = 0, u32 = 0, u64 = 983547510784}}, {
            events = 8221, data = {ptr = 0xeb00000000, fd = 0, u32 = 0, u64 = 1009317314560}}, {events = 8221, data = {ptr = 0xef00000000, fd = 0, u32 = 0,
              u64 = 1026497183744}}, {events = 8221, data = {ptr = 0xf300000000, fd = 0, u32 = 0, u64 = 1043677052928}}, {events = 8221, data = {
              ptr = 0xf500000000, fd = 0, u32 = 0, u64 = 1052266987520}}, {events = 8221, data = {ptr = 0xf900000000, fd = 0, u32 = 0, u64 = 1069446856704}},
          {events = 8221, data = {ptr = 0xfc00000000, fd = 0, u32 = 0, u64 = 1082331758592}}, {events = 8221, data = {ptr = 0xff00000000, fd = 0, u32 = 0,
              u64 = 1095216660480}}, {events = 8221, data = {ptr = 0x10300000000, fd = 0, u32 = 0, u64 = 1112396529664}}, {events = 8221, data = {
              ptr = 0x10500000000, fd = 0, u32 = 0, u64 = 1120986464256}}, {events = 8221, data = {ptr = 0x10b00000000, fd = 0, u32 = 0,
              u64 = 1146756268032}}, {events = 8221, data = {ptr = 0x11000000000, fd = 0, u32 = 0, u64 = 1168231104512}}, {events = 8221, data = {
              ptr = 0x11300000000, fd = 0, u32 = 0, u64 = 1181116006400}}, {events = 8221, data = {ptr = 0x11100000000, fd = 0, u32 = 0,
              u64 = 1172526071808}}, {events = 22085, data = {ptr = 0x5645d42634e8 <__pthread_register_cancel@got.plt>, fd = -735693592, u32 = 3559273704,
              u64 = 94857912005864}}, {events = 1705638384, data = {ptr = 0x2bbef9f000007ffd, fd = 32765, u32 = 32765, u64 = 3152231598393425917}}, {
            events = 32736, data = {ptr = 0x7fe0347b101d <_dl_fixup+269>, fd = 880480285, u32 = 880480285, u64 = 140600929882141}}, {events = 5, data = {
              ptr = 0x0, fd = 0, u32 = 0, u64 = 0}}, {events = 0, data = {ptr = 0x7fe033f941a8, fd = 871973288, u32 = 871973288, u64 = 140600921375144}}, {
            events = 870818224, data = {ptr = 0x7fe0, fd = 32736, u32 = 32736, u64 = 32736}}, {events = 0, data = {ptr = 0x7fe02bbefdc0, fd = 733937088,
              u32 = 733937088, u64 = 140600783338944}}, {events = 733939392, data = {ptr = 0xffffff7000007fe0, fd = 32736, u32 = 32736,
              u64 = 18446743455234293728}}, {events = 4294967295, data = {ptr = 0x0, fd = 0, u32 = 0, u64 = 0}}, {events = 1705638384, data = {
              ptr = 0x2bbefe1000007ffd, fd = 32765, u32 = 32765, u64 = 3152236133878890493}}, {events = 32736, data = {
              ptr = 0x7fe0347b365e <_dl_runtime_resolve_xsavec+126>, fd = 880490078, u32 = 880490078, u64 = 140600929891934}}, {events = 0, data = {
              ptr = 0x33e856d500000000, fd = 0, u32 = 0, u64 = 3740334963359219712}}, {events = 32736, data = {ptr = 0x4cc3b26e1a6517ee, fd = 442832878,
              u32 = 442832878, u64 = 5531460953300080622}}, {events = 0, data = {ptr = 0x2bbefe1000000000, fd = 0, u32 = 0, u64 = 3152236133878857728}}, {
            events = 32736, data = {ptr = 0x7fe033d41000, fd = 869535744, u32 = 869535744, u64 = 140600918937600}}, {events = 0, data = {ptr = 0x0, fd = 0,
              u32 = 0, u64 = 0}}, {events = 0, data = {ptr = 0x0, fd = 0, u32 = 0, u64 = 0}}, {events = 0, data = {ptr = 0x2bbefa5800000000, fd = 0, u32 = 0,
              u64 = 3152232045069991936}}, {events = 32736, data = {ptr = 0x5645d3f89d50 <->>, fd = -738681520, u32 = 3556285776, u64 = 94857909017936}}, {
            events = 0, data = {ptr = 0x2000000000, fd = 0, u32 = 0, u64 = 137438953472}}, {events = 0, data = {ptr = 0x7fe02bbf06c0, fd = 733939392,
              u32 = 733939392, u64 = 140600783341248}}, {events = 3560143616, data = {ptr = 0x33d13a8000005645, fd = 22085, u32 = 22085,
              u64 = 3733829887497098821}}, {events = 32736, data = {ptr = 0x7fe033d13a80, fd = 869350016, u32 = 869350016, u64 = 140600918751872}}, {
            events = 1, data = {ptr = 0xd41ae0a300000000, fd = 0, u32 = 0, u64 = 15283775276119490560}}, {events = 22085, data = {ptr = 0x7fe033d13a80,
              fd = 869350016, u32 = 869350016, u64 = 140600918751872}}, {events = 32, data = {ptr = 0x2bbefe1000000000, fd = 0, u32 = 0,
              u64 = 3152236133878857728}}, {events = 32736, data = {ptr = 0x7fe02bbf06c0, fd = 733939392, u32 = 733939392, u64 = 140600783341248}}, {
            events = 4294967152, data = {ptr = 0xd3eefe0effffffff, fd = -1, u32 = 4294967295, u64 = 15271422726837895167}}, {events = 22085, data = {
              ptr = 0x20, fd = 32, u32 = 32, u64 = 32}}, {events = 32, data = {ptr = 0x2000000000, fd = 0, u32 = 0, u64 = 137438953472}}, {events = 0,
            data = {ptr = 0x133d13a98, fd = 869350040, u32 = 869350040, u64 = 5164317336}}, {events = 32, data = {ptr = 0xd41d061000000000, fd = 0, u32 = 0,
              u64 = 15284379376154574848}}, {events = 22085, data = {ptr = 0x55555554, fd = 1431655764, u32 = 1431655764, u64 = 1431655764}}, {
            events = 1179670597, data = {ptr = 0x2000005645, fd = 22085, u32 = 22085, u64 = 137438975557}}, {events = 0, data = {
              ptr = 0x5645d3f6faed <malloc+93>, fd = -738788627, u32 = 3556178669, u64 = 94857908910829}}, {events = 4, data = {ptr = 0x33d0fc800000082b,
              fd = 2091, u32 = 2091, u64 = 3733761717776156715}}, {events = 32736, data = {ptr = 0x7fe033d13a80, fd = 869350016, u32 = 869350016,
              u64 = 140600918751872}}, {events = 3556178162, data = {ptr = 0x2b1ffff000005645, fd = 22085, u32 = 22085, u64 = 3107483674166187589}}, {
            events = 32736, data = {ptr = 0x564500000000, fd = 0, u32 = 0, u64 = 94854352732160}}, {events = 0, data = {ptr = 0x400000004, fd = 4, u32 = 4,
              u64 = 17179869188}}, {events = 0, data = {ptr = 0x7fe000000000, fd = 0, u32 = 0, u64 = 140600049401856}}, {events = 0, data = {
              ptr = 0x33d13a8000000000, fd = 0, u32 = 0, u64 = 3733829887497076736}}, {events = 32736, data = {ptr = 0x5645d41d4730, fd = -736278736,
              u32 = 3558688560, u64 = 94857911420720}}, {events = 5, data = {ptr = 0xd3f2370f00000005, fd = 5, u32 = 5, u64 = 15272329823930810373}}, {
            events = 22085, data = {ptr = 0x500000000, fd = 0, u32 = 0, u64 = 21474836480}}, {events = 3556241734, data = {ptr = 0x33d0fc8000005645,
              fd = 22085, u32 = 22085, u64 = 3733761717776176709}}, {events = 32736, data = {ptr = 0x100000005, fd = 5, u32 = 5, u64 = 4294967301}}, {
            events = 3559276380, data = {ptr = 0x2a10df0800005645, fd = 22085, u32 = 22085, u64 = 3031167774673098309}}, {events = 32736, data = {
              ptr = 0x500000005, fd = 5, u32 = 5, u64 = 21474836485}}, {events = 3555866383, data = {ptr = 0x33d3ef2800000000, fd = 0, u32 = 0,
              u64 = 3734591471098003456}}, {events = 32736, data = {ptr = 0x5645d3f26110 <set+128>, fd = -739090160, u32 = 3555877136,
              u64 = 94857908609296}}, {events = 4, data = {ptr = 0x500000001, fd = 1, u32 = 1, u64 = 21474836481}}, {events = 0, data = {
              ptr = 0x7fe033d3ef28, fd = 869527336, u32 = 869527336, u64 = 140600918929192}}}
        epoll_events = {size = 1, read_only = false, pointer = 0x7fe02bbef5f8}
        timer_triggered = true
        i = 0
        epoll_event = 0x7fe02bbef5f8
        __temp_1037 = 11
        i = 1
        fiber = 0x7fe02a190a80
        epoll_event = 0x7fe02bbef5f8
        index = {data = 1082331758593}
        events = 5
        pd = 0x7fe033d20720
        index = {data = 1082331758593}
        entry = 0x7fe033d20720
        index = {data = 1082331758593}
        entry = {type_id = 503, union = {Pointer(Crystal::EventLoop::Polling::Arena::Entry(Crystal::EventLoop::Polling::PollDescriptor)) = 0x7fe033d20720}}
        event = 0x3feccccccccccccd
        event = 0x0
        node = 0x404b000000000000
        node = 0x7fe02bbefe10
        _next = 0x1
        fiber = 0x0
        event = 0x7fe033cc6600
        __temp_1042 = false
        event = 0x0
        fiber = 0x0
        __temp_1044 = false
        event = 0x0
        fiber = 0x0
        __temp_1045 = false
        event = 0x7fdf6a7ff5d8
        event = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fdf6a7ff5d8}}
        fiber = 0x7fe02a190a80
        __temp_1046 = false
        event = 0x0
        fiber = 0x0
        __temp_1047 = true
        fiber = 0x0
        timer_triggered = true
        buffer = {0x0 <repeats 11 times>, 0xa8c46505853, 0x207, 0xa88, 0x0, 0x0, 0x0, 0x203, 0x0 <repeats 110 times>}
        size = 0
        __temp_1049 = false
        event = 0x0
        __temp_1048 = {[0] = 227483, [1] = 739632449}
        seconds = 227483
        nanoseconds = 739632449
        now = {seconds = 227483, nanoseconds = 739632449}
        event = {type_id = 424, union = {Pointer(Crystal::EventLoop::Polling::Event) = 0x7fe02b1ffdd0}}
        event = 0x0
        i = -1
        i = 0
        fiber = 0x0
        event = 0x7fe033e2731d <sigsuspend+93>
        fiber = 0x7fe033d13a80
        __temp_1055 = 22085
        select_action = 0x0
        pd = 0x0
        pd = 0x0
#3  0x00005645d3f2623c in reschedule (self=0x7fe033d3ef00) at /usr/share/crystal/src/crystal/scheduler.cr:144
        runnable = 0x0
        section = Sched
        operation = 0x5645d41d3f30
        time = 869347040
        metadata = {<No data fields>}
#4  0x00005645d3f261c7 in reschedule () at /usr/share/crystal/src/crystal/scheduler.cr:62
No locals.
#5  0x00005645d3f26196 in suspend () at /usr/share/crystal/src/fiber.cr:351
No locals.
#6  0x00005645d3f890fa in run_loop (self=0x7fe033d3ef00) at /usr/share/crystal/src/crystal/scheduler.cr:150
        runnable = 0x0
        section = Sched
        operation = 0x5645d41d48c0
        time = 869388256
        metadata = {<No data fields>}
#7  0x00005645d3f8dcef in -> (__temp_1318=0x7fe033d12ee0) at /usr/share/crystal/src/crystal/scheduler.cr:150
        scheduler = 0x7fe033d3ef00
        pending = {value = 0}
#8  0x00005645d3f8e09e in start (self=0x7fe033d12ee0) at /usr/share/crystal/src/crystal/system/thread.cr:231
        fiber = 0x7fe033d13b40
        name = 0x7fe033d14f60
        ex = 0x388
#9  0x00005645d3f8dfe0 in thread_proc (data=0x7fe033d12ee0) at /usr/share/crystal/src/crystal/system/unix/pthread.cr:47
        th = 0x7fe033d12ee0
#10 0x00005645d3f8df9e in -> (__temp_1320=0x7fe033d12ee0) at /usr/share/crystal/src/crystal/system/unix/pthread.cr:22
No locals.
#11 0x00005645d41c35de in GC_inner_start_routine ()
No symbol table info available.
#12 0x00005645d41b4918 in GC_call_with_stack_base ()
No symbol table info available.
#13 0x00007fe033e7dba8 in start_thread () from /lib64/libc.so.6
No symbol table info available.
#14 0x00007fe033f01b8c in __clone3 () from /lib64/libc.so.6
No symbol table info available.

@kickster97
Member Author

kickster97 commented Apr 14, 2025

Hmm, not good..

Forgot to take notes here last week, so doing it now: last week @spuun and I saw multi-threading issues in the throughput tool; we think we have narrowed it down to mqtt-client.
The consuming client disconnects after a while. We could not nail down where the issue was, but it can't be replicated with mosquitto. If we run read_loop and message_loop in mqtt_client with same_thread: true we see no issues.

@spuun
Member

spuun commented Apr 14, 2025

> Hmm, not good..
>
> Forgot to take notes here, so doing it now: last week @spuun and I saw multi-threading issues in the throughput tool; we think we have narrowed it down to mqtt-client. The consuming client disconnects after a while. We could not nail down where the issue was, but it can't be replicated with mosquitto. If we run read_loop and message_loop in mqtt_client with same_thread: true we see no issues.

I narrowed that down to crystal-lang/crystal#15647

@ysbaddaden

This might be caused by crystal-lang/crystal#15647. With the fix from crystal-lang/crystal#15650 lavinmqperf now prints lots of:

Unhandled exception in spawn(name: mqtt-client read_loop): BUG: transfering fd=340 to another evloop with pending reader/writer fibers (RuntimeError)
  from /home/julien/work/crystal-lang/crystal/src/crystal/event_loop/polling/poll_descriptor.cr:30:5 in 'take_ownership'
  from /home/julien/work/crystal-lang/crystal/src/crystal/event_loop/polling.cr:413:31 in 'wait_readable'
  from /home/julien/work/crystal-lang/crystal/src/crystal/event_loop/polling.cr:318:9 in 'evented_read'
  from /home/julien/work/crystal-lang/crystal/src/crystal/event_loop/polling.cr:199:5 in 'read'
  from /home/julien/work/crystal-lang/crystal/src/crystal/system/socket.cr:85:5 in 'system_read'
  from /home/julien/work/crystal-lang/crystal/src/socket.cr:446:5 in 'unbuffered_read'
  from /home/julien/work/crystal-lang/crystal/src/io/buffered.cr:272:5 in 'fill_buffer'
  from /home/julien/work/crystal-lang/crystal/src/io/buffered.cr:62:5 in 'read_byte'
  from lib/mqtt-client/src/mqtt-client/connection.cr:133:15 in 'read_loop'
  from lib/mqtt-client/src/mqtt-client/connection.cr:131:15 in 'read_loop'
  from lib/mqtt-client/src/mqtt-client/connection.cr:43:9 in '->'

As explained in crystal-lang/crystal#15658 (comment) this means that a thread's evloop is trying to read or write, while another thread's evloop is already having a pending read or write.

@ysbaddaden

ysbaddaden commented Apr 14, 2025

Or maybe there's an issue with the fix. I'm investigating to understand 🕵️

@ysbaddaden

The fix exposes a limitation of the "lifetime event loop": we can't share a fd across event loops, and I understand from tracing that there are two fibers created: one is dedicated to reading from the fd while the other is dedicated to writing to the fd.

The problem is that those two fibers must not end up in two different event loops at the same time. This is a known limitation of the "lifetime" design.

Because preview_mt spawns fibers onto whatever thread, and each thread has its own event loop, the two fibers end up in different threads, and thus different event loops. Both try to wait on read and write at the same time, so one evloop takes ownership of the fd while the other crashes because it also tries to take ownership while there's a pending reader.

The situation can be avoided for preview_mt with same_thread: true to make sure both fibers are on the same thread (and thus the same event loop).

It can also be avoided with a Fiber::ExecutionContext::MultiThreaded context, which has one event loop per context (not per thread), as long as both fibers are running in the same context.

@ysbaddaden

ysbaddaden commented Apr 14, 2025

I started a MT execution context + the patch from crystal-lang/crystal#15647 and I can't reproduce the "can't transfer fd" bug nor the SEGFAULT.

diff --git a/src/lavinmqperf/mqtt/throughput.cr b/src/lavinmqperf/mqtt/throughput.cr
index 5bff1522..e5363a5f 100644
--- a/src/lavinmqperf/mqtt/throughput.cr
+++ b/src/lavinmqperf/mqtt/throughput.cr
@@ -100,16 +100,18 @@ def initialize
       def run
         super

+        mt = Fiber::ExecutionContext::MultiThreaded.new("MQTT", maximum: 4)
+
         done = WaitGroup.new(@consumers + @publishers)

         @consumers.times do |i|
-          spawn { reconnect_on_disconnect(done) { consume(i) } }
+          mt.spawn { reconnect_on_disconnect(done) { consume(i) } }
         end

         sleep 1.seconds # Give consumers time to connect

         @publishers.times do |i|
-          spawn { reconnect_on_disconnect(done) { pub(i) } }
+          mt.spawn { reconnect_on_disconnect(done) { pub(i) } }
         end

         if @timeout != Time::Span.zero

@carlhoerberg
Member

Ok, we "fixed" mqtt-client.cr and spawned the message_loop with same_thread: true in the read_loop fiber, but now I get:

Unhandled exception in spawn(name: mqtt-client message_loop): Crystal::EventLoop::Polling::Event#wake_at cannot be nil (NilAssertionError)
  from /usr/share/crystal/src/crystal/event_loop/polling/event.cr:31:3 in 'wake_at'
  from /usr/share/crystal/src/crystal/event_loop/polling/event.cr:64:15 in 'heap_compare'
  from /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:92:8 in 'meld'
  from /usr/share/crystal/src/crystal/pointer_pairing_heap.cr:67:19 in 'delete'
  from /usr/share/crystal/src/crystal/event_loop/timers.cr:54:7 in 'delete'
  from /usr/share/crystal/src/crystal/event_loop/polling.cr:460:34 in 'delete_timer'
  from /usr/share/crystal/src/crystal/event_loop/polling.cr:479:33 in 'run'
  from /usr/share/crystal/src/crystal/scheduler.cr:144:11 in 'reschedule'
  from /usr/share/crystal/src/crystal/scheduler.cr:62:5 in 'reschedule'
  from /usr/share/crystal/src/fiber.cr:351:5 in 'suspend'
  from /usr/share/crystal/src/channel.cr:221:7 in 'receive?'
  from lib/mqtt-client/src/mqtt-client/connection.cr:144:21 in 'message_loop'
  from lib/mqtt-client/src/mqtt-client/connection.cr:142:7 in 'message_loop'
  from lib/mqtt-client/src/mqtt-client/connection.cr:154:9 in '->'
  from /usr/share/crystal/src/fiber.cr:170:11 in 'run'
  from /usr/share/crystal/src/fiber.cr:105:3 in '->'
  from ???

@ysbaddaden

ysbaddaden commented Apr 14, 2025

Alright, I'll try to reproduce. It sounds like an Event#clear set @wake_at to nil before removing the event from the timers structure... which should have been fixed by crystal-lang/crystal#15647, but maybe a thread is acting up in parallel.

@carlhoerberg
Member

Oh, I missed that the socket is opened on a different thread than the read_loop. Setting same_thread on both the read_loop and the message_loop, I don't get any segfault or exception. Using stock Crystal 1.16.0; haven't applied the patch from crystal-lang/crystal#15650.

@ysbaddaden

But then you're using a single thread for everything, so you're back to single-threaded execution.

You should be able to open in a thread/context, do some things (like authentication) then pass the fd to another thread/context.

Having the message_loop and read_loop fibers in the same thread should be enough.
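The handoff described above could be sketched roughly like this, using the experimental Fiber::ExecutionContext API from the diff earlier in this thread. This is illustrative only: connect_and_authenticate, read_loop and message_loop are hypothetical stand-ins for the client internals, not the real mqtt-client API.

```crystal
# Sketch: open and authenticate the socket wherever is convenient, then hand
# all further I/O on that fd to fibers living in one dedicated context, so
# they share a single event loop. Helper names are illustrative only.
socket = connect_and_authenticate

ctx = Fiber::ExecutionContext::MultiThreaded.new("mqtt-conn", maximum: 1)
ctx.spawn(name: "read_loop") { read_loop(socket) }
ctx.spawn(name: "message_loop") { message_loop }
```

The point of the sketch is that the fd may be created anywhere, but once fibers start waiting on it for reads and writes, those fibers should share one event loop.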

@ysbaddaden

ysbaddaden commented Apr 14, 2025

I patched mqtt-client/connection, so both fibers would be on the same thread:

spawn same_thread: false do
  spawn read_loop, name: "mqtt-client:read_loop", same_thread: true
  spawn message_loop, name: "mqtt-client:message_loop", same_thread: true
end

But I still get BUG: transfering fd=302 to another evloop with pending reader/writer fibers 😞

EDIT: there's a third, unnamed fiber that creates a pending writer.

@carlhoerberg
Member

Yes, isn't it lavinmqperf that calls Connection#publish?

@carlhoerberg
Member

Maybe we should have a write_loop and a Channel carrying all write requests, instead of a Mutex around the socket write operations.
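That suggestion could look roughly like the sketch below. The class and method names are hypothetical, not the actual mqtt-client API.

```crystal
# Sketch of the suggested design: a single write_loop fiber owns the socket
# for writing, and publishers hand it packets over a Channel, so no Mutex is
# needed around socket writes and all writes stay on one fiber/event loop.
class Connection
  @writes = Channel(Bytes).new(128)

  def start_write_loop(socket : IO)
    spawn(name: "mqtt-client:write_loop") do
      while packet = @writes.receive?
        socket.write(packet)
        socket.flush
      end
    end
  end

  # Safe to call from any fiber; the channel serializes access to the socket.
  def publish(packet : Bytes)
    @writes.send(packet)
  end

  def close
    @writes.close # lets the write_loop drain and exit
  end
end
```

Compared to a Mutex, this also avoids two threads' event loops ever waiting to write on the same fd, which is the ownership conflict discussed above.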

@ysbaddaden

No, it was me not thinking correctly. I took a break for dinner and realized we need the fix (to be released in 1.16.1) for the evloop timers to work correctly with MT; otherwise we get canceled timeouts on the wrong event loop, and that leads to the segfaults and nil wake_at above.

@kickster97
Member Author

For this PR's sake, and because it's good to be close to the packets, I'll go with using mqtt-protocol instead of mqtt-client.

This change is interesting, though: running lavinmqperf mqtt throughput (default values) crashes LavinMQ with
Process terminated because of an invalid memory access

@kickster97 kickster97 force-pushed the mqtt-perf branch 2 times, most recently from d71a82b to c8430a1 Compare April 29, 2025 09:26
@kickster97 kickster97 requested a review from Copilot April 29, 2025 09:32

Copilot AI left a comment


Copilot wasn't able to review any files in this pull request.

Files not reviewed (19)
  • src/lavinmqperf.cr: Language not supported
  • src/lavinmqperf/amqp/bind_churn.cr: Language not supported
  • src/lavinmqperf/amqp/channel_churn.cr: Language not supported
  • src/lavinmqperf/amqp/connection_churn.cr: Language not supported
  • src/lavinmqperf/amqp/connection_count.cr: Language not supported
  • src/lavinmqperf/amqp/consumer_churn.cr: Language not supported
  • src/lavinmqperf/amqp/queue_churn.cr: Language not supported
  • src/lavinmqperf/amqp/queue_count.cr: Language not supported
  • src/lavinmqperf/amqp/throughput.cr: Language not supported
  • src/lavinmqperf/bind_churn.cr: Language not supported
  • src/lavinmqperf/channel_churn.cr: Language not supported
  • src/lavinmqperf/connection_churn.cr: Language not supported
  • src/lavinmqperf/connection_count.cr: Language not supported
  • src/lavinmqperf/consumer_churn.cr: Language not supported
  • src/lavinmqperf/mqtt/throughput.cr: Language not supported
  • src/lavinmqperf/perf.cr: Language not supported
  • src/lavinmqperf/queue_churn.cr: Language not supported
  • src/lavinmqperf/queue_count.cr: Language not supported
  • src/lavinmqperf/throughput.cr: Language not supported

@kickster97 kickster97 requested a review from Copilot April 29, 2025 09:33

Member

@snichme snichme left a comment


Has anything changed in the amqp perf tools or are they just moved?

@kickster97
Member Author

Has anything changed in the amqp perf tools or are they just moved?

they are just moved 👍

@kickster97 kickster97 requested a review from snichme April 30, 2025 08:30
@kickster97 kickster97 force-pushed the mqtt-perf branch 3 times, most recently from 8d50e03 to 36cd8c9 Compare May 7, 2025 12:46
@kickster97
Member Author

I got some strange errors after rebasing and adopting execution contexts: the consuming client would randomly disconnect itself without catching any errors, and LavinMQ would either log "connection reset by peer.." or nothing at all.

@spuun and I looked at this today, and this does not seem to happen when I use two separate execution contexts for the consumer and the publisher. One execution context should be enough here, so we are a bit confused that using one causes the errors (or at least makes them much more frequent; with two execution contexts we see no errors even when running for minutes).
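For reference, the two-execution-context workaround could be sketched roughly like this. The `Fiber::ExecutionContext` API is experimental, and the constructor/method names below are assumptions based on the Crystal execution-contexts RFC rather than a verified released API; `consume_loop` and `publish_loop` are hypothetical placeholders for the real client loops:

```crystal
# Experimental API — assumed to require building with
# -Dpreview_mt -Dexecution_context; names are taken from the RFC.

def consume_loop
  # placeholder: run the consuming clients here
end

def publish_loop
  # placeholder: run the publishing clients here
end

# Give the consumer and publisher their own schedulers so their
# fibers never share a thread's run queue.
consumer_ctx = Fiber::ExecutionContext::SingleThreaded.new("consumer")
publisher_ctx = Fiber::ExecutionContext::SingleThreaded.new("publisher")

consumer_ctx.spawn { consume_loop }
publisher_ctx.spawn { publish_loop }

sleep 1.second # keep the main fiber alive while the contexts run
```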

@carlhoerberg
Member

Did you try to use a WaitGroup and wait for all connections to be connected before consuming/publishing? Maybe the extra MT context isn't needed then.

@kickster97
Member Author

Did you try to use a WaitGroup and wait for all connections to be connected before consuming/publishing? Maybe the extra MT context isn't needed then.

Looking at that now. I got some bumps with `Negative WaitGroup counter (RuntimeError)` if we disconnect/reconnect the broker, but for this issue it looks like it could be a solution.

@carlhoerberg
Member

Ok, yes, that's why I did `wg.done rescue nil` in the AMQP throughput. It's needed when the connection is reconnected.
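Crystal's stdlib `WaitGroup` supports this pattern. A minimal sketch (not the PR's actual code; `connect_client` is a hypothetical placeholder for the real MQTT/AMQP connection setup):

```crystal
require "wait_group"

CLIENTS = 10
connected = WaitGroup.new(CLIENTS)

def connect_client(i : Int32)
  # placeholder for the real MQTT/AMQP connection setup
  sleep 10.milliseconds
end

CLIENTS.times do |i|
  spawn do
    connect_client(i)
    # On an automatic reconnect this fiber can reach #done a second time,
    # which raises "Negative WaitGroup counter" — hence the rescue.
    connected.done rescue nil
    # ... publish/consume loop would go here ...
  end
end

connected.wait # only start measuring once every client is connected
puts "all #{CLIENTS} clients connected"
```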

@kickster97
Member Author

Ok, yes, that's why I did `wg.done rescue nil` in the AMQP throughput. It's needed when the connection is reconnected.

I get this for the AMQP throughput that's merged to main as well: if I start LavinMQ and lavinmqperf, then stop LavinMQ and start it again, lavinmqperf crashes.

@kickster97
Member Author

kickster97 commented May 28, 2025

But for this issue it looks like it could be a solution

Hmm, scratch that. Now I caught an EOF error even when using `wait_until_all_are_connected`. We already did a "smaller" version of this in the MQTT throughput, where we call `Fiber.yield` to give the clients a bit of time to finish connecting.

I'll still add the WaitGroup, because that's better than just arbitrarily waiting with a yield, but I don't think it solves the execution context problem.

@kickster97 kickster97 force-pushed the mqtt-perf branch 4 times, most recently from 7fae885 to 28eefb0 Compare June 4, 2025 07:42
@kickster97 kickster97 requested a review from carlhoerberg June 10, 2025 11:51
@kickster97 kickster97 merged commit 4f0159e into main Jun 23, 2025
15 of 17 checks passed
@kickster97 kickster97 deleted the mqtt-perf branch June 23, 2025 13:35