
Some questions about IndexQueue. #2297

Open
Xcliu opened this issue May 30, 2024 · 10 comments
Comments

@Xcliu

Xcliu commented May 30, 2024

Brief description

In my application, I've used the lock-free queue in iceoryx and noticed that the push operation is taking approximately 2 seconds to complete in some cases. Upon reviewing the lock-free queue's implementation, I've observed that the pop function within the IndexQueue could potentially lead to an infinite loop scenario when multiple threads are attempting to pop items simultaneously without any threads pushing new items. Am I correct in this assessment, or could there be other factors at play that I might have overlooked?

I would greatly appreciate any advice or insights on this matter.

Detailed information

inline bool MpmcIndexQueue<Capacity, ValueType>::pop(ValueType& index) noexcept

@elfenpiff
Contributor

elfenpiff commented May 31, 2024

@Xcliu

Am I correct in this assessment, or could there be other factors at play that I might have overlooked?

No, you are correct, and this is valid behavior for lock-free algorithms. A lock-free algorithm guarantees only that at least one thread always makes progress. The much harder-to-implement wait-free algorithms come with the stronger guarantee that every thread always makes progress. (See: https://en.wikipedia.org/wiki/Non-blocking_algorithm)

This means that, in practice, when you have many producers that add data to the index queue and one consumer that acquires the data from the queue, it could happen that the producing threads always win and make progress, and the consumer may end up in a busy starvation loop.

One solution to this exemplary pop-starvation-problem could be to prioritize the pop-thread over the push-threads.

Would you be able to explain your problem in a bit more detail, maybe with some pseudo code? We also have other constructs in iceoryx available that may help you in your particular setup and we could recommend when we would know a bit more about the context.

@Xcliu
Author

Xcliu commented May 31, 2024

@elfenpiff
Thanks for your reply!

It appears that if three threads attempt to pop from an MPMCIndexQueue and no threads are pushing to it, only one of the pop threads will make progress, while the remaining two will block until new data is pushed to the MPMCIndexQueue. Is this understanding correct?

In the pub/sub mode of Iceoryx, each subscriber has a dedicated receiving buffer (LockFreeQueue). The publisher will push messages to its history queue when no subscribers are present, and will only push (pushing to LockFreeQueue corresponds to popping from MPMCIndexQueue) to the receiving buffer when a subscriber is available. This approach may reduce the likelihood of blocking.

In my use case, we have the following scenario:

iox::concurrent::ResizeableLockFreeQueue<T, kMaxCapacity> buffer;

// Approximately 10 producer threads, with one of them having a high priority (e.g., RR-19).
// The high-priority thread and one of the ordinary threads are blocked for about 2 seconds.
buffer.push(item);

// There is only one consumer thread.
T item;
buffer.pop(item);

@elfenpiff
Contributor

@Xcliu

It appears that if three threads attempt to pop from an MPMCIndexQueue and no threads are pushing to it, only one of the pop threads will make progress, while the remaining two will block until new data is pushed to the MPMCIndexQueue. Is this understanding correct?

No, in this case all threads should run without any obstructions and return false.

// Approximately 10 producer threads, with one of them having a high priority (e.g., RR-19).
// The high-priority thread and one of the ordinary threads are blocked for about 2 seconds.

I think that from the queue point of view this shouldn't happen. Do you have other synchronization primitives like a mutex or other lock-free/threadsafe constructs?
I am asking since this seems like a priority inversion problem - it can also originate from the index queue, I just wanted to exclude other possibilities first.

Another explanation I could think of is that ordinary producer threads send with an extremely high frequency, and even though their priority is lower, the number of publishers prevents the high-prio publisher from being scheduled.

Also, this architecture would require that the consumer thread has at least the priority of the high-priority producer thread; otherwise, the queue would be constantly full since the consumer is scheduled too rarely. To reduce contention, you could also try to split the queue up and use one queue per producer-consumer pair, in your case 10. Here you could use the spsc_fifo.
On the consumer side you would then iterate over all fifos and see which has something new to consume.

@Xcliu
Author

Xcliu commented May 31, 2024

@elfenpiff

When the first CAS fails, the value of cellIsValidToRead appears to remain constant (readPosition is not updated) in the loop, unless a push is executed, which updates the value of value.getCycle()?

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::pop(ValueType& index) noexcept
{
    bool ownershipGained = false;
    Index value;
    auto readPosition = m_readPosition.load(std::memory_order_relaxed);
    do
    {
        value = loadValueAt(readPosition, std::memory_order_relaxed);

        auto cellIsValidToRead = readPosition.getCycle() == value.getCycle();

        if (cellIsValidToRead)
        {
            Index newReadPosition(readPosition + 1U);
            ownershipGained = m_readPosition.compare_exchange_weak(
                readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed);
        }
        else
        {
            // readPosition is ahead by one cycle, queue was empty at value load
            auto isEmpty = value.isOneCycleBehind(readPosition);

            if (isEmpty)
            {
                return false;
            }

            readPosition = m_readPosition.load(std::memory_order_relaxed);
        }

        // readPosition is outdated, retry operation

    } while (!ownershipGained); // we leave if we gain ownership of readPosition

    index = value.getIndex();
    return true;
}

@elBoberido
Member

@Xcliu I currently don't have the full code in front of me, but from the snippet you posted, readPosition is updated each time the CAS fails.

@Xcliu
Author

Xcliu commented May 31, 2024

@elfenpiff
When cellIsValidToRead is true, the subsequent compare_exchange_weak operation might fail. The line "readPosition = m_readPosition.load(std::memory_order_relaxed);" will not execute. Which line will update "readPosition"?

@elfenpiff
Contributor

@Xcliu

When cellIsValidToRead is true, the subsequent compare_exchange_weak operation might fail. The line "readPosition = m_readPosition.load(std::memory_order_relaxed);" will not execute. Which line will update "readPosition"?

This line updates readPosition

ownershipGained = m_readPosition.compare_exchange_weak(
  readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed); 

When compare_exchange_weak fails, readPosition (the expected argument) is updated with the value contained in m_readPosition, see: https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange

There it is described as:

bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) noexcept;

Atomically compares the object representation (until C++20) / value representation (since C++20) of *this with that of expected. If those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this into expected (performs load operation).

@Xcliu
Author

Xcliu commented May 31, 2024

@elfenpiff
Thanks for your clarification! I understand the point you're making.

@Xcliu
Author

Xcliu commented May 31, 2024

@elfenpiff @elBoberido

Something about the LockFreeQueue

If m_freeIndices is exhausted, and in another push thread (T1) the item has not yet been added to m_usedIndices to make it full (because T1 is blocked by a higher-priority thread), will the current push thread be blocked as a result of T1 being obstructed by that higher-priority thread?

template <typename ElementType, uint64_t Capacity>
template <typename T>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::pushImpl(T&& value) noexcept
{
    cxx::optional<ElementType> evictedValue;

    BufferIndex index;

    while (!m_freeIndices.pop(index))
    {
        // only pop the index if the queue is still full
        // note, this leads to issues if an index is lost
        // (only possible due to an application crash)
        // then the queue can never be full and we may never leave if no one calls a concurrent pop
        // A quick remedy is not to use a conditional pop such as popIfFull here, but a normal one.
        // However, then it can happen that due to a concurrent pop it was not really necessary to
        // evict a value (i.e. we may needlessly lose values in rare cases)
        // Whether there is another acceptable solution needs to be explored.
        if (m_usedIndices.popIfFull(index))
        {
            evictedValue = readBufferAt(index);
            break;
        }
        // if m_usedIndices was not full we try again (m_freeIndices should contain an index in this case)
        // note that it is theoretically possible to be unsuccessful indefinitely
        // (and thus we would have an infinite loop)
        // but this requires a timing of concurrent pushes and pops which is exceptionally unlikely in practice
    }

    // if we removed from a full queue via popIfFull it might not be full anymore when a concurrent pop occurs

    writeBufferAt(index, value); //&& version is called due to explicit conversion via std::move

    m_usedIndices.push(index);

    return evictedValue; // value was moved into the queue, if a value was evicted to do so return it
}

@elBoberido
Member

@Xcliu I'm not that familiar with the LockFreeQueue class but I think this comment is related to #1546
