Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TESTS] Added unit tests for bonding #1559

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,26 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i

vector<SRTSOCKET> broken;

// Return value rules:
// In non-blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// In blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// - you connect a group that was not connected yet
// - otherwise return 0

// Leave the last SID value in retval if you had only one
// connection to start. Otherwise override it with 0.
if (arraysize > 1)
retval = 0;

// For blocking mode only, and only in case when the group
// was not yet connected, this retval could be overridden
// again with the first ready socket ID, and this socket ID
// will be returned.

while (block_new_opened)
{
if (spawned.empty())
Expand Down
58 changes: 49 additions & 9 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4951,6 +4951,11 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
s->m_Status = SRTS_CONNECTED;

// acknowledde any waiting epolls to write
// This must be done AFTER the group member status is upgraded to IDLE because
// this state change will trigger the waiting function in blocking-mode groupConnect
// and this may be immediately followed by exit from connect and start sending function,
// which must see this very link already as IDLE, not PENDING, which will make this
// link unable to be used and therefore the sending call would fail.
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true);

CGlobEvent::triggerEvent();
Expand Down Expand Up @@ -5541,6 +5546,12 @@ void * srt::CUDT::tsbpd(void* param)
gkeeper.group->updateLatestRcv(self->m_parent);
}
}

// After re-acquisition of the m_RecvLock, re-check the closing flag
if (self->m_bClosing)
{
break;
}
#endif
CGlobEvent::triggerEvent();
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
Expand Down Expand Up @@ -5582,15 +5593,38 @@ void * srt::CUDT::tsbpd(void* param)
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdNeedsWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
<< "NOW=" << FormatTime(steady_clock::now()));
bWokeUpOnSignal = false;
while (!bWokeUpOnSignal)
{
// For safety reasons, do wakeup once per 1/8s and re-check the flag.
// This should be enough long time that during a normal transmission
// the TSBPD thread would be woken up much earlier when required by
// ACK per ACK timer (at most 10ms since the last check) and in case
// when this might result in a deadlock, it would only hold up to 125ms,
// which should be little harmful for the application. NOTE THAT THIS
// IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN.
THREAD_PAUSED();
bWokeUpOnSignal = tsbpd_cc.wait_for(milliseconds_from(125));
THREAD_RESUMED();
if (self->m_bClosing && !bWokeUpOnSignal)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT");

// This break doesn't have to be done in case when signaled
// because if so this current loop will be interrupted anyway,
// and the outer loop will be terminated at the check of self->m_bClosing.
// This is only a sanity check.
break;
}
}
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
<< "NOW=" << FormatTime(steady_clock::now()));

}
}

THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
return NULL;
Expand Down Expand Up @@ -6328,7 +6362,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT
// Inform the threads handler to stop.
m_bClosing = true;

HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock");
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock");

ScopedLock connectguard(m_ConnectionLock);

Expand Down Expand Up @@ -7824,6 +7858,11 @@ void srt::CUDT::destroySynch()
void srt::CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
if (!m_bClosing)
{
HLOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!");
m_bClosing = true;
}
// wake up user calls
CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock);

Expand Down Expand Up @@ -9931,7 +9970,7 @@ void srt::CUDT::processClose()
m_bBroken = true;
m_iBrokenCounter = 60;

HLOGP(smlog.Debug, "processClose: sent message and set flags");
HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags");

if (m_bTsbPd)
{
Expand Down Expand Up @@ -11683,6 +11722,7 @@ void srt::CUDT::checkTimers()

void srt::CUDT::updateBrokenConnection()
{
HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events");
m_bClosing = true;
releaseSynch();
// app can call any UDT API to learn the connection_broken error
Expand Down
2 changes: 1 addition & 1 deletion srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const
if (!pwait)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
HLOGC(eilog.Debug, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
continue;
}
Expand Down
63 changes: 58 additions & 5 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ void CUDTGroup::deriveSettings(CUDT* u)
#undef IMF
}

// XXX This function is likely of no use now.
bool CUDTGroup::applyFlags(uint32_t flags, HandshakeSide)
{
const bool synconmsg = IsSet(flags, SRT_GFLAG_SYNCONMSG);
Expand Down Expand Up @@ -1388,7 +1389,26 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

// { send_CheckBrokenSockets()

if (!pendingSockets.empty())
// Make an extra loop check to see if we could be
// in a condition of "all sockets either blocked or pending"

int nsuccessful = 0; // number of successfully connected sockets
int nblocked = 0; // number of sockets blocked in connection
bool is_pending_blocked = false;
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
if (is->stat != -1)
{
nsuccessful++;
}
// is->stat == -1
else if (is->code == SRT_EASYNCSND)
{
++nblocked;
}
}

if (!pendingSockets.empty() || nblocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them.");

Expand All @@ -1405,12 +1425,24 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
}
else
{
int swait_timeout = 0;

// There's also a hidden condition here that is the upper if condition.
is_pending_blocked = (nsuccessful == 0);

// If this is the case when
if (m_bSynSending && is_pending_blocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode");
swait_timeout = m_iSndTimeOut;
}

{
InvertedLock ug(m_GroupLock);

THREAD_PAUSED();
m_Global.m_EPoll.swait(
*m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened
*m_SndEpolld, (sready), swait_timeout, false /*report by retval*/); // Just check if anything happened
THREAD_RESUMED();
}

Expand All @@ -1423,6 +1455,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
HLOGC(gslog.Debug, log << "grp/sendBroadcast: RDY: " << DisplayEpollResults(sready));

// sockets in EX: should be moved to wipeme.
// IMPORTANT: we check only PENDING sockets (not blocked) because only
// pending sockets might report ERR epoll without being explicitly broken.
// Sockets that did connect and just have buffer full will be always broken,
// if they're going to report ERR in epoll.
for (vector<SRTSOCKET>::iterator i = pendingSockets.begin(); i != pendingSockets.end(); ++i)
{
if (CEPoll::isready(sready, *i, SRT_EPOLL_ERR))
Expand All @@ -1434,6 +1470,9 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
int no_events = 0;
m_Global.m_EPoll.update_usock(m_SndEID, *i, &no_events);
}

if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT))
is_pending_blocked = false;
}

// After that, all sockets that have been reported
Expand All @@ -1450,7 +1489,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

send_CloseBrokenSockets(wipeme);
// Just for a case, when a socket that was blocked or pending
// had switched to write-enabled,

send_CloseBrokenSockets((wipeme)); // wipeme will be cleared by this function

// Re-check after the waiting lock has been reacquired
if (m_bClosing)
Expand Down Expand Up @@ -1712,9 +1754,18 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

if (none_succeeded)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
if (!m_bSynSending && (is_pending_blocked || was_blocked))
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: no links are ready for sending");
ercode = SRT_EASYNCSND;
}
else
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
}

// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.
Expand Down Expand Up @@ -3628,6 +3679,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
if (len <= 0)
{
LOGC(gslog.Error, log << "grp/send(backup): negative length: " << len);
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}

Expand All @@ -3647,6 +3699,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
if (m_bClosing)
{
leaveCS(m_Global.m_GlobControlLock);
LOGC(gslog.Error, log << "grp/send(backup): Cannot send, connection lost!");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

Expand Down
Loading
Loading