Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
const plasma::ObjectBuffer &plasma_buffer) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
Expand All @@ -128,9 +127,12 @@ ray::Status ObjectDirectory::ReportObjectAdded(
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
data->inline_object_data.assign(
plasma_buffer.data->data(),
plasma_buffer.data->data() + plasma_buffer.data->size());
data->inline_object_metadata.assign(
plasma_buffer.metadata->data(),
plasma_buffer.metadata->data() + plasma_buffer.metadata->size());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
Expand Down
12 changes: 6 additions & 6 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <unordered_set>
#include <vector>

#include "plasma/client.h"

#include "ray/gcs/client.h"
#include "ray/id.h"
#include "ray/object_manager/format/object_manager_generated.h"
Expand Down Expand Up @@ -100,14 +102,13 @@ class ObjectDirectoryInterface {
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \param plasma_buffer Object data and metadata from plasma. This data is
/// only valid for inlined objects (i.e., when inline_object_flag=true).
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;
const plasma::ObjectBuffer &plasma_buffer) = 0;

/// Report objects removed from this client's store to the object directory.
///
Expand Down Expand Up @@ -162,8 +163,7 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;
const plasma::ObjectBuffer &plasma_buffer) override;

ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
Expand Down
16 changes: 5 additions & 11 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,23 @@ void ObjectManager::HandleObjectAdded(
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool inline_object_flag = false;
plasma::ObjectBuffer object_buffer;
if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
// Inline object. Try to get the data from the object store.
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data != nullptr) {
// The object exists. Store the object data in the GCS entry.
// The object exists. Set inline_object_flag so that the object data
// will be stored in the GCS entry.
inline_object_flag = true;
inline_object_data.assign(
object_buffer.data->data(),
object_buffer.data->data() + object_buffer.data->size());
inline_object_metadata.assign(
object_buffer.metadata->data(),
object_buffer.metadata->data() + object_buffer.metadata->size());
// Mark this object as inlined, so that if this object is later
// evicted, we do not report it to the GCS.
local_inlined_objects_.insert(object_id);
}
}

RAY_CHECK_OK(object_directory_->ReportObjectAdded(
object_id, client_id_, object_info, inline_object_flag, inline_object_data,
inline_object_metadata));
RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info,
inline_object_flag, object_buffer));
}
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.
Expand Down
34 changes: 26 additions & 8 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@

#include "ray/object_manager/object_manager.h"

namespace {
std::string store_executable;
int64_t wait_timeout_ms;
bool test_inline_objects = false;
}

namespace ray {

static inline void flushall_redis(void) {
Expand All @@ -15,8 +21,6 @@ static inline void flushall_redis(void) {
redisFree(context);
}

std::string store_executable;

class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
Expand Down Expand Up @@ -337,32 +341,40 @@ class TestObjectManager : public TestObjectManagerBase {
}

void NextWaitTest() {
int data_size;
// Set the data size under or over the inline objects limit depending on
// the test configuration.
if (test_inline_objects) {
data_size = RayConfig::instance().inline_object_max_size_bytes() / 2;
} else {
data_size = RayConfig::instance().inline_object_max_size_bytes() * 2;
}
current_wait_test += 1;
switch (current_wait_test) {
case 0: {
// Ensure timeout_ms = 0 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/0, false, false);
TestWait(data_size, 5, 3, /*timeout_ms=*/0, false, false);
} break;
case 1: {
// Ensure timeout_ms = 1000 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/1000, false, false);
TestWait(data_size, 5, 3, wait_timeout_ms, false, false);
} break;
case 2: {
// Generate objects locally to ensure local object code-path works properly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, 1000, false, /*test_local=*/true);
TestWait(data_size, 5, 3, wait_timeout_ms, false, /*test_local=*/true);
} break;
case 3: {
// Wait on an object that's never registered with GCS to ensure timeout works
// properly.
TestWait(600, /*num_objects=*/5, /*required_objects=*/6, 1000,
TestWait(data_size, /*num_objects=*/5, /*required_objects=*/6, wait_timeout_ms,
/*include_nonexistent=*/true, false);
} break;
case 4: {
// Ensure infinite time code-path works properly.
TestWait(600, 5, 5, /*timeout_ms=*/-1, false, false);
TestWait(data_size, 5, 5, /*timeout_ms=*/-1, false, false);
} break;
}
}
Expand Down Expand Up @@ -475,6 +487,7 @@ class TestObjectManager : public TestObjectManagerBase {
};

TEST_F(TestObjectManager, StartTestObjectManager) {
// TODO: Break this test suite into unit tests.
auto AsyncStartTests = main_service.wrap([this]() { WaitConnections(); });
AsyncStartTests();
main_service.run();
Expand All @@ -484,6 +497,11 @@ TEST_F(TestObjectManager, StartTestObjectManager) {

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
ray::store_executable = std::string(argv[1]);
store_executable = std::string(argv[1]);
wait_timeout_ms = std::stoi(std::string(argv[2]));
// If a third argument is provided, then test with inline objects.
if (argc > 3) {
test_inline_objects = true;
}
return RUN_ALL_TESTS();
}
4 changes: 2 additions & 2 deletions src/ray/raylet/reconstruction_policy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD6(ReportObjectAdded,
MOCK_METHOD5(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
const object_manager::protocol::ObjectInfoT &, bool,
const std::vector<uint8_t> &, const std::string &));
const plasma::ObjectBuffer &));

MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));

Expand Down
5 changes: 4 additions & 1 deletion src/ray/test/run_object_manager_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ sleep 1s
# Run tests.
$CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
sleep 1s
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Use timeout=1000ms for the Wait tests.
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000
# Run tests again with inlined objects.
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000 true
$REDIS_DIR/redis-cli -p 6379 shutdown
sleep 1s

Expand Down
8 changes: 6 additions & 2 deletions src/ray/test/run_object_manager_valgrind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ sleep 1s
${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
sleep 1s

# Run tests.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Run tests. Use timeout=10000ms for the Wait tests since tests run slower
# in valgrind.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000
sleep 1s
# Run tests again with inlined objects.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 true
sleep 1s
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
$REDIS_DIR/redis-cli -p 6379 shutdown
Expand Down