@@ -95,6 +95,7 @@ class ConnectionManagerTest : public CppUnit::TestFixture
95
95
void testDeclineICERequest ();
96
96
void testChannelRcvShutdown ();
97
97
void testChannelSenderShutdown ();
98
+ void testMultiChannelShutdown ();
98
99
void testCloseConnectionWith ();
99
100
void testShutdownCallbacks ();
100
101
void testFloodSocket ();
@@ -124,6 +125,7 @@ class ConnectionManagerTest : public CppUnit::TestFixture
124
125
CPPUNIT_TEST (testAcceptsICERequest);
125
126
CPPUNIT_TEST (testChannelRcvShutdown);
126
127
CPPUNIT_TEST (testChannelSenderShutdown);
128
+ CPPUNIT_TEST (testMultiChannelShutdown);
127
129
CPPUNIT_TEST (testCloseConnectionWith);
128
130
CPPUNIT_TEST (testShutdownCallbacks);
129
131
CPPUNIT_TEST (testFloodSocket);
@@ -887,6 +889,111 @@ ConnectionManagerTest::testChannelSenderShutdown()
887
889
CPPUNIT_ASSERT (scv.wait_for (lk, 30s, [&] { return shutdownReceived; }));
888
890
}
889
891
892
+ void
893
+ ConnectionManagerTest::testMultiChannelShutdown ()
894
+ {
895
+ std::condition_variable cv;
896
+ size_t connectedCbCount = 0 ;
897
+ size_t successfullyConnected = 0 ;
898
+ size_t accepted = 0 ;
899
+ size_t shutdownCount = 0 ;
900
+ std::atomic_bool connected = false ;
901
+ std::set<std::shared_ptr<MultiplexedSocket>> sockets;
902
+ bool shut = true ;
903
+
904
+ bob->connectionManager ->onICERequest ([](const DeviceId&) { return true ; });
905
+
906
+ bob->connectionManager ->onChannelRequest ([&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
907
+ if (name.empty ()) return false ;
908
+ std::lock_guard lk {mtx};
909
+ accepted++;
910
+ cv.notify_one ();
911
+ return true ;
912
+ });
913
+
914
+ bob->connectionManager ->onConnectionReady ([&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
915
+ if (not socket or name.empty ()) return ;
916
+ socket->setOnRecv ([rxbuf = std::make_shared<std::vector<uint8_t >>(), w = std::weak_ptr (socket)](const uint8_t * data, size_t size) {
917
+ rxbuf->insert (rxbuf->end (), data, data + size);
918
+ if (rxbuf->size () == 32 ) {
919
+ if (auto socket = w.lock ()) {
920
+ std::error_code ec;
921
+ socket->write (rxbuf->data (), rxbuf->size (), ec);
922
+ CPPUNIT_ASSERT (!ec);
923
+ socket->shutdown ();
924
+ }
925
+ }
926
+ return size;
927
+ });
928
+ std::lock_guard lk {mtx};
929
+ sockets.emplace (socket->underlyingSocket ());
930
+ });
931
+
932
+ auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
933
+ {
934
+ std::lock_guard lk {mtx};
935
+ connectedCbCount++;
936
+ if (socket)
937
+ successfullyConnected++;
938
+ cv.notify_one ();
939
+ }
940
+ if (socket) {
941
+ auto data_sent = dht::PkId::get (socket->name ());
942
+ socket->setOnRecv ([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t >>()](const uint8_t * data, size_t size) {
943
+ rxbuf->insert (rxbuf->end (), data, data + size);
944
+ if (rxbuf->size () == 32 ) {
945
+ CPPUNIT_ASSERT (!std::memcmp (data_sent.data (), rxbuf->data (), data_sent.size ()));
946
+ }
947
+ return size;
948
+ });
949
+ socket->onShutdown ([&]() {
950
+ std::lock_guard lk {mtx};
951
+ shutdownCount++;
952
+ cv.notify_one ();
953
+ });
954
+ connected = true ;
955
+ std::error_code ec;
956
+ socket->write (data_sent.data (), data_sent.size (), ec);
957
+ CPPUNIT_ASSERT (!ec);
958
+ }
959
+ };
960
+
961
+ // max supported number of channels per side (64k - 2 reserved channels)
962
+ static constexpr size_t N = 1024 * 48 - 1 ;
963
+
964
+ for (size_t i = 1 ; i <= N; ++i) {
965
+ alice->connectionManager ->connectDevice (bob->id .second ,
966
+ fmt::format (" git://{}" , i),
967
+ onConnect);
968
+
969
+ if (i % 128 == 0 )
970
+ std::this_thread::sleep_for (15ms);
971
+ if (i % 1000 == 0 ) {
972
+ if (shut && connected.exchange (false )) {
973
+ shut = false ;
974
+ decltype (sockets)::node_type toClose;
975
+ {
976
+ std::lock_guard lk {mtx};
977
+ toClose = sockets.extract (sockets.begin ());
978
+ sockets.clear ();
979
+ }
980
+ fmt::print (" Closing connections {} - {}\n " , i, fmt::ptr (toClose.value ()));
981
+ toClose.value ()->shutdown ();
982
+ }
983
+ }
984
+ }
985
+
986
+ std::unique_lock lk {mtx};
987
+ cv.wait_for (lk, 30s, [&] { return connectedCbCount == N; });
988
+ CPPUNIT_ASSERT_EQUAL (N, connectedCbCount);
989
+ CPPUNIT_ASSERT_EQUAL (N, successfullyConnected);
990
+ CPPUNIT_ASSERT (successfullyConnected <= accepted);
991
+ CPPUNIT_ASSERT (accepted < 2 * successfullyConnected);
992
+ cv.wait_for (lk, 60s, [&] { return shutdownCount == successfullyConnected; });
993
+ CPPUNIT_ASSERT_EQUAL (successfullyConnected, shutdownCount);
994
+ lk.unlock ();
995
+ }
996
+
890
997
void
891
998
ConnectionManagerTest::testCloseConnectionWith ()
892
999
{
0 commit comments