Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions velox/benchmarks/unstable/MemoryAllocationBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ class MemoryPoolAllocationBenchMark {
switch (type_) {
case Type::kMmap:
manager_ = std::make_shared<MemoryManager>(
IMemoryManager::Options{.alignment = alignment});
MemoryManagerOptions{.alignment = alignment});
break;
case Type::kStd:
manager_ = std::make_shared<MemoryManager>(
IMemoryManager::Options{.alignment = alignment});
MemoryManagerOptions{.alignment = alignment});
break;
default:
VELOX_USER_FAIL("Unknown allocator type: {}", static_cast<int>(type_));
Expand Down Expand Up @@ -139,7 +139,7 @@ class MemoryPoolAllocationBenchMark {
const size_t minSize_;
const size_t maxSize_;
folly::Random::DefaultGenerator rng_;
std::shared_ptr<IMemoryManager> manager_;
std::shared_ptr<MemoryManager> manager_;
std::shared_ptr<MemoryPool> pool_;
uint64_t sumAllocBytes_{0};
uint64_t numAllocs_{0};
Expand Down
19 changes: 17 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ constexpr folly::StringPiece kDefaultRootName{"__default_root__"};
constexpr folly::StringPiece kDefaultLeafName("__default_leaf__");
} // namespace

MemoryManager::MemoryManager(const Options& options)
MemoryManager::MemoryManager(const MemoryManagerOptions& options)
: capacity_{options.capacity},
allocator_{options.allocator->shared_from_this()},
arbitrator_(MemoryArbitrator::create(MemoryArbitrator::Config{
Expand Down Expand Up @@ -88,6 +88,21 @@ MemoryManager::~MemoryManager() {
}
}

// static
MemoryManager& MemoryManager::getInstance(
const MemoryManagerOptions& options,
bool ensureCapacity) {
static MemoryManager manager{options};
auto actualCapacity = manager.capacity();
VELOX_USER_CHECK(
!ensureCapacity || actualCapacity == options.capacity,
"Process level manager manager created with input capacity: {}, actual capacity: {}",
options.capacity,
actualCapacity);

return manager;
}

int64_t MemoryManager::capacity() const {
return capacity_;
}
Expand Down Expand Up @@ -245,7 +260,7 @@ std::vector<std::shared_ptr<MemoryPool>> MemoryManager::getAlivePools() const {
return pools;
}

IMemoryManager& defaultMemoryManager() {
MemoryManager& defaultMemoryManager() {
return MemoryManager::getInstance();
}

Expand Down
152 changes: 54 additions & 98 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MemoryPool.h"

DECLARE_int32(memory_usage_aggregation_interval_millis);
DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_memory_pool_debug_enabled);

Expand All @@ -60,158 +59,115 @@ namespace facebook::velox::memory {
"{}", \
errorMessage);

/// This class provides the interface of memory manager. The memory manager is
/// responsible for enforcing the memory capacity is within the capacity as well
/// as managing the memory pools.
class IMemoryManager {
public:
struct Options {
/// Specifies the default memory allocation alignment.
uint16_t alignment{MemoryAllocator::kMaxAlignment};
struct MemoryManagerOptions {
/// Specifies the default memory allocation alignment.
uint16_t alignment{MemoryAllocator::kMaxAlignment};

/// Specifies the max memory capacity in bytes.
int64_t capacity{kMaxMemory};

/// If true, check the memory pool and usage leaks on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
bool checkUsageLeak{FLAGS_velox_memory_leak_check_enabled};

/// Specifies the max memory capacity in bytes.
int64_t capacity{kMaxMemory};
/// If true, the memory pool will be running in debug mode to track the
/// allocation and free call stacks to detect the source of memory leak for
/// testing purpose.
bool debugEnabled{FLAGS_velox_memory_pool_debug_enabled};

/// If true, check the memory pool and usage leaks on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
bool checkUsageLeak{FLAGS_velox_memory_leak_check_enabled};
/// Specifies the backing memory allocator.
MemoryAllocator* allocator{MemoryAllocator::getInstance()};

/// If true, the memory pool will be running in debug mode to track the
/// allocation and free call stacks to detect the source of memory leak for
/// testing purpose.
bool debugEnabled{FLAGS_velox_memory_pool_debug_enabled};
/// Specifies the memory arbitration config.
MemoryArbitrator::Config arbitratorConfig{};
};

/// Specifies the backing memory allocator.
MemoryAllocator* allocator{MemoryAllocator::getInstance()};
/// 'MemoryManager' is responsible for managing the memory pools. For now, users
/// wanting multiple different allocators would need to instantiate different
/// MemoryManager classes and manage them across static boundaries.
class MemoryManager {
public:
explicit MemoryManager(
const MemoryManagerOptions& options = MemoryManagerOptions{});

/// Specifies the memory arbitration config.
MemoryArbitrator::Config arbitratorConfig{};
};
~MemoryManager();

virtual ~IMemoryManager() = default;
/// Tries to get the singleton memory manager. If not previously initialized,
/// the process singleton manager will be initialized with the given capacity.
FOLLY_EXPORT static MemoryManager& getInstance(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to cc? thanks!

const MemoryManagerOptions& options = MemoryManagerOptions{},
bool ensureCapacity = false);

/// Returns the memory capacity of this memory manager which puts a hard cap
/// on the memory usage, and any allocation that would exceed this capacity
/// throws.
virtual int64_t capacity() const = 0;
int64_t capacity() const;

/// Returns the memory allocation alignment of this memory manager.
virtual uint16_t alignment() const = 0;
uint16_t alignment() const;

/// Creates a root memory pool with specified 'name' and 'capacity'. If 'name'
/// is missing, the memory manager generates a default name internally to
/// ensure uniqueness.
virtual std::shared_ptr<MemoryPool> addRootPool(
std::shared_ptr<MemoryPool> addRootPool(
const std::string& name = "",
int64_t capacity = kMaxMemory,
std::unique_ptr<MemoryReclaimer> reclaimer = nullptr) = 0;
std::unique_ptr<MemoryReclaimer> reclaimer = nullptr);

/// Creates a leaf memory pool for direct memory allocation use with specified
/// 'name'. If 'name' is missing, the memory manager generates a default name
/// internally to ensure uniqueness. The leaf memory pool is created as the
/// child of the memory manager's default root memory pool. If 'threadSafe' is
/// true, then we track its memory usage in a non-thread-safe mode to reduce
/// its cpu cost.
virtual std::shared_ptr<MemoryPool> addLeafPool(
std::shared_ptr<MemoryPool> addLeafPool(
const std::string& name = "",
bool threadSafe = true) = 0;
bool threadSafe = true);

/// Invoked to grows a memory pool's free capacity with at least
/// 'incrementBytes'. The function returns true on success, otherwise false.
virtual bool growPool(MemoryPool* pool, uint64_t incrementBytes) = 0;
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

/// Default unmanaged leaf pool with no threadsafe stats support. Libraries
/// using this method can get a pool that is shared with other threads. The
/// goal is to minimize lock contention while supporting such use cases.
///
/// TODO: deprecate this API after all the use cases are able to manage the
/// lifecycle of the allocated memory pools properly.
virtual MemoryPool& deprecatedSharedLeafPool() = 0;

/// Returns the number of alive memory pools allocated from addRootPool() and
/// addLeafPool().
///
/// NOTE: this doesn't count the memory manager's internal default root and
/// leaf memory pools.
virtual size_t numPools() const = 0;
MemoryPool& deprecatedSharedLeafPool();

/// Returns the current total memory usage under this memory manager.
virtual int64_t getTotalBytes() const = 0;
int64_t getTotalBytes() const;

/// Reserves size for the allocation. Returns true if the total usage remains
/// under capacity after the reservation. Caller is responsible for releasing
/// the offending reservation.
///
/// TODO: deprecate this and enforce the memory usage capacity by memory
/// allocator.
virtual bool reserve(int64_t size) = 0;
bool reserve(int64_t size);

/// Subtracts from current total memory usage.
///
/// TODO: deprecate this and enforce the memory usage capacity by memory
/// allocator.
virtual void release(int64_t size) = 0;

/// Returns debug string of this memory manager.
virtual std::string toString() const = 0;
};

/// For now, users wanting multiple different allocators would need to
/// instantiate different MemoryManager classes and manage them across static
/// boundaries.
class MemoryManager final : public IMemoryManager {
public:
/// Tries to get the singleton memory manager. If not previously initialized,
/// the process singleton manager will be initialized with the given capacity.
FOLLY_EXPORT static MemoryManager& getInstance(
const Options& options = Options{},
bool ensureCapacity = false) {
static MemoryManager manager{options};
auto actualCapacity = manager.capacity();
VELOX_USER_CHECK(
!ensureCapacity || actualCapacity == options.capacity,
"Process level manager manager created with input capacity: {}, actual capacity: {}",
options.capacity,
actualCapacity);

return manager;
}

explicit MemoryManager(const Options& options = Options{});

~MemoryManager();

int64_t capacity() const final;

uint16_t alignment() const final;

std::shared_ptr<MemoryPool> addRootPool(
const std::string& name = "",
int64_t maxBytes = kMaxMemory,
std::unique_ptr<MemoryReclaimer> reclaimer = nullptr) final;

std::shared_ptr<MemoryPool> addLeafPool(
const std::string& name = "",
bool threadSafe = true) final;

bool growPool(MemoryPool* pool, uint64_t incrementBytes) final;

MemoryPool& deprecatedSharedLeafPool() final;

int64_t getTotalBytes() const final;

bool reserve(int64_t size) final;
void release(int64_t size) final;
void release(int64_t size);

size_t numPools() const final;
/// Returns the number of alive memory pools allocated from addRootPool() and
/// addLeafPool().
///
/// NOTE: this doesn't count the memory manager's internal default root and
/// leaf memory pools.
size_t numPools() const;

MemoryAllocator& allocator();

MemoryArbitrator* arbitrator();

std::string toString() const final;
/// Returns debug string of this memory manager.
std::string toString() const;

/// Returns the memory manger's internal default root memory pool for testing
/// purpose.
Expand Down Expand Up @@ -249,7 +205,7 @@ class MemoryManager final : public IMemoryManager {
std::unordered_map<std::string, std::weak_ptr<MemoryPool>> pools_;
};

IMemoryManager& defaultMemoryManager();
MemoryManager& defaultMemoryManager();

/// Creates a leaf memory pool from the default memory manager for memory
/// allocation use. If 'threadSafe' is true, then creates a leaf memory pool
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ constexpr int64_t kMaxMemory = std::numeric_limits<int64_t>::max();
/// node pool that corresponds to the plan node from which the operator is
/// created. Operator and node pools are owned by the Task via 'childPools_'.
///
/// The query pool is created from IMemoryManager::getChild() as a child of a
/// The query pool is created from MemoryManager::getChild() as a child of a
/// singleton root pool object (system pool). There is only one system pool for
/// a velox process. Hence each query pool objects forms a subtree rooted from
/// the system pool.
Expand All @@ -95,7 +95,7 @@ constexpr int64_t kMaxMemory = std::numeric_limits<int64_t>::max();
///
/// NOTE: for the users that integrate at expression evaluation level, we don't
/// need to build the memory pool hierarchy as described above. Users can either
/// create a single memory pool from IMemoryManager::getChild() to share with
/// create a single memory pool from MemoryManager::getChild() to share with
/// all the concurrent expression evaluations or create one dedicated memory
/// pool for each expression evaluation if they need per-expression memory quota
/// enforcement.
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ByteStreamTest : public testing::Test {
options.capacity = kMaxMappedMemory;
mmapAllocator_ = std::make_shared<MmapAllocator>(options);
MemoryAllocator::setDefaultInstance(mmapAllocator_.get());
memoryManager_ = std::make_unique<MemoryManager>(IMemoryManager::Options{
memoryManager_ = std::make_unique<MemoryManager>(MemoryManagerOptions{
.capacity = kMaxMemory, .allocator = MemoryAllocator::getInstance()});
pool_ = memoryManager_->addLeafPool("ByteStreamTest");
}
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/tests/ConcurrentAllocationBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ class MemoryAllocationBenchMark {
memory::MmapAllocator::Options mmapOptions;
mmapOptions.capacity = maxMemory;
allocator_ = std::make_shared<MmapAllocator>(mmapOptions);
manager_ = std::make_shared<MemoryManager>(IMemoryManager::Options{
manager_ = std::make_shared<MemoryManager>(MemoryManagerOptions{
.capacity = maxMemory, .allocator = allocator_.get()});
} break;
case Type::kMalloc:
manager_ = std::make_shared<MemoryManager>(
IMemoryManager::Options{.capacity = maxMemory});
MemoryManagerOptions{.capacity = maxMemory});
break;
default:
VELOX_USER_FAIL(
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/tests/MemoryAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ class MemoryAllocatorTest : public testing::TestWithParam<bool> {
MemoryAllocator::setDefaultInstance(allocator_.get());
}
instance_ = MemoryAllocator::getInstance();
memoryManager_ = std::make_unique<MemoryManager>(IMemoryManager::Options{
.capacity = kMaxMemory, .allocator = instance_});
memoryManager_ = std::make_unique<MemoryManager>(
MemoryManagerOptions{.capacity = kMaxMemory, .allocator = instance_});
pool_ = memoryManager_->addLeafPool("allocatorTest");
if (useMmap_) {
ASSERT_EQ(instance_->kind(), MemoryAllocator::Kind::kMmap);
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/MemoryCapExceededTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ TEST_P(MemoryCapExceededTest, multipleDrivers) {
TEST_P(MemoryCapExceededTest, memoryManagerCapacityExeededError) {
// Executes a plan with no memory pool capacity limit but very small memory
// manager's limit.
memory::IMemoryManager::Options options{.capacity = 1 << 20};
memory::MemoryManagerOptions options{.capacity = 1 << 20};
memory::MemoryManager manager{options};

vector_size_t size = 1'024;
Expand Down
Loading