Skip to content

Commit

Permalink
SystemLayerImplSelect: make safe with unbalanced use of Request/Clear…
Browse files Browse the repository at this point in the history
…Callback (#21349)

Original version (3ca630c) could cause non balanced dispatch_suspend/dispatch_resume which is not allowed by the dispatch API, as pointed out in #pullrequestreview-1051517126.

This version does not use dispatch_suspend/resume any more to prevent the problem, additionally making the implementation behaving as similar as possible as, and compatible with, the select() based fallback.

The fallback is required by some tests even on dispatch based platforms, when no dispatch queue can be started.

But for real applications on dispatch based platforms, there should always be a dispatch queue, so the implementation now issues a warning when RequestCallbackOnPendingXXX is called without a dispatch queue available.
  • Loading branch information
plan44 authored and pull[bot] committed Feb 1, 2024
1 parent 3865d6f commit 4470625
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 46 deletions.
111 changes: 65 additions & 46 deletions src/system/SystemLayerImplSelect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ void LayerImplSelect::Shutdown()
}
}
mTimerPool.ReleaseAll();

for (auto & w : mSocketWatchPool)
{
w.DisableAndClear();
}

#else // CHIP_SYSTEM_CONFIG_USE_DISPATCH
mTimerList.Clear();
mTimerPool.ReleaseAll();
Expand Down Expand Up @@ -248,23 +254,32 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token)
watch->mPendingIO.Set(SocketEventFlags::kRead);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (watch->mWrSource)
if (watch->mRdSource == nullptr)
{
dispatch_resume(watch->mRdSource);
}
else
{
if (dispatchQueue)
// First time requesting callback for read events: install a dispatch source
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue == nullptr)
{
// Note: if no dispatch queue is available, callbacks most probably will not work,
// unless, as in some tests from a test-specific local loop,
// the select based event handling (Prepare/WaitFor/HandleEvents) is invoked.
ChipLogError(DeviceLayer,
"RequestCallbackOnPendingRead with no dispatch queue: callback may not work (might be ok in tests)");
}
else
{
watch->mRdSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mRdSource, ^{
SocketEvents events;
events.Set(SocketEventFlags::kRead);
watch->mCallback(events, watch->mCallbackData);
if (watch->mPendingIO.Has(SocketEventFlags::kRead) && watch->mCallback != nullptr)
{
SocketEvents events;
events.Set(SocketEventFlags::kRead);
watch->mCallback(events, watch->mCallbackData);
}
});
// only now we are sure the source exists and can become active
dispatch_activate(watch->mRdSource);
}
}
Expand All @@ -281,23 +296,33 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token
watch->mPendingIO.Set(SocketEventFlags::kWrite);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mWrSource)
{
dispatch_resume(watch->mWrSource);
}
else
if (watch->mWrSource == nullptr)
{
// First time requesting callback for read events: install a dispatch source
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue)
if (dispatchQueue == nullptr)
{
// Note: if no dispatch queue is available, callbacks most probably will not work,
// unless, as in some tests from a test-specific local loop,
// the select based event handling (Prepare/WaitFor/HandleEvents) is invoked.
ChipLogError(DeviceLayer,
"RequestCallbackOnPendingWrite with no dispatch queue: callback may not work (might be ok in tests)");
}
else
{
watch->mWrSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mWrSource, ^{
SocketEvents events;
events.Set(SocketEventFlags::kWrite);
watch->mCallback(events, watch->mCallbackData);
if (watch->mPendingIO.Has(SocketEventFlags::kWrite) && watch->mCallback != nullptr)
{
SocketEvents events;
events.Set(SocketEventFlags::kWrite);
watch->mCallback(events, watch->mCallbackData);
}
});
// only now we are sure the source exists and can become active
watch->mPendingIO.Set(SocketEventFlags::kWrite);
dispatch_activate(watch->mWrSource);
}
}
Expand All @@ -310,14 +335,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kRead);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mRdSource)
{
dispatch_suspend(watch->mRdSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
watch->mPendingIO.Clear(SocketEventFlags::kRead);

return CHIP_NO_ERROR;
}
Expand All @@ -326,14 +345,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token)
{
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kWrite);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mWrSource)
{
dispatch_suspend(watch->mWrSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
watch->mPendingIO.Clear(SocketEventFlags::kWrite);

return CHIP_NO_ERROR;
}
Expand All @@ -347,21 +360,10 @@ CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut)
VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mRdSource)
{
dispatch_cancel(watch->mRdSource);
dispatch_release(watch->mRdSource);
}
if (watch->mWrSource)
{
dispatch_cancel(watch->mWrSource);
dispatch_release(watch->mWrSource);
}
#endif

watch->DisableAndClear();
#else
watch->Clear();

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
// Wake the thread calling select so that it stops selecting on the socket.
Signal();
#endif
Expand Down Expand Up @@ -511,5 +513,22 @@ void LayerImplSelect::SocketWatch::Clear()
#endif
}

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
void LayerImplSelect::SocketWatch::DisableAndClear()
{
if (mRdSource)
{
dispatch_source_cancel(mRdSource);
dispatch_release(mRdSource);
}
if (mWrSource)
{
dispatch_source_cancel(mWrSource);
dispatch_release(mWrSource);
}
Clear();
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

} // namespace System
} // namespace chip
1 change: 1 addition & 0 deletions src/system/SystemLayerImplSelect.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class LayerImplSelect : public LayerSocketsLoop
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_source_t mRdSource;
dispatch_source_t mWrSource;
void DisableAndClear();
#endif
intptr_t mCallbackData;
};
Expand Down

0 comments on commit 4470625

Please sign in to comment.