Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 3 additions & 41 deletions mooncake-store/include/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>

#include "cachelib_memory_allocator/MemoryAllocator.h"
#include "offset_allocator/offset_allocator.hpp"
Expand All @@ -20,33 +19,6 @@ namespace mooncake {
static constexpr size_t kAllocatorUnknownFreeSpace =
std::numeric_limits<size_t>::max();

/**
* @brief Status of a buffer in the system
*/
enum class BufStatus {
INIT = 0, // Initial state
COMPLETE = 1, // Complete state (buffer has been used)
FAILED = 2, // Failed state (allocation failed, upstream should set handle
// to this state)
UNREGISTERED = 3, // Buffer metadata has been deleted
};

/**
* @brief Stream operator for BufStatus
*/
inline std::ostream& operator<<(std::ostream& os,
const BufStatus& status) noexcept {
static const std::unordered_map<BufStatus, std::string_view> status_strings{
{BufStatus::INIT, "INIT"},
{BufStatus::COMPLETE, "COMPLETE"},
{BufStatus::FAILED, "FAILED"},
{BufStatus::UNREGISTERED, "UNREGISTERED"}};

os << (status_strings.count(status) ? status_strings.at(status)
: "UNKNOWN");
return os;
}

// Forward declarations
class BufferAllocatorBase;

Expand All @@ -58,12 +30,10 @@ class AllocatedBuffer {
struct Descriptor;

AllocatedBuffer(std::shared_ptr<BufferAllocatorBase> allocator,
std::string segment_name, void* buffer_ptr,
std::size_t size,
void* buffer_ptr, std::size_t size,
std::optional<offset_allocator::OffsetAllocationHandle>&&
offset_handle = std::nullopt)
: allocator_(std::move(allocator)),
segment_name_(std::move(segment_name)),
buffer_ptr_(buffer_ptr),
size_(size),
offset_handle_(std::move(offset_handle)) {}
Expand All @@ -86,9 +56,7 @@ class AllocatedBuffer {
// Serialize the buffer into a descriptor for transfer
[[nodiscard]] Descriptor get_descriptor() const;

[[nodiscard]] std::string getSegmentName() const noexcept {
return segment_name_;
}
[[nodiscard]] std::string getSegmentName() const noexcept;

// Friend declaration for operator<<
friend std::ostream& operator<<(std::ostream& os,
Expand All @@ -98,18 +66,12 @@ class AllocatedBuffer {
struct Descriptor {
uint64_t size_;
uintptr_t buffer_address_;
BufStatus status_;
std::string transport_endpoint_;
YLT_REFL(Descriptor, size_, buffer_address_, status_,
transport_endpoint_);
YLT_REFL(Descriptor, size_, buffer_address_, transport_endpoint_);
};

void mark_complete() { status = BufStatus::COMPLETE; }

private:
std::weak_ptr<BufferAllocatorBase> allocator_;
std::string segment_name_;
BufStatus status{BufStatus::INIT};
void* buffer_ptr_{nullptr};
std::size_t size_{0};
// RAII handle for buffer allocated by offset allocator
Expand Down
6 changes: 0 additions & 6 deletions mooncake-store/include/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,6 @@ class Replica {
void mark_complete() {
if (status_ == ReplicaStatus::PROCESSING) {
status_ = ReplicaStatus::COMPLETE;
if (is_memory_replica()) {
auto& mem_data = std::get<MemoryReplicaData>(data_);
for (const auto& buf_ptr : mem_data.buffers) {
buf_ptr->mark_complete();
}
}
} else if (status_ == ReplicaStatus::COMPLETE) {
LOG(WARNING) << "Replica already marked as complete";
} else {
Expand Down
32 changes: 18 additions & 14 deletions mooncake-store/src/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@

namespace mooncake {

std::string AllocatedBuffer::getSegmentName() const noexcept {
auto alloc = allocator_.lock();
if (alloc) {
return alloc->getSegmentName();
}
return std::string();
}

AllocatedBuffer::~AllocatedBuffer() {
auto alloc = allocator_.lock();
if (alloc) {
alloc->deallocate(this);
VLOG(1) << "buf_handle_deallocated segment_name=" << segment_name_
<< " size=" << size_;
VLOG(1) << "buf_handle_deallocated size=" << size_;
} else {
MasterMetricManager::instance().dec_allocated_size(size_);
VLOG(1) << "allocator=expired_or_null in buf_handle_destructor";
Expand All @@ -28,20 +35,21 @@ AllocatedBuffer::Descriptor AllocatedBuffer::get_descriptor() const {
if (alloc) {
endpoint = alloc->getTransportEndpoint();
} else {
LOG(ERROR) << "allocator=expired_or_null in get_descriptor, where "
"segment_name="
<< segment_name_;
LOG(ERROR) << "allocator=expired_or_null in get_descriptor";
}
return {static_cast<uint64_t>(size()),
reinterpret_cast<uintptr_t>(buffer_ptr_), status, endpoint};
reinterpret_cast<uintptr_t>(buffer_ptr_), endpoint};
}

// Define operator<< using public accessors or get_descriptor if appropriate
std::ostream& operator<<(std::ostream& os, const AllocatedBuffer& buffer) {
return os << "AllocatedBuffer: { "
<< "segment_name: " << buffer.segment_name_ << ", "
<< "segment_name: "
<< (buffer.allocator_.lock()
? buffer.allocator_.lock()->getSegmentName()
: std::string("<expired>"))
<< ", "
<< "size: " << buffer.size() << ", "
<< "status: " << buffer.status << ", "
<< "buffer_ptr: " << static_cast<void*>(buffer.data()) << " }";
}

Expand Down Expand Up @@ -110,15 +118,13 @@ std::unique_ptr<AllocatedBuffer> CachelibBufferAllocator::allocate(
<< " segment=" << segment_name_ << " address=" << buffer;
cur_size_.fetch_add(size);
MasterMetricManager::instance().inc_allocated_size(size);
return std::make_unique<AllocatedBuffer>(shared_from_this(), segment_name_,
buffer, size);
return std::make_unique<AllocatedBuffer>(shared_from_this(), buffer, size);
}

void CachelibBufferAllocator::deallocate(AllocatedBuffer* handle) {
try {
// Deallocate memory using CacheLib.
memory_allocator_->free(handle->buffer_ptr_);
handle->status = BufStatus::UNREGISTERED;
size_t freed_size =
handle->size_; // Store size before handle might become invalid
cur_size_.fetch_sub(freed_size);
Expand Down Expand Up @@ -199,8 +205,7 @@ std::unique_ptr<AllocatedBuffer> OffsetBufferAllocator::allocate(size_t size) {
// Create a custom AllocatedBuffer that manages the
// OffsetAllocationHandle
allocated_buffer = std::make_unique<AllocatedBuffer>(
shared_from_this(), segment_name_, buffer_ptr, size,
std::move(allocation_handle));
shared_from_this(), buffer_ptr, size, std::move(allocation_handle));
VLOG(1) << "allocation_succeeded size=" << size
<< " segment=" << segment_name_ << " address=" << buffer_ptr;
} catch (const std::exception& e) {
Expand All @@ -222,7 +227,6 @@ void OffsetBufferAllocator::deallocate(AllocatedBuffer* handle) {
// when the OffsetAllocationHandle goes out of scope
size_t freed_size = handle->size();
handle->offset_handle_.reset();
handle->status = BufStatus::UNREGISTERED;
cur_size_.fetch_sub(freed_size);
MasterMetricManager::instance().dec_allocated_size(freed_size);
VLOG(1) << "deallocation_succeeded address=" << handle->data()
Expand Down
1 change: 0 additions & 1 deletion mooncake-store/tests/buffer_allocator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class BufferAllocatorTest : public ::testing::Test {
EXPECT_EQ(bufHandle.getSegmentName(), segment_name);
EXPECT_EQ(descriptor.transport_endpoint_, transport_endpoint);
EXPECT_EQ(descriptor.size_, alloc_size);
EXPECT_EQ(descriptor.status_, BufStatus::INIT);
EXPECT_NE(bufHandle.data(), nullptr);
}

Expand Down
3 changes: 0 additions & 3 deletions mooncake-store/tests/client_buffer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,14 @@ TEST_F(ClientBufferTest, CalculateTotalSizeMemoryReplica) {
AllocatedBuffer::Descriptor buf1;
buf1.size_ = 1024;
buf1.buffer_address_ = 0x1000;
buf1.status_ = BufStatus::COMPLETE;

AllocatedBuffer::Descriptor buf2;
buf2.size_ = 2048;
buf2.buffer_address_ = 0x2000;
buf2.status_ = BufStatus::COMPLETE;

AllocatedBuffer::Descriptor buf3;
buf3.size_ = 512;
buf3.buffer_address_ = 0x3000;
buf3.status_ = BufStatus::COMPLETE;

mem_desc.buffer_descriptors = {buf1, buf2, buf3};
replica.descriptor_variant = mem_desc;
Expand Down
5 changes: 0 additions & 5 deletions mooncake-store/tests/master_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,6 @@ TEST_F(MasterServiceTest, MultiSliceMultiReplicaFlow) {
i++) {
const auto& handle =
replica.get_memory_descriptor().buffer_descriptors[i];
EXPECT_EQ(BufStatus::INIT, handle.status_);

EXPECT_EQ(slice_lengths[i], handle.size_);
}
Expand All @@ -986,10 +985,6 @@ TEST_F(MasterServiceTest, MultiSliceMultiReplicaFlow) {
EXPECT_EQ(ReplicaStatus::COMPLETE, replica.status);
ASSERT_EQ(slice_lengths.size(),
replica.get_memory_descriptor().buffer_descriptors.size());
for (const auto& handle :
replica.get_memory_descriptor().buffer_descriptors) {
EXPECT_EQ(BufStatus::COMPLETE, handle.status_);
}
}
}

Expand Down
Loading