Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
71da1ed
added store_client_ to object_manager and node_manager
istoica Jan 9, 2019
e43a221
half through...
istoica Jan 10, 2019
94cea3a
all code in, and compiling! Nothing tested though...
istoica Jan 10, 2019
bd71d14
something is working ;-)
istoica Jan 10, 2019
2c058f5
added a few more comments
istoica Jan 10, 2019
ded447e
now, add only one entry to the in GCS for inlined objects
istoica Jan 12, 2019
3e6f9b2
more comments
istoica Jan 12, 2019
4855736
remove a spurious todo
istoica Jan 12, 2019
71bdf21
some comment updates
istoica Jan 12, 2019
52e2611
add test
istoica Jan 12, 2019
9931dd1
Merge branch 'inline-objects' into inline-objects
pcmoritz Jan 12, 2019
88ed5e2
added support for meta data for inline objects
istoica Jan 13, 2019
4b6dd43
avoid some copies
istoica Jan 14, 2019
0869d26
Initialize plasma client in tests
stephanie-wang Jan 14, 2019
0fa9835
Better comments. Enable configuring nline_object_max_size_bytes.
istoica Jan 17, 2019
3c52065
Update src/ray/object_manager/object_manager.cc
zhijunfu Jan 17, 2019
2633db2
Update src/ray/raylet/node_manager.cc
zhijunfu Jan 17, 2019
dd02029
Update src/ray/raylet/node_manager.cc
zhijunfu Jan 17, 2019
42cb64c
fiexed comments
istoica Jan 20, 2019
477f731
fixed various typos in comments
istoica Jan 20, 2019
0052225
updated comments in object_manager.h and object_manager.cc
istoica Jan 22, 2019
d8df8d8
addressed all comments...hopefully ;-)
istoica Jan 22, 2019
0174f63
Only add eviction entries for objects that are not inlined
stephanie-wang Jan 24, 2019
f5cf045
fixed a bunch of comments
istoica Jan 25, 2019
f8345d3
fixed a bunch of comments
istoica Jan 25, 2019
efaa8d6
Fix test
stephanie-wang Jan 27, 2019
461934e
Fix object transfer dump test
stephanie-wang Jan 28, 2019
ffd5663
lint
stephanie-wang Jan 28, 2019
71ee334
Comments
stephanie-wang Jan 28, 2019
0e26655
Fix test?
stephanie-wang Jan 28, 2019
dd2be9a
Fix test?
stephanie-wang Jan 29, 2019
270b489
Merge branch 'master' into inline-objects
stephanie-wang Jan 29, 2019
e95b510
lint
stephanie-wang Jan 29, 2019
f14d1f4
fix build
stephanie-wang Jan 30, 2019
feb1e68
Merge branch 'master' into inline-objects
stephanie-wang Jan 30, 2019
ffcd402
Fix build
stephanie-wang Jan 30, 2019
90721a6
Merge branch 'master' into inline-objects
stephanie-wang Jan 30, 2019
f32a7b0
lint
stephanie-wang Jan 31, 2019
d284fa8
Use const ref
stephanie-wang Jan 31, 2019
a374ce6
Fixes, don't let object manager hang
stephanie-wang Feb 1, 2019
40b5d27
Increase object transfer retry time for travis?
stephanie-wang Feb 1, 2019
b663753
Fix test
stephanie-wang Feb 1, 2019
c29ab7c
Fix test?
stephanie-wang Feb 5, 2019
08f705a
Add internal config to java, fix PlasmaFreeTest
stephanie-wang Feb 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class RayConfig {
public final Long objectStoreSize;

public final String rayletSocketName;
public final List<String> rayletConfigParameters;

public final String redisServerExecutablePath;
public final String redisModulePath;
Expand Down Expand Up @@ -162,6 +165,14 @@ public RayConfig(Config config) {
// raylet socket name
rayletSocketName = config.getString("ray.raylet.socket-name");

// raylet parameters
rayletConfigParameters = new ArrayList<String>();
Config rayletConfig = config.getConfig("ray.raylet.config");
for (java.util.Map.Entry<java.lang.String,ConfigValue> entry : rayletConfig.entrySet()) {
String parameter = entry.getKey() + "," + String.valueOf(entry.getValue().unwrapped());
rayletConfigParameters.add(parameter);
}

// library path
this.libraryPath = new ImmutableList.Builder<String>().add(
rayHome + "/build/src/plasma",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private void startRaylet() {
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
"", // The internal config list.
String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet() // java worker command
);
Expand Down
4 changes: 4 additions & 0 deletions java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ ray {
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet

// See src/ray/ray_config_def.h for options.
config {
}
}

}
2 changes: 2 additions & 0 deletions java/test/src/main/java/org/ray/api/test/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class BaseTest {
public void setUp() {
System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
System.setProperty("ray.raylet.config.inline_object_max_size_bytes", "0");
Ray.init();
}

Expand All @@ -29,6 +30,7 @@ public void tearDown() {
// unset system properties
System.clearProperty("ray.home");
System.clearProperty("ray.resources");
System.clearProperty("ray.raylet.config.inline_object_max_size_bytes");
}

}
8 changes: 8 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ table FunctionTableData {
table ObjectTableData {
// The size of the object.
object_size: long;
// Is object in-lined? Inline objects are objects whose data and metadata are
// inlined in the GCS object table entry, which normally only specifies
// the object location.
inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// The number of times this object has been evicted from this node so far.
num_evictions: int;
// In-line object data.
inline_object_data: [ubyte];
// In-line object metadata.
inline_object_metadata: [ubyte];
}

table TaskReconstructionData {
Expand Down
110 changes: 86 additions & 24 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,67 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,

namespace {

/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
/// Process a suffix of the object table log.
/// If object is inlined (inline_object_flag = TRUE), its data and metadata are
/// stored with the object's entry so we read them into inline_object_data, and
/// inline_object_metadata, respectively.
/// If object is not inlined, store the result in client_ids.
/// This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix.
/// This function also stores a bool in has_been_created indicating whether the
/// object has ever been created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *has_been_created) {
bool *inline_object_flag,
std::vector<uint8_t> *inline_object_data,
std::string *inline_object_metadata, bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
// client1.is_eviction = true
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
//
// If object is inlined each entry contains both the object's data and metadata,
// so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
*has_been_created = true;
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (object_table_data.inline_object_flag) {
if (!*inline_object_flag) {
// This is the first time we're receiving the inline object data. Read
// object's data from the GCS entry.
*inline_object_flag = object_table_data.inline_object_flag;
inline_object_data->assign(object_table_data.inline_object_data.begin(),
object_table_data.inline_object_data.end());
inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
object_table_data.inline_object_metadata.end());
}
// We got the data and metadata of the object so exit the loop.
break;
}

if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;

if (!*inline_object_flag) {
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
}
}
}
Expand All @@ -62,6 +88,8 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.inline_object_flag, &it->second.inline_object_data,
&it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
Expand All @@ -74,6 +102,8 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.inline_object_flag, it->second.inline_object_data,
it->second.inline_object_metadata,
it->second.has_been_created);
}
};
Expand All @@ -84,20 +114,32 @@ void ObjectDirectory::RegisterBackend() {

ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) {
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
}

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id;
// Append the eviction entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
Expand Down Expand Up @@ -147,16 +189,19 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
UpdateObjectLocations(
{}, gcs_client_->client_table(), &listener.second.current_object_locations,
&listener.second.inline_object_flag, &listener.second.inline_object_data,
&listener.second.inline_object_metadata, &listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
callback_pair.second(
object_id, listener.second.current_object_locations,
listener.second.inline_object_flag, listener.second.inline_object_data,
listener.second.inline_object_metadata, listener.second.has_been_created);
}
}
}
Expand All @@ -182,8 +227,14 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
auto inline_object_flag = listener_state.inline_object_flag;
const auto &inline_object_data = listener_state.inline_object_data;
const auto &inline_object_metadata = listener_state.inline_object_metadata;
io_service_.post([callback, locations, inline_object_flag, inline_object_data,
inline_object_metadata, object_id]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata,
/*has_been_created=*/true);
});
}
return status;
Expand Down Expand Up @@ -216,20 +267,31 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool inline_object_flag = false;
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &has_been_created);
&client_ids, &inline_object_flag, &inline_object_data,
&inline_object_metadata, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, has_been_created);
callback(object_id, client_ids, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
// If object inlined, we already have the object's data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
bool inline_object_flag = it->second.inline_object_flag;
const auto &inline_object_data = it->second.inline_object_data;
const auto &inline_object_metadata = it->second.inline_object_metadata;
io_service_.post([callback, object_id, locations, inline_object_flag,
inline_object_data, inline_object_metadata, has_been_created]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
}
return status;
Expand Down
31 changes: 24 additions & 7 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::ClientID> &, bool,
const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;

/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
Expand Down Expand Up @@ -99,10 +99,15 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) = 0;
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;

/// Report objects removed from this client's store to the object directory.
///
Expand Down Expand Up @@ -154,9 +159,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;

ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;

ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;

Expand All @@ -174,6 +182,15 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// Specify whether the object is inlined. The data and the metadata of
/// an inlined object are stored in the object's GCS entry. In this flag
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// an inlined object are stored in the object's GCS entry. In this flag
/// an inlined object are stored in the object's GCS entry. If this flag this set

/// (i.e., the object is inlined) the content of current_object_locations
/// can be ignored.
bool inline_object_flag;
/// Inlined object data, if inline_object_flag == true.
std::vector<uint8_t> inline_object_data;
/// Inlined object metadata, if inline_object_flag == true.
std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
Expand Down
Loading