Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions src/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,46 @@

#include <memory_pool.h>

MemoryPool::MemoryPool(int num_bins, size_t bin_size) {
MemoryPool::MemoryPool(int num_bins, size_t bin_size) :
numBins(num_bins), binSize(bin_size), memPool(num_bins * bin_size) {
spin_init(&lock);
for (int i = 0; i < num_bins; ++i) {
memPool.push_back((uint8_t *) malloc(bin_size));
enQueue(i);
for (int i = num_bins - 1; i >= 0; --i) {
push(i);
}
}

MemoryPool::~MemoryPool() {
spin_destroy(&lock);
for (auto &it : memPool) {
free(it);
}
}

const int MemoryPool::fetchBlock(uint8_t **buf) {
int ret = deQueue();
if (ret == -1) {
const int ret = pop();
if (ret < 0) {
*buf = nullptr;
} else {
*buf = memPool.at(ret);
*buf = &memPool.data()[ret * binSize];
}
return ret;
}

void MemoryPool::returnBlock(int index) {
if (index >= 0 && index < static_cast<int>(memPool.size())) {
enQueue(index);
if (index >= 0 && index < numBins) {
push(index);
}
}

inline void MemoryPool::enQueue(int index) {
inline void MemoryPool::push(int index) {
spin_lock(&lock);
indexQ.push(index);
indexes.push(index);
spin_unlock(&lock);
}

inline int MemoryPool::deQueue() {
inline int MemoryPool::pop() {
int data = -1;
spin_lock(&lock);
if (indexQ.size()) {
data = indexQ.front();
indexQ.pop();
if (indexes.size()) {
data = indexes.top();
indexes.pop();
}
spin_unlock(&lock);
return data;
Expand Down
28 changes: 16 additions & 12 deletions src/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <stdlib.h>

#include <queue>
#include <stack>
#include <vector>

#include "common.h"
Expand All @@ -30,14 +30,14 @@ class MemoryPool {
it contains:
- A vector of pre-allocated heap memory (bins).
- Each bin will be of the initially provided bin size.
- A queue containing the indexes of the available bins from the vector.
- A stack containing the indexes of the available bins from the vector.
- A fetchBlock() operation gets an available resource and returns the index
to the acquired bin, removes the index entry from the queue.
- A returnBlock() operation requeues the provided index back into the queue,
to the acquired bin, removes the index entry from the stack.
- A returnBlock() operation requeues the provided index back into the stack,
thereby making the bin pointed to by the index available again.

+---+---+---+---+---+---+----
queue: | 0 | 1 | 2 | 3 | 4 | 5 | ...
stack: | 0 | 1 | 2 | 3 | 4 | 5 | ...
+---+---+---+---+---+---+----
| ____________
+-----> | bin1 |
Expand Down Expand Up @@ -73,19 +73,23 @@ class MemoryPool {
private:

/**
* Pushes entry into queue.
* Pushes entry into stack.
*/
void enQueue(int index);
void push(int index);

/**
* Pops entry from queue.
* Pops entry from stack.
*/
int deQueue();
int pop();

// Spin lock for queue ops
spin_t lock;
// Queue of indexes
std::queue<int> indexQ;
// Stack of free bins in the pre-allocated memory
std::stack<int> indexes;
// Vector of pre-allocated memory
std::vector<uint8_t*> memPool;
std::vector<uint8_t> memPool;
// Number of bins inside the queue
const int numBins;
// Bin sizes
const size_t binSize;
};
20 changes: 14 additions & 6 deletions tests/unit/mempool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,27 @@ struct worker_args{
StatAggregator *sa;
};

inline uint64_t xorshiftstar(uint64_t *seed) {
*seed ^= *seed >> 12;
*seed ^= *seed << 25;
*seed ^= *seed >> 27;
return *seed * 2685821657736338717;
}

void *basic_tester(void *args_)
{
TEST_INIT();
struct worker_args *args = (struct worker_args *)args_;
size_t bin_size = args->bin_size;
for (int i = args->num_runs; i; --i) {
uint8_t *buf;
uint64_t seed = 7;
volatile uint8_t *buf;
ts_nsec start = get_monotonic_ts();
const int idx = args->mp->fetchBlock(&buf);
const int idx = args->mp->fetchBlock(const_cast<uint8_t**>(&buf));
ts_nsec end = get_monotonic_ts();
TEST_CHK(idx != -1);
for (int j = 100; j; --j) {
buf[rand() % (bin_size - 1)] = 'X';
for (int j = 0; j < 100; j++) {
buf[xorshiftstar(&seed) % args->bin_size] = 'X';
}
collect_stat(args->sa, (end - start));
args->mp->returnBlock(idx);
Expand Down Expand Up @@ -149,7 +157,7 @@ void multi_thread_test(int num_threads, int iterations, int num_bins,

int main()
{
basic_test(10000, 8, 10485760); //1000 runs of 8 x 10MB buffers
multi_thread_test(8, 10000, 8, 10485760); // repeat with 8 threads
basic_test(1000000, 8, 1024 * 1024); //1000000 runs of 8 x 1MB buffers
multi_thread_test(8, 1000000, 8, 1024 * 1024); // repeat with 8 threads
return 0;
}