Skip to content

Commit

Permalink
iox-eclipse-iceoryx#2177 Address reviewer's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albtam committed May 1, 2024
1 parent 8cc9277 commit 2da68fd
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 48 deletions.
13 changes: 7 additions & 6 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,30 +98,31 @@ class SpscSofi
// |--A--|-----|-----|
// ^
// w=3, r=3
// 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to pop CapacityValue elements
// 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to
// pop CapacityValue elements
// ========================================================================
static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1;

/// @brief Internal capacity of the queue at creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON;

public:
/// @brief default constructor which constructs an empty sofi
/// @brief default constructor which constructs an empty SpscSofi
SpscSofi() noexcept = default;

/// @brief push an element into sofi. if sofi is full the oldest data will be
/// @brief push an element into SpscSofi. if SpscSofi is full the oldest data will be
/// returned and the pushed element is stored in its place instead.
/// @param[in] value_in value which should be stored
/// @param[out] value_out if sofi is overflowing the value of the overridden value
/// @param[out] value_out if SpscSofi is overflowing the value of the overridden value
/// is stored here
/// @note restricted thread safe: can only be called from one thread. The authorization to push into the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return return true if push was successful else false.
/// @code
/// 1. sofi is empty |-----|-----|
/// 1. SpscSofi is empty |-----|-----|
/// 2. push an element |--A--|-----|
/// 3. push an element |--A--|--B--|
/// 5. sofi is full
/// 5. SpscSofi is full
/// 6. push an element |--C--|--B--| -> value_out is set to 'A'
bool push(const ValueType& valueIn, ValueType& valueOut) noexcept;

Expand Down
98 changes: 56 additions & 42 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ inline bool SpscSofi<ValueType, CapacityValue>::setCapacity(const uint64_t newSi
template <class ValueType, uint64_t CapacityValue>
inline bool SpscSofi<ValueType, CapacityValue>::empty() const noexcept
{
auto [readPost, writePos] = getReadWritePositions();
return readPost == writePos;
auto [readPosition, writePosition] = getReadWritePositions();
return readPosition == writePosition;
}

template <class ValueType, uint64_t CapacityValue>
Expand All @@ -115,35 +115,40 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Although the queue was full, pop returned false, giving the impression that the queue is empty
// TODO(@albtam): Explain to which release/store statement it corresponds
uint64_t currentReadPos = m_readPosition.load(std::memory_order_acquire);
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
uint64_t nextReadPosition{0};

bool popWasSuccessful{true};

do
{
// SYNC POINT READ: m_data
// See explanation of the corresponding synchronization point in push()
if (currentReadPos == m_writePosition.load(std::memory_order_acquire))
if (currentReadPosition == m_writePosition.load(std::memory_order_acquire))
{
return false;
// We don't need to check if read has changed, as it is enough to know that the empty state
// was valid in the past. The same race can also happen after the while loop and before the
// return operation
nextReadPosition = currentReadPosition;
popWasSuccessful = false;
}
else
{
// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));
nextReadPosition = currentReadPosition + 1U;
popWasSuccessful = true;
}
// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType));

// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
// Memory order relaxed is enough as:
// - synchronization is not needed with m_readPosition
// - there is no operation reordering possible
// While memory synchronization is not needed for m_readPosition we need to have a
// corresponding m_readPosition.store(release) to the m_readPosition.load(acquire) in the
// push method
// =============================================
// ABA problem: m_readPosition is a uint64_t. Assuming a thread is pushing at a rate of 1 GHz
// while this thread is blocked, we would still need more than 500 years to overflow
// m_readPosition and encounter the ABA problem
} while (!m_readPosition.compare_exchange_weak(
currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed));
currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire));

return true;
return popWasSuccessful;
}

template <class ValueType, uint64_t CapacityValue>
Expand All @@ -152,55 +157,64 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
constexpr bool SOFI_OVERFLOW{false};

// Memory order relaxed is enough since:
// - no synchronization needed as we are loading a value only modified in this method and this method cannot be accessed concurrently
// - no synchronization needed as we are loading a value only modified in this method and this method cannot be
// accessed concurrently
// - the operation cannot move below without observable changes
uint64_t currentWritePos = m_writePosition.load(std::memory_order_relaxed);
uint64_t nextWritePos = currentWritePos + 1U;
uint64_t currentWritePosition = m_writePosition.load(std::memory_order_relaxed);
uint64_t nextWritePosition = currentWritePosition + 1U;

m_data[currentWritePos % m_size] = valueIn;
m_data[currentWritePosition % m_size] = valueIn;
// SYNC POINT WRITE: m_data
// We need to make sure that writing the value happens before incrementing the
// m_writePosition otherwise the following scenario can happen:
// 1. m_writePosition is increased (but the value has not been written yet)
// 2. Another thread calls pop(): we check if the queue is empty => no (e.g. m_writePosition == 1
// and m_readPosition == 0)
// 3. In pop(), a data race can occur
// 2. The consumer thread calls pop(): we check if the queue is empty => no
// 3. In pop(), when we read a value a data race can occur
// With memory_order_release, this cannot happen as it is guaranteed that writing the data
// happens before incrementing m_writePosition
// =======================================
// Note that the following situation can still happen (but is not a problem):
// 1. A value is written (m_writePosition hasn't been incremented yet)
// 2. Another thread calls pop(): we check if the queue is empty => yes (e.g. m_writePosition ==
// m_readPosition == 0 )
// 3. An element was already stored so we could have popped the element
m_writePosition.store(nextWritePos, std::memory_order_release);
// Note that the following situation can still happen (but, although it is an inherent race with
// concurrent algorithms, it is not a data race and therefore not a problem):
// 1. There is an empty queue
// 2. A push operation is in progress, the value has been written but 'm_writePosition' was not
// yet advanced
// 3. The consumer thread performs a pop operation and the check for an empty queue is true
// resulting in a failed pop
// 4. The push operation is finished by advancing m_writePosition and synchronizing the memory
// 5. The consumer thread missed the chance to pop the element in the blink of an eye
m_writePosition.store(nextWritePosition, std::memory_order_release);

// Memory order relaxed is enough since:
// - synchronization is not needed with m_readPosition
// - operation reordering:
// - cannot move below if statement otherwise, the code won't compile
// - if it moves above, we might get an outdated read position that will be caught by the
// compare_exchange check
uint64_t currentReadPos = m_readPosition.load(std::memory_order_relaxed);
// While memory synchronization is not needed with m_readPosition, we need
// memory_order_acquire to avoid the reordering of the operation
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);

// Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADDON), we need to
// check if there is a free position for the *next* write position
if (nextWritePos < currentReadPos + m_size)
if (nextWritePosition < currentReadPosition + m_size)
{
return !SOFI_OVERFLOW;
}

// This is an overflow situation so we will need to read the overwritten value
// however, it could be that pop() was called in the meantime, i.e. m_readPosition was increased.
// Memory order relaxed is enough for both success and failure cases since:
// - synchronization is not needed with m_readPosition
// - operation reordering cannot happen
// Memory order success needs to be memory_order_acq_rel to prevent the reordering of
// m_writePosition.store(...) after the increment of the m_readPosition. Otherwise, in case of
// an overflow, this might result in the pop thread getting one element less than the capacity
// of the SoFi if the push thread is suspended in between these two statements.
// It's still possible to get more elements than the capacity, but this is an inherent issue
// with concurrent queues and cannot be prevented since there can always be a push during a pop
// operation.
// Another issue might be that two consecutive pushes (not concurrent) happen on different CPU
// cores without synchronization, then the memory also needs to be synchronized for the overflow
// case.
// Memory order failure is memory_order_relaxed since there is no further synchronization
// needed if there is no overflow
// ======================================
// ABA problem: m_readPosition is a uint64_t. Assuming a thread is popping at a rate of 1 GHz while
// this thread is blocked, we would still need more than 500 years to overflow m_readPosition and
// encounter the ABA problem
if (m_readPosition.compare_exchange_strong(
currentReadPos, currentReadPos + 1U, std::memory_order_relaxed, std::memory_order_relaxed))
currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_relaxed))
{
// Since INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + 1, it can happen that we return more
// elements than the CapacityValue by calling push and pop concurrently (in case of an
Expand Down Expand Up @@ -229,7 +243,7 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
// |-(A)-|-(B)-|-(C)-|
// ^
// w=3, r=3
std::memcpy(&valueOut, &m_data[currentReadPos % m_size], sizeof(ValueType));
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));
return SOFI_OVERFLOW;
}

Expand Down

0 comments on commit 2da68fd

Please sign in to comment.