Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ class RayConfig {
return object_manager_push_timeout_ms_;
}

int object_manager_max_sends() const { return object_manager_max_sends_; }

int object_manager_max_receives() const {
return object_manager_max_receives_;
}

uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}
Expand Down Expand Up @@ -142,9 +136,7 @@ class RayConfig {
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_push_timeout_ms_(10000),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_default_chunk_size_(100000000),
object_manager_default_chunk_size_(1000000),
num_workers_per_process_(1) {}

~RayConfig() {}
Expand Down Expand Up @@ -244,15 +236,10 @@ class RayConfig {
/// 0: giving up retrying immediately.
int object_manager_push_timeout_ms_;

/// Maximum number of concurrent sends allowed by the object manager.
int object_manager_max_sends_;

/// Maximum number of concurrent receives allowed by the object manager.
int object_manager_max_receives_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size.
/// data than what is specified by the chunk size unless the number of object
/// chunks exceeds the number of available sending threads.
uint64_t object_manager_default_chunk_size_;

/// Number of workers per process
Expand Down
28 changes: 20 additions & 8 deletions src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size, int release_delay)
: chunk_size_(chunk_size) {
uint64_t chunk_size, int max_chunks, int release_delay)
: default_chunk_size_(chunk_size), max_chunks_(static_cast<uint64_t>(max_chunks)) {
store_socket_name_ = store_socket_name;
ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", release_delay));
}
Expand All @@ -24,13 +24,24 @@ ObjectBufferPool::~ObjectBufferPool() {
ARROW_CHECK_OK(store_client_.Disconnect());
}

/// Gets the chunk size to use for an object of the given size.
///
/// \param data_size The size of the object's data, in bytes.
/// \return default_chunk_size_ when splitting by it yields fewer than
/// max_chunks_ chunks; otherwise ceil(data_size / max_chunks_), so that the
/// object is split into at most max_chunks_ evenly sized chunks.
uint64_t ObjectBufferPool::GetChunkSize(uint64_t data_size) {
  // A zero chunk limit cannot be honored (and would divide by zero below), so
  // fall back to the default chunk size in that case.
  if (max_chunks_ == 0) {
    return default_chunk_size_;
  }
  // If the number of chunks generated by the default chunk size would reach the
  // number of send threads, grow the chunk size so that the number of chunks is
  // at most the number of send threads.
  if (data_size / default_chunk_size_ >= max_chunks_) {
    // Ceiling division by max_chunks_. The rounding term must be
    // (max_chunks_ - 1): rounding with the default chunk size instead produces
    // unbalanced chunks and, when default_chunk_size_ < max_chunks_, can yield
    // more than max_chunks_ chunks. The result is never below
    // default_chunk_size_ because data_size >= max_chunks_ * default_chunk_size_.
    return (data_size + max_chunks_ - 1) / max_chunks_;
  }
  return default_chunk_size_;
}

uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
return (data_size + chunk_size_ - 1) / chunk_size_;
uint64_t chunk_size = GetChunkSize(data_size);
return (data_size + chunk_size - 1) / chunk_size;
}

uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
return (chunk_index + 1) * chunk_size_ > data_size ? data_size % chunk_size_
: chunk_size_;
uint64_t chunk_size = GetChunkSize(data_size);
return (chunk_index + 1) * chunk_size > data_size ? data_size % chunk_size : chunk_size;
}

std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::GetChunk(
Expand Down Expand Up @@ -168,17 +179,18 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {

std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
const ObjectID &object_id, uint8_t *data, uint64_t data_size) {
uint64_t chunk_size = GetChunkSize(data_size);
uint64_t space_remaining = data_size;
std::vector<ChunkInfo> chunks;
int64_t position = 0;
while (space_remaining) {
position = data_size - space_remaining;
if (space_remaining < chunk_size_) {
if (space_remaining < chunk_size) {
chunks.emplace_back(chunks.size(), data + position, space_remaining);
space_remaining = 0;
} else {
chunks.emplace_back(chunks.size(), data + position, chunk_size_);
space_remaining -= chunk_size_;
chunks.emplace_back(chunks.size(), data + position, chunk_size);
space_remaining -= chunk_size;
}
}
return chunks;
Expand Down
9 changes: 7 additions & 2 deletions src/ray/object_manager/object_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ObjectBufferPool {
/// \param release_delay The number of release calls before objects are released
/// from the store client (FIFO).
ObjectBufferPool(const std::string &store_socket_name, const uint64_t chunk_size,
const int release_delay);
int max_chunks, const int release_delay);

~ObjectBufferPool();

Expand Down Expand Up @@ -125,6 +125,9 @@ class ObjectBufferPool {
void SealChunk(const ObjectID &object_id, uint64_t chunk_index);

private:
/// Gets the chunk size to use for an object of the given size.
///
/// \param data_size The size of the object's data, in bytes.
/// \return The default chunk size, or a size no smaller than the default when
/// splitting by the default size would produce max_chunks_ or more chunks.
uint64_t GetChunkSize(uint64_t data_size);

/// Abort the create operation associated with an object. This destroys the buffer
/// state, including create operations in progress for all chunks of the object.
void AbortCreate(const ObjectID &object_id);
Expand Down Expand Up @@ -177,7 +180,9 @@ class ObjectBufferPool {
/// get_buffer_state_, create_buffer_state_, and store_client_.
std::mutex pool_mutex_;
/// The default chunk size to be transferred by a single thread. A larger chunk
/// size is used when the default would split an object into max_chunks_ or more chunks.
const uint64_t chunk_size_;
const uint64_t default_chunk_size_;
/// The maximum number of chunks allowed.
const uint64_t max_chunks_;
/// The state of a buffer that's currently being used.
std::unordered_map<ray::ObjectID, GetBufferState> get_buffer_state_;
/// The state of a buffer that's currently being used.
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use max_chunks here to make it consistent with object_buffer_pool.h?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuration max_sends is used in other places in the code where the name max_sends makes more sense than max_chunks. I think keeping it max_sends here makes more sense.

/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
Expand All @@ -40,6 +41,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
Expand Down
18 changes: 14 additions & 4 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
push_timeout_ms = 1000;

// start first server
Expand Down Expand Up @@ -192,6 +189,10 @@ class TestObjectManagerBase : public ::testing::Test {
std::string store_id_2;

uint push_timeout_ms;

int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
};

class TestObjectManager : public TestObjectManagerBase {
Expand Down Expand Up @@ -436,7 +437,16 @@ class TestObjectManager : public TestObjectManagerBase {
}));
}

void TestWaitComplete() { main_service.stop(); }
void TestWaitComplete() { TestBufferPool(); }

/// Checks that the buffer pool never splits an object into more chunks than
/// there are send threads, then stops the main service.
void TestBufferPool() {
  // Ensure the number of chunks generated does not exceed the number of send threads.
  const uint64_t max_chunks = static_cast<uint64_t>(max_sends);
  uint64_t largest_num_chunks = 0;
  uint64_t offending_data_size = 0;
  for (uint64_t data_size = object_chunk_size / 2; data_size < 10 * object_chunk_size;
       ++data_size) {
    const uint64_t num_chunks =
        server1->object_manager_.buffer_pool_.GetNumChunks(data_size);
    if (num_chunks > largest_num_chunks) {
      largest_num_chunks = num_chunks;
      offending_data_size = data_size;
    }
  }
  // Stop the service before asserting: a failing ASSERT_* returns from this
  // function immediately, which would otherwise skip the stop() call and
  // presumably leave the test waiting on the service instead of failing.
  main_service.stop();
  ASSERT_LE(largest_num_chunks, max_chunks)
      << "data_size = " << offending_data_size << " produced too many chunks";
}

void TestConnections() {
RAY_LOG(DEBUG) << "\n"
Expand Down
15 changes: 12 additions & 3 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,23 @@ int main(int argc, char *argv[]) {
object_manager_config.store_socket_name = store_socket_name;
object_manager_config.pull_timeout_ms =
RayConfig::instance().object_manager_pull_timeout_ms();
object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends();
object_manager_config.max_receives =
RayConfig::instance().object_manager_max_receives();
object_manager_config.push_timeout_ms =
RayConfig::instance().object_manager_push_timeout_ms();

int num_cpus = static_cast<int>(static_resource_conf["CPU"]);
object_manager_config.max_sends = std::max(1, num_cpus / 4);
object_manager_config.max_receives = std::max(1, num_cpus / 4);
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();

RAY_LOG(INFO) << "Starting object manager with configuration: \n"
"max_sends = "
<< object_manager_config.max_sends << "\n"
"max_receives = "
<< object_manager_config.max_receives << "\n"
"object_chunk_size = "
<< object_manager_config.object_chunk_size;

// initialize mock gcs & object directory
auto gcs_client = std::make_shared<ray::gcs::AsyncGcsClient>();
RAY_LOG(INFO) << "Initializing GCS client "
Expand Down