diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index f9fd299c47cb..5253f79fb62e 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -96,12 +96,6 @@ class RayConfig { return object_manager_push_timeout_ms_; } - int object_manager_max_sends() const { return object_manager_max_sends_; } - - int object_manager_max_receives() const { - return object_manager_max_receives_; - } - uint64_t object_manager_default_chunk_size() const { return object_manager_default_chunk_size_; } @@ -142,9 +136,7 @@ class RayConfig { // transfers. object_manager_pull_timeout_ms_(20), object_manager_push_timeout_ms_(10000), - object_manager_max_sends_(2), - object_manager_max_receives_(2), - object_manager_default_chunk_size_(100000000), + object_manager_default_chunk_size_(1000000), num_workers_per_process_(1) {} ~RayConfig() {} @@ -244,15 +236,10 @@ class RayConfig { /// 0: giving up retrying immediately. int object_manager_push_timeout_ms_; - /// Maximum number of concurrent sends allowed by the object manager. - int object_manager_max_sends_; - - /// Maximum number of concurrent receives allowed by the object manager. - int object_manager_max_receives_; - /// Default chunk size for multi-chunk transfers to use in the object manager. /// In the object manager, no single thread is permitted to transfer more - /// data than what is specified by the chunk size. + /// data than what is specified by the chunk size unless the number of object + /// chunks exceeds the number of available sending threads. uint64_t object_manager_default_chunk_size_; /// Number of workers per process diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 1ab9069bb36c..85bbcc0578cd 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -3,8 +3,8 @@ namespace ray { ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, - uint64_t chunk_size, int release_delay) - : chunk_size_(chunk_size) { + uint64_t chunk_size, int max_chunks, int release_delay) + : default_chunk_size_(chunk_size), max_chunks_(static_cast(max_chunks)) { store_socket_name_ = store_socket_name; ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", release_delay)); } @@ -24,13 +24,24 @@ ObjectBufferPool::~ObjectBufferPool() { ARROW_CHECK_OK(store_client_.Disconnect()); } +uint64_t ObjectBufferPool::GetChunkSize(uint64_t data_size) { + // If the number of chunks generated by the default chunk size exceeds the number of + // send threads, then use a chunk size such that the number of chunks is exactly + // the number of send threads. + if (data_size / default_chunk_size_ >= max_chunks_) { + return (data_size + default_chunk_size_ - 1) / max_chunks_; + } + return default_chunk_size_; +} + uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) { - return (data_size + chunk_size_ - 1) / chunk_size_; + uint64_t chunk_size = GetChunkSize(data_size); + return (data_size + chunk_size - 1) / chunk_size; } uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) { - return (chunk_index + 1) * chunk_size_ > data_size ? data_size % chunk_size_ - : chunk_size_; + uint64_t chunk_size = GetChunkSize(data_size); + return (chunk_index + 1) * chunk_size > data_size ? data_size % chunk_size : chunk_size; } std::pair ObjectBufferPool::GetChunk( @@ -168,17 +179,18 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { std::vector ObjectBufferPool::BuildChunks( const ObjectID &object_id, uint8_t *data, uint64_t data_size) { + uint64_t chunk_size = GetChunkSize(data_size); uint64_t space_remaining = data_size; std::vector chunks; int64_t position = 0; while (space_remaining) { position = data_size - space_remaining; - if (space_remaining < chunk_size_) { + if (space_remaining < chunk_size) { chunks.emplace_back(chunks.size(), data + position, space_remaining); space_remaining = 0; } else { - chunks.emplace_back(chunks.size(), data + position, chunk_size_); - space_remaining -= chunk_size_; + chunks.emplace_back(chunks.size(), data + position, chunk_size); + space_remaining -= chunk_size; } } return chunks; diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 4030e09e7d46..2b8fcfb6772c 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -44,7 +44,7 @@ class ObjectBufferPool { /// \param release_delay The number of release calls before objects are released /// from the store client (FIFO). ObjectBufferPool(const std::string &store_socket_name, const uint64_t chunk_size, - const int release_delay); + int max_chunks, const int release_delay); ~ObjectBufferPool(); @@ -125,6 +125,9 @@ class ObjectBufferPool { void SealChunk(const ObjectID &object_id, uint64_t chunk_index); private: + /// Gets the chunk size based on data size. + uint64_t GetChunkSize(uint64_t data_size); + /// Abort the create operation associated with an object. This destroys the buffer /// state, including create operations in progress for all chunks of the object. void AbortCreate(const ObjectID &object_id); @@ -177,7 +180,9 @@ class ObjectBufferPool { /// get_buffer_state_, create_buffer_state_, and store_client_. std::mutex pool_mutex_; /// Determines the maximum chunk size to be transferred by a single thread. - const uint64_t chunk_size_; + const uint64_t default_chunk_size_; + /// The maximum number of chunks allowed. + const uint64_t max_chunks_; /// The state of a buffer that's currently being used. std::unordered_map get_buffer_state_; /// The state of a buffer that's currently being used. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 1d5da6384a34..4f1568cddb9e 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -17,6 +17,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, + config_.max_sends, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), @@ -40,6 +41,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, + config_.max_sends, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 192b4e997566..35f450d54ab9 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -111,9 +111,6 @@ class TestObjectManagerBase : public ::testing::Test { store_id_2 = StartStore(UniqueID::from_random().hex()); uint pull_timeout_ms = 1; - int max_sends = 2; - int max_receives = 2; - uint64_t object_chunk_size = static_cast(std::pow(10, 3)); push_timeout_ms = 1000; // start first server @@ -192,6 +189,10 @@ class TestObjectManagerBase : public ::testing::Test { std::string store_id_2; uint push_timeout_ms; + + int max_sends = 2; + int max_receives = 2; + uint64_t object_chunk_size = static_cast(std::pow(10, 3)); }; class TestObjectManager : public TestObjectManagerBase { @@ -436,7 +437,16 @@ class TestObjectManager : public TestObjectManagerBase { })); } - void TestWaitComplete() { main_service.stop(); } + void TestWaitComplete() { TestBufferPool(); } + + void TestBufferPool() { + // Ensure the number of chunks generated do not exceed the number of send threads. + for (uint64_t i = object_chunk_size / 2; i < 10 * object_chunk_size; ++i) { + uint64_t num_chunks = server1->object_manager_.buffer_pool_.GetNumChunks(i); + ASSERT_LE(num_chunks, max_sends); + } + main_service.stop(); + } void TestConnections() { RAY_LOG(DEBUG) << "\n" diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 2a7a837ef0fc..a49d83d3f384 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -54,14 +54,23 @@ int main(int argc, char *argv[]) { object_manager_config.store_socket_name = store_socket_name; object_manager_config.pull_timeout_ms = RayConfig::instance().object_manager_pull_timeout_ms(); - object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends(); - object_manager_config.max_receives = - RayConfig::instance().object_manager_max_receives(); object_manager_config.push_timeout_ms = RayConfig::instance().object_manager_push_timeout_ms(); + + int num_cpus = static_cast(static_resource_conf["CPU"]); + object_manager_config.max_sends = std::max(1, num_cpus / 4); + object_manager_config.max_receives = std::max(1, num_cpus / 4); object_manager_config.object_chunk_size = RayConfig::instance().object_manager_default_chunk_size(); + RAY_LOG(INFO) << "Starting object manager with configuration: \n" + "max_sends = " + << object_manager_config.max_sends << "\n" + "max_receives = " + << object_manager_config.max_receives << "\n" + "object_chunk_size = " + << object_manager_config.object_chunk_size; + // initialize mock gcs & object directory auto gcs_client = std::make_shared(); RAY_LOG(INFO) << "Initializing GCS client "