Skip to content

Commit

Permalink
Added fixes for proper report blocking. Testing all group types in Co…
Browse files Browse the repository at this point in the history
…nnectNonBlocking
  • Loading branch information
Mikołaj Małecki committed Dec 7, 2020
1 parent 1cd8bcd commit 5834866
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 103 deletions.
78 changes: 62 additions & 16 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
m_iLastSchedSeqNo = curseq;
}

// }

// { send_CheckBrokenSockets()

// Make an extra loop check to see if we could be
// in a condition of "all sockets either blocked or pending"

Expand All @@ -1384,10 +1388,6 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
}
}

// }

// { send_CheckBrokenSockets()

if (!pending.empty() || nblocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them.");
Expand Down Expand Up @@ -1462,7 +1462,6 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
// writable (connected) before this function had a chance
// to check them.
m_pGlobal->m_EPoll.clear_ready_usocks(*m_SndEpolld, SRT_EPOLL_CONNECT);

}
}

Expand Down Expand Up @@ -1598,7 +1597,6 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

int ercode = 0;

// XXX WITH BLOCKED CHECK ALSO PENDING!!!
if (was_blocked)
{
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
Expand Down Expand Up @@ -3372,7 +3370,7 @@ size_t CUDTGroup::sendBackup_CheckNeedActivate(const vector<gli_t>& idler
}

// [[using locked(this->m_GroupLock)]]
void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vector<SRTSOCKET>& w_wipeme)
bool CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, int nsuccessful, int nblocked, vector<SRTSOCKET>& w_wipeme)
{
// If we have at least one stable link, then select a link that have the
// highest priority and silence the rest.
Expand All @@ -3384,7 +3382,8 @@ void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vecto
// we have one link that is stable and the freshly activated link is actually
// stable too, we'll check this next time.
//
if (!pending.empty())
bool is_pending_blocked = false;
if (!pending.empty() || nblocked)
{
HLOGC(gslog.Debug, log << "grp/send*: found pending sockets, polling them.");

Expand All @@ -3400,6 +3399,18 @@ void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vecto
}
else
{
int swait_timeout = 0;

// There's also a hidden condition here that is the upper if condition.
is_pending_blocked = (nsuccessful == 0);

// If this is the case when
if (m_bSynSending && is_pending_blocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode");
swait_timeout = m_iSndTimeOut;
}

// Some sockets could have been closed in the meantime.
if (m_SndEpolld->watch_empty())
{
Expand All @@ -3410,7 +3421,7 @@ void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vecto
{
InvertedLock ug(m_GroupLock);
m_pGlobal->m_EPoll.swait(
*m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened
*m_SndEpolld, sready, swait_timeout, false /*report by retval*/); // Just check if anything happened
}

if (m_bClosing)
Expand All @@ -3432,6 +3443,9 @@ void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vecto
int no_events = 0;
m_pGlobal->m_EPoll.update_usock(m_SndEID, *i, &no_events);
}

if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT))
is_pending_blocked = false;
}

// After that, all sockets that have been reported
Expand All @@ -3443,6 +3457,8 @@ void CUDTGroup::send_CheckPendingSockets(const vector<SRTSOCKET>& pending, vecto
m_pGlobal->m_EPoll.clear_ready_usocks(*m_SndEpolld, SRT_EPOLL_OUT);
}
}

return is_pending_blocked;
}

// [[using locked(this->m_GroupLock)]]
Expand Down Expand Up @@ -3951,6 +3967,10 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// and therefore need to be activated.
set<uint16_t> sendable_pri;

// Likely will need to survive unlock-lock cycle on the group,
// so keep this by IDs.
vector<SRTSOCKET> blocked;

// We believe that we need to send the payload over every sendable link anyway.
for (vector<gli_t>::iterator snd = sendable.begin(); snd != sendable.end(); ++snd)
{
Expand Down Expand Up @@ -3992,6 +4012,9 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
if (is_unstable && is_zero(u.m_tsUnstableSince)) // Add to unstable only if it wasn't unstable already
insert_uniq((unstable), d);

if (is_unstable)
blocked.push_back(d->id);

const Sendstate cstate = {d->id, d, stat, erc};
sendstates.push_back(cstate);
d->sndresult = stat;
Expand Down Expand Up @@ -4164,7 +4187,22 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
<< " unstable=" << unstable.size());
}

send_CheckPendingSockets(pending, (wipeme));
int nsuccess = 0;
int nblocked = 0;
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
if (is->stat == -1)
{
if (is->code == SRT_EASYNCSND)
++nblocked;
}
else
{
nsuccess++;
}
}

bool is_pending_blocked = send_CheckPendingSockets(pending, nsuccess, nblocked, (wipeme));

// Re-check after the waiting lock has been reacquired
if (m_bClosing)
Expand All @@ -4187,14 +4225,22 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)

if (none_succeeded)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: all links broken (none succeeded to send a payload)");
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.
if (!m_bSynSending && (is_pending_blocked || nblocked))
{
HLOGC(gslog.Debug, log << "grp/sendBackup: no links are ready for sending");
throw CUDTException(MJ_AGAIN, MN_WRAVAIL);
}
else
{
HLOGC(gslog.Debug, log << "grp/sendBackup: all links broken (none succeeded to send a payload)");
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.

throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
}

// Now fill in the socket table. Check if the size is enough, if not,
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class CUDTGroup
std::vector<gli_t>& w_parallel,
std::vector<SRTSOCKET>& w_wipeme,
const std::string& activate_reason);
void send_CheckPendingSockets(const std::vector<SRTSOCKET>& pending, std::vector<SRTSOCKET>& w_wipeme);
bool send_CheckPendingSockets(const std::vector<SRTSOCKET>& pending, int nsuccessful, int nblocked, std::vector<SRTSOCKET>& w_wipeme);
void send_CloseBrokenSockets(std::vector<SRTSOCKET>& w_wipeme);
void sendBackup_CheckParallelLinks(const std::vector<gli_t>& unstable,
std::vector<gli_t>& w_parallel,
Expand Down
Loading

0 comments on commit 5834866

Please sign in to comment.