Skip to content

Commit 681491c

Browse files
committed
added rpc queue
1 parent 12bfd76 commit 681491c

File tree

4 files changed

+243
-0
lines changed

4 files changed

+243
-0
lines changed

src/framework/global/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ if (MUSE_THREADS_SUPPORT)
161161
${CMAKE_CURRENT_LIST_DIR}/concurrency/concurrent.h
162162
${CMAKE_CURRENT_LIST_DIR}/concurrency/threadutils.h
163163
${CMAKE_CURRENT_LIST_DIR}/concurrency/ringqueue.h
164+
${CMAKE_CURRENT_LIST_DIR}/concurrency/rpcqueue.h
164165
)
165166
endif()
166167

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* SPDX-License-Identifier: GPL-3.0-only
3+
* MuseScore-CLA-applies
4+
*
5+
* MuseScore
6+
* Music Composition & Notation
7+
*
8+
* Copyright (C) 2025 MuseScore Limited and others
9+
*
10+
* This program is free software: you can redistribute it and/or modify
11+
* it under the terms of the GNU General Public License version 3 as
12+
* published by the Free Software Foundation.
13+
*
14+
* This program is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU General Public License for more details.
18+
*
19+
* You should have received a copy of the GNU General Public License
20+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
21+
*/
22+
#pragma once
23+
24+
#include <memory>
25+
#include <queue>
26+
27+
#include "ringqueue.h"
28+
29+
namespace muse {
30+
//! NOTE Single Producer/Single Consumer
31+
32+
template<typename T>
33+
class RpcPort;
34+
35+
template<typename T>
36+
class RpcQueue
37+
{
38+
private:
39+
std::shared_ptr<RpcPort<T> > m_port1;
40+
std::shared_ptr<RpcPort<T> > m_port2;
41+
42+
public:
43+
explicit RpcQueue(size_t capacity = 128)
44+
: m_port1(std::make_shared<RpcPort<T> >(capacity))
45+
, m_port2(std::make_shared<RpcPort<T> >(capacity))
46+
{
47+
m_port1->connect(m_port2);
48+
m_port2->connect(m_port1);
49+
}
50+
51+
~RpcQueue() = default;
52+
53+
RpcQueue(const RpcQueue&) = delete;
54+
RpcQueue& operator=(const RpcQueue&) = delete;
55+
56+
std::shared_ptr<RpcPort<T> > port1() const { return m_port1; }
57+
std::shared_ptr<RpcPort<T> > port2() const { return m_port2; }
58+
};
59+
60+
template<typename T>
61+
class RpcPort
62+
{
63+
private:
64+
std::shared_ptr<RpcPort<T> > m_connPort;
65+
RingQueue<T> m_queue;
66+
std::vector<T> m_buffer;
67+
std::queue<T> m_pending;
68+
std::function<void(const T&)> m_handler;
69+
70+
public:
71+
72+
explicit RpcPort(size_t capacity)
73+
: m_queue(capacity), m_buffer(capacity)
74+
{
75+
}
76+
77+
void connect(const std::shared_ptr<RpcPort<T> >& port)
78+
{
79+
m_connPort = port;
80+
}
81+
82+
void process()
83+
{
84+
// try send pending
85+
sendPending();
86+
87+
// receive messages
88+
m_buffer.clear();
89+
bool ok = m_connPort->m_queue.tryPopAll(m_buffer);
90+
if (ok && m_handler) {
91+
for (const T& item : m_buffer) {
92+
m_handler(item);
93+
}
94+
}
95+
}
96+
97+
bool sendPending()
98+
{
99+
while (!m_pending.empty()) {
100+
const T& item = m_pending.front();
101+
bool ok = m_queue.tryPush(item);
102+
if (ok) {
103+
m_pending.pop();
104+
} else {
105+
return false;
106+
}
107+
}
108+
return true;
109+
}
110+
111+
void send(const T& item)
112+
{
113+
// try send pending first
114+
bool ok = sendPending();
115+
if (ok) {
116+
ok = m_queue.tryPush(item);
117+
}
118+
119+
if (!ok) {
120+
m_pending.push(item);
121+
}
122+
}
123+
124+
void onMessage(const std::function<void(const T&)>& handler)
125+
{
126+
m_handler = handler;
127+
}
128+
};
129+
}

src/framework/global/tests/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,8 @@ set(MODULE_TEST_SRC
5656
)
5757

5858
include(SetupGTest)
59+
60+
target_sources(muse_global_tests
61+
PRIVATE
62+
rpcqueue_tests.cpp
63+
)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* SPDX-License-Identifier: GPL-3.0-only
3+
* MuseScore-CLA-applies
4+
*
5+
* MuseScore
6+
* Music Composition & Notation
7+
*
8+
* Copyright (C) 2025 MuseScore Limited and others
9+
*
10+
* This program is free software: you can redistribute it and/or modify
11+
* it under the terms of the GNU General Public License version 3 as
12+
* published by the Free Software Foundation.
13+
*
14+
* This program is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU General Public License for more details.
18+
*
19+
* You should have received a copy of the GNU General Public License
20+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
21+
*/
22+
23+
#include <thread>
24+
#include <chrono>
25+
26+
#include <gtest/gtest.h>
27+
28+
#include "global/concurrency/rpcqueue.h"
29+
30+
using namespace muse;
31+
32+
class Global_Concurrency_RpcQueueTests : public ::testing::Test
33+
{
34+
public:
35+
};
36+
37+
struct Msg {
38+
int val = 0;
39+
};
40+
41+
TEST_F(Global_Concurrency_RpcQueueTests, Communication)
42+
{
43+
RpcQueue<Msg> q;
44+
45+
const int MSG_COUNT = 400;
46+
47+
auto port1 = q.port1();
48+
auto port2 = q.port2();
49+
50+
auto t1 = std::thread([&port1, MSG_COUNT]() {
51+
int successCount = 0;
52+
53+
port1->onMessage([&successCount](const Msg& m) {
54+
EXPECT_EQ(m.val, successCount);
55+
successCount++;
56+
});
57+
58+
for (int i = 0; i < MSG_COUNT; ++i) {
59+
Msg m { i };
60+
port1->send(m);
61+
}
62+
63+
int iteration = 0;
64+
while (iteration < 1000) { // anti freeze
65+
++iteration;
66+
67+
port1->process();
68+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
69+
70+
if (successCount == MSG_COUNT) {
71+
break;
72+
}
73+
}
74+
75+
EXPECT_EQ(successCount, MSG_COUNT);
76+
});
77+
78+
auto t2 = std::thread([&port2, MSG_COUNT]() {
79+
int iteration = 0;
80+
int successCount = 0;
81+
82+
port2->onMessage([&successCount](const Msg& m) {
83+
EXPECT_EQ(m.val, successCount);
84+
successCount++;
85+
});
86+
87+
for (int i = 0; i < MSG_COUNT; ++i) {
88+
Msg m { i };
89+
port2->send(m);
90+
}
91+
92+
while (iteration < 1000) { // anti freeze
93+
++iteration;
94+
95+
port2->process();
96+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
97+
98+
if (successCount == MSG_COUNT) {
99+
break;
100+
}
101+
}
102+
103+
EXPECT_EQ(successCount, MSG_COUNT);
104+
});
105+
106+
t1.join();
107+
t2.join();
108+
}

0 commit comments

Comments
 (0)