diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..62a0f3679 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1673,6 +1673,26 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i vector broken; + // Return value rules: + // In non-blocking mode: + // - return Socket ID, if: + // - you requested only one connection in this call + // In blocking mode: + // - return Socket ID, if: + // - you requested only one connection in this call + // - you connect a group that was not connected yet + // - otherwise return 0 + + // Leave the last SID value in retval if you had only one + // connection to start. Otherwise override it with 0. + if (arraysize > 1) + retval = 0; + + // For blocking mode only, and only in case when the group + // was not yet connected, this retval could be overridden + // again with the first ready socket ID, and this socket ID + // will be returned. + while (block_new_opened) { if (spawned.empty()) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..afb772d63 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -4951,6 +4951,11 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, s->m_Status = SRTS_CONNECTED; // acknowledde any waiting epolls to write + // This must be done AFTER the group member status is upgraded to IDLE because + // this state change will trigger the waiting function in blocking-mode groupConnect + // and this may be immediately followed by exit from connect and start sending function, + // which must see this very link already as IDLE, not PENDING, which will make this + // link unable to be used and therefore the sending call would fail. uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true); CGlobEvent::triggerEvent(); @@ -5541,6 +5546,12 @@ void * srt::CUDT::tsbpd(void* param) gkeeper.group->updateLatestRcv(self->m_parent); } } + + // After re-acquisition of the m_RecvLock, re-check the closing flag + if (self->m_bClosing) + { + break; + } #endif CGlobEvent::triggerEvent(); tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for. @@ -5582,15 +5593,38 @@ void * srt::CUDT::tsbpd(void* param) */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); self->m_bTsbPdNeedsWakeup = true; - THREAD_PAUSED(); - tsbpd_cc.wait(); - THREAD_RESUMED(); - } - HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - " - << "NOW=" << FormatTime(steady_clock::now())); + bWokeUpOnSignal = false; + while (!bWokeUpOnSignal) + { + // For safety reasons, do wakeup once per 1/8s and re-check the flag. + // This should be enough long time that during a normal transmission + // the TSBPD thread would be woken up much earlier when required by + // ACK per ACK timer (at most 10ms since the last check) and in case + // when this might result in a deadlock, it would only hold up to 125ms, + // which should be little harmful for the application. NOTE THAT THIS + // IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN. + THREAD_PAUSED(); + bWokeUpOnSignal = tsbpd_cc.wait_for(milliseconds_from(125)); + THREAD_RESUMED(); + if (self->m_bClosing && !bWokeUpOnSignal) + { + HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT"); + + // This break doesn't have to be done in case when signaled + // because if so this current loop will be interrupted anyway, + // and the outer loop will be terminated at the check of self->m_bClosing. + // This is only a sanity check. + break; + } + } + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - " + << "NOW=" << FormatTime(steady_clock::now())); + + } } + THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; @@ -6328,7 +6362,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT // Inform the threads handler to stop. m_bClosing = true; - HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock"); + HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock"); ScopedLock connectguard(m_ConnectionLock); @@ -7824,6 +7858,11 @@ void srt::CUDT::destroySynch() void srt::CUDT::releaseSynch() { SRT_ASSERT(m_bClosing); + if (!m_bClosing) + { + HLOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!"); + m_bClosing = true; + } // wake up user calls CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock); @@ -9931,7 +9970,7 @@ void srt::CUDT::processClose() m_bBroken = true; m_iBrokenCounter = 60; - HLOGP(smlog.Debug, "processClose: sent message and set flags"); + HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags"); if (m_bTsbPd) { @@ -11683,6 +11722,7 @@ void srt::CUDT::checkTimers() void srt::CUDT::updateBrokenConnection() { + HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events"); m_bClosing = true; releaseSynch(); // app can call any UDT API to learn the connection_broken error diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 8cd8440c7..d92157a16 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -896,7 +896,7 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const if (!pwait) { // As this is mapped in the socket's data, it should be impossible. - LOGC(eilog.Error, log << "epoll/update: IPE: update struck E" + HLOGC(eilog.Debug, log << "epoll/update: IPE: update struck E" << (*i) << " which is NOT SUBSCRIBED to @" << uid); continue; } diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 5601cdeee..c899cd130 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -569,6 +569,7 @@ void CUDTGroup::deriveSettings(CUDT* u) #undef IMF } +// XXX This function is likely of no use now. bool CUDTGroup::applyFlags(uint32_t flags, HandshakeSide) { const bool synconmsg = IsSet(flags, SRT_GFLAG_SYNCONMSG); @@ -1388,7 +1389,26 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) // { send_CheckBrokenSockets() - if (!pendingSockets.empty()) + // Make an extra loop check to see if we could be + // in a condition of "all sockets either blocked or pending" + + int nsuccessful = 0; // number of successfully connected sockets + int nblocked = 0; // number of sockets blocked in connection + bool is_pending_blocked = false; + for (vector::iterator is = sendstates.begin(); is != sendstates.end(); ++is) + { + if (is->stat != -1) + { + nsuccessful++; + } + // is->stat == -1 + else if (is->code == SRT_EASYNCSND) + { + ++nblocked; + } + } + + if (!pendingSockets.empty() || nblocked) { HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them."); @@ -1405,12 +1425,24 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) } else { + int swait_timeout = 0; + + // There's also a hidden condition here that is the upper if condition. + is_pending_blocked = (nsuccessful == 0); + + // If this is the case when + if (m_bSynSending && is_pending_blocked) + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode"); + swait_timeout = m_iSndTimeOut; + } + { InvertedLock ug(m_GroupLock); THREAD_PAUSED(); m_Global.m_EPoll.swait( - *m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened + *m_SndEpolld, (sready), swait_timeout, false /*report by retval*/); // Just check if anything happened THREAD_RESUMED(); } @@ -1423,6 +1455,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) HLOGC(gslog.Debug, log << "grp/sendBroadcast: RDY: " << DisplayEpollResults(sready)); // sockets in EX: should be moved to wipeme. + // IMPORTANT: we check only PENDING sockets (not blocked) because only + // pending sockets might report ERR epoll without being explicitly broken. + // Sockets that did connect and just have buffer full will be always broken, + // if they're going to report ERR in epoll. for (vector::iterator i = pendingSockets.begin(); i != pendingSockets.end(); ++i) { if (CEPoll::isready(sready, *i, SRT_EPOLL_ERR)) @@ -1434,6 +1470,9 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) int no_events = 0; m_Global.m_EPoll.update_usock(m_SndEID, *i, &no_events); } + + if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT)) + is_pending_blocked = false; } // After that, all sockets that have been reported @@ -1450,7 +1489,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) if (m_bClosing) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - send_CloseBrokenSockets(wipeme); + // Just for a case, when a socket that was blocked or pending + // had switched to write-enabled, + + send_CloseBrokenSockets((wipeme)); // wipeme will be cleared by this function // Re-check after the waiting lock has been reacquired if (m_bClosing) @@ -1712,9 +1754,18 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) if (none_succeeded) { - HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)"); m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false); - m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + if (!m_bSynSending && (is_pending_blocked || was_blocked)) + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: no links are ready for sending"); + ercode = SRT_EASYNCSND; + } + else + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)"); + m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + } + // Reparse error code, if set. // It might be set, if the last operation was failed. // If any operation succeeded, this will not be executed anyway. @@ -3628,6 +3679,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) { if (len <= 0) { + LOGC(gslog.Error, log << "grp/send(backup): negative length: " << len); throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); } @@ -3647,6 +3699,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) if (m_bClosing) { leaveCS(m_Global.m_GlobControlLock); + LOGC(gslog.Error, log << "grp/send(backup): Cannot send, connection lost!"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 0e48c8a04..a37da250a 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -1,15 +1,18 @@ + #include #include #include #include -#include - #include "gtest/gtest.h" #include "test_env.h" #include "srt.h" +#include "udt.h" +#include "common.h" #include "netinet_any.h" +#include "apputil.hpp" + TEST(Bonding, SRTConnectGroup) { srt::TestInit srtinit; @@ -122,11 +125,22 @@ void listening_thread(bool should_read) // srt_accept.. } -void ConnectCallback(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +SRTSOCKET g_listen_socket = -1; +int g_nconnected = 0; +int g_nfailed = 0; + +// This ConnectCallback is mainly informative, but it also collects the +// number of succeeded and failed links. +void ConnectCallback(void* , SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) { std::cout << "Connect callback. Socket: " << sock << ", error: " << error << ", token: " << token << '\n'; + + if (error == SRT_SUCCESS) + ++g_nconnected; + else + ++g_nfailed; } TEST(Bonding, NonBlockingGroupConnect) @@ -376,7 +390,7 @@ TEST(Bonding, Options) #endif int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR); started = true; @@ -413,7 +427,7 @@ TEST(Bonding, Options) } // Now the thread is accepting, so we call the connect. - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); SRTSOCKET member = srt_connect(grp, sa.get(), sa.size()); // We've released the mutex and signaled the CV, so accept should proceed now. @@ -507,7 +521,7 @@ TEST(Bonding, InitialFailure) int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 5), SRT_ERROR); @@ -564,3 +578,927 @@ TEST(Bonding, InitialFailure) srt_close(lsn); } + + +// General idea: +// This should try to connect to two nonexistent links, +// the connecting function (working in blocking mode) +// should exit with error, after the group has been closed +// in a separate thread. +// +// Steps: +// 1. Create group +// 2. Use a nonexistent endpoints 192.168.1.237:4200 and *:4201 +// 3. Close the group in a thread +// 4. Wait for error +TEST(Bonding, ConnectBlind) +{ + struct sockaddr_in sa; + + srt_startup(); + + const int ss = srt_create_group(SRT_GTYPE_BROADCAST); + ASSERT_NE(ss, SRT_ERROR); + + std::vector targets; + for (int i = 0; i < 2; ++i) + { + sa.sin_family = AF_INET; + sa.sin_port = htons(4200 + i); + ASSERT_EQ(inet_pton(AF_INET, "192.168.1.237", &sa.sin_addr), 1); + + const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(NULL, (struct sockaddr*)&sa, sizeof sa); + targets.push_back(gd); + } + + std::future closing_promise = std::async(std::launch::async, [](int ss) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::cerr << "Closing group" << std::endl; + srt_close(ss); + }, ss); + + std::cout << "srt_connect_group calling " << std::endl; + const int st = srt_connect_group(ss, targets.data(), targets.size()); + std::cout << "srt_connect_group returned " << st << std::endl; + + closing_promise.wait(); + EXPECT_EQ(st, -1); + + // Delete config objects before prospective exception + for (auto& gd: targets) + srt_delete_config(gd.config); + + int res = srt_close(ss); + if (res == SRT_ERROR) + { + std::cerr << "srt_close: " << srt_getlasterror_str() << std::endl; + } + + srt_cleanup(); +} + +// TEST IDEA: +// This uses srt_connect_group in non-blocking mode. The listener +// is also created to respond to the connection. Expected is to +// continue the connecting in background and report a success, +// and report the epoll IN on listener for the first connection, +// and UPDATE For the second one. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 2. Set the group and the listener socket non-blocking mode +// 3. Start the accepting thread +// - wait for IN event ready on the listener socket +// - accept a connection +// - wait for UPDATE event ready on the listener socket +// - wait for any event up to 5s (possibly ERR) +// - close the listener socket +// 4. Prepare two connections and start connecting +// 5. Wait for the OUT readiness event on the group +// 6. Close the group. +// 7. Join the thread +TEST(Bonding, ConnectNonBlocking) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + const string ADDR = "127.0.0.1"; + const int PORT = 4209; + + // NOTE: Add more group types, if implemented! + vector types { SRT_GTYPE_BROADCAST, SRT_GTYPE_BACKUP }; + + for (const auto GTYPE: types) + { + g_listen_socket = srt_create_socket(); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + ASSERT_EQ(inet_pton(AF_INET, ADDR.c_str(), &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(PORT); + + ASSERT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + ASSERT_NE(srt_listen(g_listen_socket, 5), -1); + + int lsn_eid = srt_epoll_create(); + int lsn_events = SRT_EPOLL_IN | SRT_EPOLL_ERR | SRT_EPOLL_UPDATE; + srt_epoll_add_usock(lsn_eid, g_listen_socket, &lsn_events); + + // Caller part + + const int ss = srt_create_group(GTYPE); + ASSERT_NE(ss, SRT_ERROR); + std::cout << "Created group socket: " << ss << '\n'; + + int no = 0; + ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + + const int poll_id = srt_epoll_create(); + // Will use this epoll to wait for srt_accept(...) + const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(PORT); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + //srt_setloglevel(LOG_DEBUG); + + auto acthr = std::thread([&lsn_eid]() { + SRT_EPOLL_EVENT ev[3]; + + cout << "[A] Waiting for accept\n"; + + // This can wait in infinity; worst case it will be killed in process. + int uwait_res = srt_epoll_uwait(lsn_eid, ev, 3, -1); + ASSERT_EQ(uwait_res, 1); + ASSERT_EQ(ev[0].fd, g_listen_socket); + + // Check if the IN event is set, even if it's not the only event + ASSERT_EQ(ev[0].events & SRT_EPOLL_IN, SRT_EPOLL_IN); + bool have_also_update = ev[0].events & SRT_EPOLL_UPDATE; + + sockaddr_any adr; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + if (have_also_update) + { + cout << "[A] NOT waiting for update - already reported previously\n"; + } + else + { + cout << "[A] Waiting for update\n"; + // Now another waiting is required and expected the update event + uwait_res = srt_epoll_uwait(lsn_eid, ev, 3, -1); + ASSERT_EQ(uwait_res, 1); + ASSERT_EQ(ev[0].fd, g_listen_socket); + ASSERT_EQ(ev[0].events, SRT_EPOLL_UPDATE); + } + + cout << "[A] Waitig for close (up to 5s)\n"; + // Wait up to 5s for an error + srt_epoll_uwait(lsn_eid, ev, 3, 5000); + + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting two sockets\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + + ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); + + int result = srt_connect_group(ss, cc, 2); + ASSERT_EQ(result, 0); + char data[4] = { 1, 2, 3, 4}; + int wrong_send = srt_send(ss, data, sizeof data); + int errorcode = srt_getlasterror(NULL); + EXPECT_EQ(wrong_send, -1); + EXPECT_EQ(errorcode, SRT_EASYNCSND) << "REAL ERROR: " << srt_getlasterror_str(); + + // Wait up to 2s + SRT_EPOLL_EVENT ev[3]; + const int uwait_result = srt_epoll_uwait(poll_id, ev, 3, 2000); + std::cout << "Returned from connecting two sockets " << std::endl; + + EXPECT_EQ(uwait_result, 1); // Expect the group reported + EXPECT_EQ(ev[0].fd, ss); + + // One second to make sure that both links are connected. + this_thread::sleep_for(seconds(1)); + + EXPECT_EQ(srt_close(ss), 0); + acthr.join(); + + srt_epoll_release(lsn_eid); + srt_epoll_release(poll_id); + + srt_close(g_listen_socket); + } + +} + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect them both and +// make sure that both are ready for use. Then we send a packet +// over the group and see, which link got activated and which +// remained idle. Expected is to have the link with higher +// priority (greater weight) to be activated. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 3. Start the accepting thread +// - accept a connection +// - read one packet from the accepted entity +// - close the listener socket +// 4. Prepare two connections (one with weight=1) and connect the group +// 5. Wait for having all links connected +// 6. Send one packet and check which link was activated +// 6. Close the group. +// 7. Join the thread +TEST(Bonding, BackupPriorityBegin) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + + g_listen_socket = srt_create_socket(); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + ASSERT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + ASSERT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + ASSERT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + auto acthr = std::thread([]() { + sockaddr_any adr; + cout << "[A] Accepting a connection...\n"; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + cout << "[A] Receiving...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + ASSERT_EQ(ds, 8); + + cout << "[A] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting two sockets\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1].token = 1; + cc[1].weight = 1; // higher than the default 0 + + int result = srt_connect_group(ss, cc, 2); + ASSERT_GT(result, 0); // blocking mode, first connection = returns Socket ID + + // Make sure both links are connected + SRT_SOCKGROUPDATA gdata[2]; + size_t psize = 2; + size_t nwait = 10; + cout << "Waiting for getting 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + int l1, l2; + l1 = gdata[0].memberstate; + l2 = gdata[1].memberstate; + + if (l1 > SRT_GST_PENDING && l2 > SRT_GST_PENDING) + { + cout << "Both up: [0]=" << l1 << " [1]=" << l2 << "\n"; + break; + } + else + { + cout << "Still link states [0]=" << l1 << " [1]=" << l2 << "\n"; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + ASSERT_NE(nwait, 0); + + // Now send one packet + long long data = 0x1234123412341234; + + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, sizeof data); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane, * backup; + if (gdata[0].weight == 0) + { + backup = &gdata[0]; + mane = &gdata[1]; + } + else + { + mane = &gdata[0]; + backup = &gdata[1]; + } + + cout << "MAIN:[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP:[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now mane link should be active, backup idle + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_IDLE); + + acthr.join(); +} + + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect the first link +// with less weight and send one packet to make sure this only +// link was activated. Then we connect a second link with weight=1. +// Then we send the packet again and see if the new link was +// immediately activated. The first link should be silenced after +// time, but there's no possibility to check this in such a +// simple test. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 3. Start the accepting thread +// - accept a connection +// - read one packet from the accepted entity +// - read the second packet from the accepted entity +// - close the listener socket +// 4. Prepare one connection with weight=0 and connect the group +// 5. Send a packet to enforce activation of one link +// 6. Prepare another connection with weight=1 and connect the group +// 7. Send a packet +// 8. Check member status - both links should be running. +// 9. Close the group. +// 10. Join the thread +TEST(Bonding, BackupPriorityTakeover) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + + g_listen_socket = srt_create_socket(); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + ASSERT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + ASSERT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + ASSERT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + auto acthr = std::thread([]() { + sockaddr_any adr; + cout << "[A] Accepting a connection...\n"; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + cout << "[A] Receiving 1...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + ASSERT_EQ(ds, 8); + + cout << "[A] Receiving 2...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + ASSERT_EQ(ds, 8); + + // To make it possible that the state is checked before it is closed. + this_thread::sleep_for(seconds(1)); + + cout << "[A] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting first link weight=0:\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + + int result = srt_connect_group(ss, cc, 1); + ASSERT_GT(result, 0); // connect with only one element returns socket ID + + // As we have one link, after `srt_connect_group` returns, we have + // this link now connected. Send one data portion. + + SRT_SOCKGROUPDATA gdata[2]; + + long long data = 0x1234123412341234; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "Sending (1)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, sizeof data); + ASSERT_EQ(mc.grpdata_size, 1); + EXPECT_EQ(gdata[0].memberstate, SRT_GST_RUNNING); + + cout << "Connecting second link weight=1:\n"; + // Now prepare the second connection + cc[0].token = 1; + cc[0].weight = 1; // higher than the default 0 + result = srt_connect_group(ss, cc, 1); + ASSERT_GT(result, 0); // connect with only one element returns socket ID + + // Make sure both links are connected + size_t psize = 2; + size_t nwait = 10; + cout << "Waiting for getting 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + int l1, l2; + l1 = gdata[0].memberstate; + l2 = gdata[1].memberstate; + + if (l1 > SRT_GST_PENDING && l2 > SRT_GST_PENDING) + { + cout << "Both up: [0]=" << l1 << " [1]=" << l2 << "\n"; + break; + } + else + { + cout << "Still link states [0]=" << l1 << " [1]=" << l2 << "\n"; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + ASSERT_NE(nwait, 0); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "Sending (2)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, sizeof data); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane, * backup; + if (gdata[0].weight == 0) + { + backup = &gdata[0]; + mane = &gdata[1]; + } + else + { + mane = &gdata[0]; + backup = &gdata[1]; + } + + cout << "MAIN:[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP:[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now both links should be running (this state lasts + // for the "temporary activation" period. + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_RUNNING); + + acthr.join(); +} + + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect then two links +// both with weight=1. Then we send a packet to make sure that +// exactly one of them got activated. Then we connect another +// link with weight=0. Then we send a packet again, which should +// not change the link usage. Then we check which link was +// active so far, and we close the socket for that link to make +// it broken, then we wait for having only two links connected. +// Then a packet is sent to activate a link. We expect the link +// with higher weight is activated. +// +// TEST STEPS: +// 1. Create a listener socket. +// 2. Create and setup a group. +// 3. Start the accepting thread +// A1. accept a connection +// A2. read one packet from the accepted entity +// A3. read the second packet from the accepted entity +// A4. read the third packet from the accepted entity +// A5. close the listener socket +// 4. Prepare two connections with weight=1 and connect the group +// 5. Send a packet to enforce activation of one link +// 6. Prepare another connection with weight=0 and connect the group +// 7. Wait for having all 3 links connected. +// 8. Send a packet +// 9. Find which link is currently active and close it +// 10. Wait for having only two links. +// 11. Send a packet. +// 12. Find one link active and one idle +// 13. Check if the link with weight=1 is active and the one with weight=0 is idle. +// 14. Close the group. +// 15. Join the thread +TEST(Bonding, BackupPrioritySelection) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + volatile bool recvd = false; + + // 1. + g_listen_socket = srt_create_socket(); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + ASSERT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + ASSERT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + // 2. + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + ASSERT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + // Set the group's stability timeout to 1s, otherwise it will + // declare the links unstable by not receiving ACKs. + int stabtimeo = 1000; + srt_setsockflag(ss, SRTO_GROUPMINSTABLETIMEO, &stabtimeo, sizeof stabtimeo); + + //srt_setloglevel(LOG_DEBUG); + srt::resetlogfa( std::set { + SRT_LOGFA_GRP_SEND, + SRT_LOGFA_GRP_MGMT, + SRT_LOGFA_CONN + }); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + // 3. + auto acthr = std::thread([&recvd]() { + sockaddr_any adr; + cout << "[A1] Accepting a connection...\n"; + + // A1 + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + // A2 + cout << "[A2] Receiving 1...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A2] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + ASSERT_EQ(ds, 8); + + // A3 + cout << "[A3] Receiving 2...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A3] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + ASSERT_EQ(ds, 8); + recvd = true; + + // A4 + cout << "[A4] Receiving 3...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A4] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + ASSERT_EQ(ds, 8); + + cout << "[A] Waiting 5s...\n"; + // To make it possible that the state is checked before it is closed. + this_thread::sleep_for(seconds(5)); + + // A5 + cout << "[A5] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + + cout << "(4) Connecting first 2 links weight=1:\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + cc[0].weight = 1; + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1].token = 1; + cc[1].weight = 1; + + // 4. + int result = srt_connect_group(ss, cc, 2); + ASSERT_GT(result, 0); // BLOCKING MODE, always returns the socket value + + // As we have one link, after `srt_connect_group` returns, we have + // this link now connected. Send one data portion. + + SRT_SOCKGROUPDATA gdata[3]; + memset(gdata, 0, sizeof(gdata)); + + long long data = 0x1234123412341234; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 3; + + // We can send now. We know that we have at least one + // link connected and it already has the same priority + // as the other. + + //srt_setloglevel(LOG_DEBUG); + // 5. + cout << "(5) Sending (1)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + // In case when this was an error, display the code + if (sendret == -1) { cout << "[A4] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + + EXPECT_EQ(sendret, sizeof data); + + + ASSERT_EQ(mc.grpdata_size, 2); + + int state0 = gdata[0].memberstate; + int state1 = gdata[1].memberstate; + + cout << "States: [0]=" << state0 << " [1]=" << state1 << endl; + EXPECT_TRUE(state0 == SRT_GST_RUNNING || state1 == SRT_GST_RUNNING); + + // 6. + cout << "(6) Connecting third link weight=0:\n"; + // Now prepare the third connection + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 2; + cc[0].weight = 0; // higher than the default 0 + result = srt_connect_group(ss, cc, 1); + ASSERT_GE(result, 0); // ONE connection only - will return socket id + + // Make sure all 3 links are connected + size_t psize = 3; + size_t nwait = 10; + set states; + + // 7. + cout << "(7) Waiting for getting 3 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 3) + { + states.clear(); + for (int i = 0; i < 3; ++i) + states.insert(gdata[i].memberstate); + + if (states.count(SRT_GST_PENDING)) + { + cout << "Still not all links...\n"; + } + else + { + cout << "All links up\n"; + break; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + ASSERT_NE(nwait, 0); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 3; + + // 8. + cout << "(8) Sending (2)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, sizeof data); + ASSERT_EQ(mc.grpdata_size, 3); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane = nullptr; + + for (size_t i = 0; i < mc.grpdata_size; ++i) + { + if (gdata[i].memberstate == SRT_GST_RUNNING) + { + mane = &gdata[i]; + break; + } + } + + ASSERT_NE(mane, nullptr); + ASSERT_EQ(mane->weight, 1); + + // Spin-wait for making sure the reception succeeded before + // closing. This shouldn't be a problem in general, but + int ntry = 100; + while (!recvd && --ntry) + this_thread::sleep_for(milliseconds(200)); + ASSERT_NE(ntry, 0); + + cout << "(9) Found activated link: [" << mane->token << "] - closing after 0.5s...\n"; + + // Waiting is to make sure that the listener thread has received packet 3. + this_thread::sleep_for(milliseconds(500)); + ASSERT_NE(srt_close(mane->id), -1); + + // Now expect to have only 2 links, wait for it if needed. + psize = 2; + nwait = 10; + + cout << "(10) Waiting for ONLY 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + break; + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + ASSERT_NE(nwait, 0); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "(11) Sending (3)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, sizeof data); + + cout << "(sleep)\n"; + this_thread::sleep_for(seconds(1)); + + mane = nullptr; + SRT_SOCKGROUPDATA* backup = nullptr; + cout << "(12) Checking main/backup:"; + int repeat_check = 1; // 50; +CheckLinksAgain: + for (size_t i = 0; i < mc.grpdata_size; ++i) + { + cout << "[" << i << "]" << srt_logging::MemberStatusStr(gdata[i].memberstate) + << " weight=" << gdata[i].weight; + if (gdata[i].memberstate == SRT_GST_RUNNING) + { + cout << " (main) "; + mane = &gdata[i]; + } + else + { + cout << " (backup) "; + backup = &gdata[i]; + } + } + if (backup == nullptr) + { + if (--repeat_check) + { + cout << "BACKUP STILL RUNNING. AGAIN\n"; + this_thread::sleep_for(milliseconds(250)); + goto CheckLinksAgain; + } + } + cout << endl; + + ASSERT_NE(mane, nullptr); + ASSERT_NE(backup, nullptr); + ASSERT_EQ(mane->weight, 1); + ASSERT_EQ(backup->weight, 0); + + cout << "MAIN (expected active):[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP (expected idle):[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now both links should be running (this state lasts + // for the "temporary activation" period. + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_IDLE); + + this_thread::sleep_for(seconds(1)); + + cout << "Closing receiver thread [A]\n"; + + acthr.join(); + + srt_close(ss); +} + + diff --git a/test/test_connection_timeout.cpp b/test/test_connection_timeout.cpp index ec52d9dd8..70ee20ce8 100644 --- a/test/test_connection_timeout.cpp +++ b/test/test_connection_timeout.cpp @@ -1,3 +1,4 @@ + #include #include #include @@ -170,20 +171,22 @@ TEST_F(TestConnectionTimeout, Nonblocking) { */ TEST_F(TestConnectionTimeout, BlockingLoop) { - const SRTSOCKET client_sock = srt_create_socket(); - ASSERT_GT(client_sock, 0); // socket_id should be > 0 - - // Set connection timeout to 999 ms to reduce the test execution time. - // Also need to hit a time point between two threads: - // srt_connect will check TTL every second, - // CRcvQueue::worker will wait on a socket for 10 ms. - // Need to have a condition, when srt_connect will process the timeout. - const int connection_timeout_ms = 999; - EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS); - const sockaddr* psa = reinterpret_cast(&m_sa); + const int connection_timeout_ms = 999; for (int i = 0; i < 10; ++i) { + const SRTSOCKET client_sock = srt_create_socket(); + EXPECT_GT(client_sock, 0); // socket_id should be > 0 + if (client_sock <= 0) + break; + + // Set connection timeout to 999 ms to reduce the test execution time. + // Also need to hit a time point between two threads: + // srt_connect will check TTL every second, + // CRcvQueue::worker will wait on a socket for 10 ms. + // Need to have a condition, when srt_connect will process the timeout. + EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS); + const chrono::steady_clock::time_point chrono_ts_start = chrono::steady_clock::now(); EXPECT_EQ(srt_connect(client_sock, psa, sizeof m_sa), SRT_ERROR); @@ -200,12 +203,11 @@ TEST_F(TestConnectionTimeout, BlockingLoop) << error_code << " " << srt_getlasterror_str() << "\n"; break; } - } - EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS); + EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS); + } } - TEST(TestConnectionAPI, Accept) { using namespace std::chrono;