diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh
index a983a6c5e05b..d5fb8d2da17a 100755
--- a/.travis/install-dependencies.sh
+++ b/.travis/install-dependencies.sh
@@ -24,7 +24,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-Linux-x86_64.sh -O miniconda.sh -nv
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
- pip install -q cython==0.29.0 cmake tensorflow gym opencv-python pyyaml pandas==0.23.4 requests \
+ pip install -q cython==0.29.0 cmake tensorflow gym==0.10.11 opencv-python pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
@@ -50,7 +50,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
- pip install -q cython==0.29.0 cmake tensorflow gym opencv-python pyyaml pandas==0.23.4 requests \
+ pip install -q cython==0.29.0 cmake tensorflow gym==0.10.11 opencv-python pyyaml pandas==0.23.4 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# check that brew is installed
diff --git a/cmake/Modules/ArrowExternalProject.cmake b/cmake/Modules/ArrowExternalProject.cmake
index f691d990b283..b6cba2e0072c 100644
--- a/cmake/Modules/ArrowExternalProject.cmake
+++ b/cmake/Modules/ArrowExternalProject.cmake
@@ -15,17 +15,17 @@
# - PLASMA_SHARED_LIB
set(arrow_URL https://github.com/ray-project/arrow.git)
-# This commit is based on https://github.com/apache/arrow/pull/3410. We
+# This commit is based on https://github.com/apache/arrow/pull/3526. We
# include the link here to make it easier to find the right commit because
# Arrow often rewrites git history and invalidates certain commits.
# It has been patched to fix an upstream symbol clash with TensorFlow,
# the patch is available at
-# https://github.com/ray-project/arrow/commit/511dae1149e3656bbf84f461729f2306d2ebf2e5
+# https://github.com/ray-project/arrow/commit/007e1ca289e979bac80231fa9ee7510be744b60b
# See the discussion in https://github.com/apache/arrow/pull/3177
# WARNING: If the arrow version is updated, you need to also update the
# SETUPTOOLS_SCM_PRETEND_VERSION version string in the ThirdpartyToolchain.cmake
# file
-set(arrow_TAG 511dae1149e3656bbf84f461729f2306d2ebf2e5)
+set(arrow_TAG 007e1ca289e979bac80231fa9ee7510be744b60b)
set(ARROW_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}/external/arrow-install)
set(ARROW_HOME ${ARROW_INSTALL_PREFIX})
diff --git a/doc/source/example-parameter-server.rst b/doc/source/example-parameter-server.rst
index 3411d50513d5..f104ca00afb0 100644
--- a/doc/source/example-parameter-server.rst
+++ b/doc/source/example-parameter-server.rst
@@ -123,5 +123,5 @@ resulting gradients.
current_weights = ps.apply_gradients.remote(*gradients)
Both of these examples implement the parameter server using a single actor;
-however they can be easily extended to **shard the parameters across multiple
+however, they can be easily extended to **split the parameters across multiple
actors**.
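A minimal, self-contained sketch of that multi-actor variant (all names below, such as ParameterServerShard, are illustrative and not part of the documented example): each actor owns one slice of the parameter vector, and the worker returns one gradient slice per shard, so every actor receives only the updates for the parameters it holds.

# Illustrative sketch only (hypothetical names); extends the single-actor
# example above by partitioning the parameter vector across several actors.
import numpy as np
import ray

ray.init()

NUM_SHARDS = 4
SHARD_DIM = 250


@ray.remote
class ParameterServerShard(object):
    def __init__(self, dim):
        # Each actor owns one contiguous slice of the full parameter vector.
        self.params = np.zeros(dim)

    def get_params(self):
        return self.params

    def apply_gradients(self, *gradients):
        self.params -= 0.01 * np.mean(gradients, axis=0)
        return self.params


@ray.remote(num_return_vals=NUM_SHARDS)
def compute_gradients(*param_shards):
    # Placeholder gradient computation; a real worker would evaluate the
    # model on a data batch and split the gradient the same way the
    # parameters are split.
    return tuple(np.ones_like(shard) for shard in param_shards)


shards = [ParameterServerShard.remote(SHARD_DIM) for _ in range(NUM_SHARDS)]

for _ in range(5):
    current_shards = [shard.get_params.remote() for shard in shards]
    grad_shards = compute_gradients.remote(*current_shards)
    # Send each actor only the gradient slice it owns.
    for shard, grad in zip(shards, grad_shards):
        shard.apply_gradients.remote(grad)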
diff --git a/docker/examples/Dockerfile b/docker/examples/Dockerfile
index 286de726f4f5..9244ec3983dc 100644
--- a/docker/examples/Dockerfile
+++ b/docker/examples/Dockerfile
@@ -5,7 +5,7 @@ FROM ray-project/deploy
# This updates numpy to 1.14 and mutes errors from other libraries
RUN conda install -y numpy
RUN apt-get install -y zlib1g-dev
-RUN pip install gym[atari] opencv-python==3.2.0.8 tensorflow lz4 keras pytest-timeout smart_open
+RUN pip install gym[atari]==0.10.11 opencv-python==3.2.0.8 tensorflow lz4 keras pytest-timeout smart_open
RUN pip install -U h5py # Mutes FutureWarnings
RUN pip install --upgrade bayesian-optimization
RUN pip install --upgrade git+git://github.com/hyperopt/hyperopt.git
diff --git a/java/pom.xml b/java/pom.xml
index b5b13d417703..0ccb62a5811d 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -29,7 +29,7 @@
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-plasma</artifactId>
-      <version>0.10.0</version>
+      <version>0.13.0-SNAPSHOT</version>
      <groupId>de.ruedigermoeller</groupId>
diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java
index d4d90f24ece2..12a62dd1c6bb 100644
--- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java
+++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java
@@ -6,8 +6,10 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
@@ -50,6 +52,7 @@ public class RayConfig {
public final Long objectStoreSize;
public final String rayletSocketName;
+  public final List<String> rayletConfigParameters;
public final String redisServerExecutablePath;
public final String redisModulePath;
@@ -162,6 +165,14 @@ public RayConfig(Config config) {
// raylet socket name
rayletSocketName = config.getString("ray.raylet.socket-name");
+ // raylet parameters
+    rayletConfigParameters = new ArrayList<String>();
+ Config rayletConfig = config.getConfig("ray.raylet.config");
+    for (java.util.Map.Entry<String, ConfigValue> entry : rayletConfig.entrySet()) {
+ String parameter = entry.getKey() + "," + String.valueOf(entry.getValue().unwrapped());
+ rayletConfigParameters.add(parameter);
+ }
+
// library path
    this.libraryPath = new ImmutableList.Builder<String>().add(
rayHome + "/build/src/plasma",
diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java
index 466233e9b32f..3f67a0b827dd 100644
--- a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java
+++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java
@@ -1,11 +1,13 @@
package org.ray.runtime.objectstore;
+import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.arrow.plasma.ObjectStoreLink;
+import org.apache.arrow.plasma.ObjectStoreLink.ObjectStoreData;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayDevRuntime;
import org.ray.runtime.raylet.MockRayletClient;
@@ -57,12 +59,18 @@ public List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata) {
}
@Override
-  public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
-    ArrayList<byte[]> rets = new ArrayList<>();
+  public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
+    ArrayList<ObjectStoreData> rets = new ArrayList<>();
+ // TODO(yuhguo): make ObjectStoreData's constructor public.
for (byte[] objId : objectIds) {
- //tod test
- if (data.containsKey(new UniqueId(objId))) {
- rets.add(objId);
+ UniqueId uniqueId = new UniqueId(objId);
+ try {
+        Constructor<ObjectStoreData> constructor = ObjectStoreData.class
+            .getDeclaredConstructor(byte[].class, byte[].class);
+ constructor.setAccessible(true);
+ rets.add(constructor.newInstance(metadata.get(uniqueId), data.get(uniqueId)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
return rets;
@@ -73,11 +81,6 @@ public byte[] hash(byte[] objectId) {
return null;
}
- @Override
- public void fetch(byte[][] objectIds) {
-
- }
-
@Override
public long evict(long numBytes) {
return 0;
@@ -89,8 +92,12 @@ public void release(byte[] objectId) {
}
@Override
- public boolean contains(byte[] objectId) {
+ public void delete(byte[] objectId) {
+ return;
+ }
+ @Override
+ public boolean contains(byte[] objectId) {
return data.containsKey(new UniqueId(objectId));
}
diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java
index 2bbb457dd870..31026930f347 100644
--- a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java
+++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java
@@ -4,6 +4,7 @@
import java.util.List;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.arrow.plasma.PlasmaClient;
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
@@ -12,6 +13,8 @@
import org.ray.runtime.config.RunMode;
import org.ray.runtime.util.Serializer;
import org.ray.runtime.util.UniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
 * Object store proxy, which handles serialization and deserialization, and utilizes a {@code
@@ -19,6 +22,8 @@
*/
public class ObjectStoreProxy {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ObjectStoreProxy.class);
+
private static final int GET_TIMEOUT_MS = 1000;
private final AbstractRayRuntime runtime;
@@ -82,11 +87,19 @@ public <T> List<Pair<T, GetStatus>> get(List<UniqueId> ids, int timeoutMs, boole
}
public void put(UniqueId id, Object obj, Object metadata) {
- objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
+ try {
+ objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
+ } catch (DuplicateObjectException e) {
+ LOGGER.warn(e.getMessage());
+ }
}
public void putSerialized(UniqueId id, byte[] obj, byte[] metadata) {
- objectStore.get().put(id.getBytes(), obj, metadata);
+ try {
+ objectStore.get().put(id.getBytes(), obj, metadata);
+ } catch (DuplicateObjectException e) {
+ LOGGER.warn(e.getMessage());
+ }
}
public enum GetStatus {
diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java
index 7b25882dd600..0233d15c5e48 100644
--- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java
+++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java
@@ -205,7 +205,7 @@ private void startRaylet() {
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
- "", // The internal config list.
+ String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet() // java worker command
);
diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf
index b45d7dc6376d..a77e1bc4dd76 100644
--- a/java/runtime/src/main/resources/ray.default.conf
+++ b/java/runtime/src/main/resources/ray.default.conf
@@ -85,6 +85,10 @@ ray {
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet
+
+ // See src/ray/ray_config_def.h for options.
+ config {
+ }
}
}
diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java
index 23f893a46425..55e27dbda90c 100644
--- a/java/test/src/main/java/org/ray/api/test/BaseTest.java
+++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java
@@ -13,6 +13,7 @@ public class BaseTest {
public void setUp() {
System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
+ System.setProperty("ray.raylet.config.inline_object_max_size_bytes", "0");
Ray.init();
}
@@ -29,6 +30,7 @@ public void tearDown() {
// unset system properties
System.clearProperty("ray.home");
System.clearProperty("ray.resources");
+ System.clearProperty("ray.raylet.config.inline_object_max_size_bytes");
}
}
diff --git a/python/ray/__init__.py b/python/ray/__init__.py
index 4d99dead0f6e..c5d3687203e8 100644
--- a/python/ray/__init__.py
+++ b/python/ray/__init__.py
@@ -51,7 +51,7 @@
from ray._raylet import (UniqueID, ObjectID, DriverID, ClientID, ActorID,
ActorHandleID, FunctionID, ActorClassID, TaskID,
- Config as _Config) # noqa: E402
+ _ID_TYPES, Config as _Config) # noqa: E402
_config = _Config()
@@ -70,14 +70,15 @@
from ray.actor import method # noqa: E402
# Ray version string.
-__version__ = "0.6.2"
+__version__ = "0.6.3"
__all__ = [
"error_info", "init", "connect", "disconnect", "get", "put", "wait",
"remote", "profile", "actor", "method", "get_gpu_ids", "get_resource_ids",
"get_webui_url", "register_custom_serializer", "shutdown",
"is_initialized", "SCRIPT_MODE", "WORKER_MODE", "LOCAL_MODE",
- "PYTHON_MODE", "global_state", "_config", "__version__", "internal"
+ "PYTHON_MODE", "global_state", "_config", "__version__", "internal",
+ "_ID_TYPES"
]
__all__ += [
diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py
index 350a6250cdd2..096b081f2ac8 100644
--- a/python/ray/autoscaler/commands.py
+++ b/python/ray/autoscaler/commands.py
@@ -82,35 +82,37 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
provider = get_node_provider(config["provider"], config["cluster_name"])
- def remaining_nodes():
- if workers_only:
- A = []
- else:
- A = [
+ try:
+
+ def remaining_nodes():
+ if workers_only:
+ A = []
+ else:
+ A = [
+ node_id for node_id in provider.nodes({
+ TAG_RAY_NODE_TYPE: "head"
+ })
+ ]
+
+ A += [
node_id for node_id in provider.nodes({
- TAG_RAY_NODE_TYPE: "head"
+ TAG_RAY_NODE_TYPE: "worker"
})
]
-
- A += [
- node_id for node_id in provider.nodes({
- TAG_RAY_NODE_TYPE: "worker"
- })
- ]
- return A
-
- # Loop here to check that both the head and worker nodes are actually
- # really gone
- A = remaining_nodes()
- with LogTimer("teardown_cluster: Termination done."):
- while A:
- logger.info("teardown_cluster: "
- "Terminating {} nodes...".format(len(A)))
- provider.terminate_nodes(A)
- time.sleep(1)
- A = remaining_nodes()
-
- provider.cleanup()
+ return A
+
+ # Loop here to check that both the head and worker nodes are actually
+ # really gone
+ A = remaining_nodes()
+ with LogTimer("teardown_cluster: Termination done."):
+ while A:
+ logger.info("teardown_cluster: "
+ "Terminating {} nodes...".format(len(A)))
+ provider.terminate_nodes(A)
+ time.sleep(1)
+ A = remaining_nodes()
+ finally:
+ provider.cleanup()
def kill_node(config_file, yes, override_cluster_name):
@@ -147,121 +149,125 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
override_cluster_name):
"""Create the cluster head node, which in turn creates the workers."""
provider = get_node_provider(config["provider"], config["cluster_name"])
- head_node_tags = {
- TAG_RAY_NODE_TYPE: "head",
- }
- nodes = provider.nodes(head_node_tags)
- if len(nodes) > 0:
+ try:
+ head_node_tags = {
+ TAG_RAY_NODE_TYPE: "head",
+ }
+ nodes = provider.nodes(head_node_tags)
+ if len(nodes) > 0:
+ head_node = nodes[0]
+ else:
+ head_node = None
+
+ if not head_node:
+ confirm("This will create a new cluster", yes)
+ elif not no_restart:
+ confirm("This will restart cluster services", yes)
+
+ launch_hash = hash_launch_conf(config["head_node"], config["auth"])
+ if head_node is None or provider.node_tags(head_node).get(
+ TAG_RAY_LAUNCH_CONFIG) != launch_hash:
+ if head_node is not None:
+ confirm("Head node config out-of-date. It will be terminated",
+ yes)
+ logger.info(
+ "get_or_create_head_node: "
+ "Terminating outdated head node {}".format(head_node))
+ provider.terminate_node(head_node)
+ logger.info("get_or_create_head_node: Launching new head node...")
+ head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
+ head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
+ config["cluster_name"])
+ provider.create_node(config["head_node"], head_node_tags, 1)
+
+ nodes = provider.nodes(head_node_tags)
+ assert len(nodes) == 1, "Failed to create head node."
head_node = nodes[0]
- else:
- head_node = None
-
- if not head_node:
- confirm("This will create a new cluster", yes)
- elif not no_restart:
- confirm("This will restart cluster services", yes)
-
- launch_hash = hash_launch_conf(config["head_node"], config["auth"])
- if head_node is None or provider.node_tags(head_node).get(
- TAG_RAY_LAUNCH_CONFIG) != launch_hash:
- if head_node is not None:
- confirm("Head node config out-of-date. It will be terminated", yes)
- logger.info("get_or_create_head_node: "
- "Terminating outdated head node {}".format(head_node))
- provider.terminate_node(head_node)
- logger.info("get_or_create_head_node: Launching new head node...")
- head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
- head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
- config["cluster_name"])
- provider.create_node(config["head_node"], head_node_tags, 1)
-
- nodes = provider.nodes(head_node_tags)
- assert len(nodes) == 1, "Failed to create head node."
- head_node = nodes[0]
-
- # TODO(ekl) right now we always update the head node even if the hash
- # matches. We could prompt the user for what they want to do in this case.
- runtime_hash = hash_runtime_conf(config["file_mounts"], config)
- logger.info("get_or_create_head_node: Updating files on head node...")
-
- # Rewrite the auth config so that the head node can update the workers
- remote_key_path = "~/ray_bootstrap_key.pem"
- remote_config = copy.deepcopy(config)
- remote_config["auth"]["ssh_private_key"] = remote_key_path
-
- # Adjust for new file locations
- new_mounts = {}
- for remote_path in config["file_mounts"]:
- new_mounts[remote_path] = remote_path
- remote_config["file_mounts"] = new_mounts
- remote_config["no_restart"] = no_restart
-
- # Now inject the rewritten config and SSH key into the head node
- remote_config_file = tempfile.NamedTemporaryFile(
- "w", prefix="ray-bootstrap-")
- remote_config_file.write(json.dumps(remote_config))
- remote_config_file.flush()
- config["file_mounts"].update({
- remote_key_path: config["auth"]["ssh_private_key"],
- "~/ray_bootstrap_config.yaml": remote_config_file.name
- })
-
- if restart_only:
- init_commands = config["head_start_ray_commands"]
- elif no_restart:
- init_commands = (
- config["setup_commands"] + config["head_setup_commands"])
- else:
- init_commands = (
- config["setup_commands"] + config["head_setup_commands"] +
- config["head_start_ray_commands"])
-
- updater = NodeUpdaterThread(
- head_node,
- config["provider"],
- provider,
- config["auth"],
- config["cluster_name"],
- config["file_mounts"],
- init_commands,
- runtime_hash,
- )
- updater.start()
- updater.join()
-
- # Refresh the node cache so we see the external ip if available
- provider.nodes(head_node_tags)
-
- if updater.exitcode != 0:
- logger.error("get_or_create_head_node: "
- "Updating {} failed".format(
- provider.external_ip(head_node)))
- sys.exit(1)
- logger.info("get_or_create_head_node: "
- "Head node up-to-date, IP address is: {}".format(
- provider.external_ip(head_node)))
-
- monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
- for s in init_commands:
- if ("ray start" in s and "docker exec" in s
- and "--autoscaling-config" in s):
- monitor_str = "docker exec {} /bin/sh -c {}".format(
- config["docker"]["container_name"], quote(monitor_str))
- if override_cluster_name:
- modifiers = " --cluster-name={}".format(quote(override_cluster_name))
- else:
- modifiers = ""
- print("To monitor auto-scaling activity, you can run:\n\n"
- " ray exec {} {}{}\n".format(config_file, quote(monitor_str),
- modifiers))
- print("To open a console on the cluster:\n\n"
- " ray attach {}{}\n".format(config_file, modifiers))
- print("To ssh manually to the cluster, run:\n\n"
- " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
- config["auth"]["ssh_user"],
- provider.external_ip(head_node)))
- provider.cleanup()
+ # TODO(ekl) right now we always update the head node even if the hash
+ # matches. We could prompt the user for what they want to do here.
+ runtime_hash = hash_runtime_conf(config["file_mounts"], config)
+ logger.info("get_or_create_head_node: Updating files on head node...")
+
+ # Rewrite the auth config so that the head node can update the workers
+ remote_key_path = "~/ray_bootstrap_key.pem"
+ remote_config = copy.deepcopy(config)
+ remote_config["auth"]["ssh_private_key"] = remote_key_path
+
+ # Adjust for new file locations
+ new_mounts = {}
+ for remote_path in config["file_mounts"]:
+ new_mounts[remote_path] = remote_path
+ remote_config["file_mounts"] = new_mounts
+ remote_config["no_restart"] = no_restart
+
+ # Now inject the rewritten config and SSH key into the head node
+ remote_config_file = tempfile.NamedTemporaryFile(
+ "w", prefix="ray-bootstrap-")
+ remote_config_file.write(json.dumps(remote_config))
+ remote_config_file.flush()
+ config["file_mounts"].update({
+ remote_key_path: config["auth"]["ssh_private_key"],
+ "~/ray_bootstrap_config.yaml": remote_config_file.name
+ })
+
+ if restart_only:
+ init_commands = config["head_start_ray_commands"]
+ elif no_restart:
+ init_commands = (
+ config["setup_commands"] + config["head_setup_commands"])
+ else:
+ init_commands = (
+ config["setup_commands"] + config["head_setup_commands"] +
+ config["head_start_ray_commands"])
+
+ updater = NodeUpdaterThread(
+ head_node,
+ config["provider"],
+ provider,
+ config["auth"],
+ config["cluster_name"],
+ config["file_mounts"],
+ init_commands,
+ runtime_hash,
+ )
+ updater.start()
+ updater.join()
+
+ # Refresh the node cache so we see the external ip if available
+ provider.nodes(head_node_tags)
+
+ if updater.exitcode != 0:
+ logger.error("get_or_create_head_node: "
+ "Updating {} failed".format(
+ provider.external_ip(head_node)))
+ sys.exit(1)
+ logger.info("get_or_create_head_node: "
+ "Head node up-to-date, IP address is: {}".format(
+ provider.external_ip(head_node)))
+
+ monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
+ for s in init_commands:
+ if ("ray start" in s and "docker exec" in s
+ and "--autoscaling-config" in s):
+ monitor_str = "docker exec {} /bin/sh -c {}".format(
+ config["docker"]["container_name"], quote(monitor_str))
+ if override_cluster_name:
+ modifiers = " --cluster-name={}".format(
+ quote(override_cluster_name))
+ else:
+ modifiers = ""
+ print("To monitor auto-scaling activity, you can run:\n\n"
+ " ray exec {} {}{}\n".format(config_file, quote(monitor_str),
+ modifiers))
+ print("To open a console on the cluster:\n\n"
+ " ray attach {}{}\n".format(config_file, modifiers))
+ print("To ssh manually to the cluster, run:\n\n"
+ " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
+ config["auth"]["ssh_user"],
+ provider.external_ip(head_node)))
+ finally:
+ provider.cleanup()
def attach_cluster(config_file, start, use_tmux, override_cluster_name, new):
@@ -314,43 +320,45 @@ def exec_cluster(config_file, cmd, screen, tmux, stop, start,
config, config_file, override_cluster_name, create_if_needed=start)
provider = get_node_provider(config["provider"], config["cluster_name"])
- updater = NodeUpdaterThread(
- head_node,
- config["provider"],
- provider,
- config["auth"],
- config["cluster_name"],
- config["file_mounts"],
- [],
- "",
- )
- if stop:
- cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
+ try:
+ updater = NodeUpdaterThread(
+ head_node,
+ config["provider"],
+ provider,
+ config["auth"],
+ config["cluster_name"],
+ config["file_mounts"],
+ [],
+ "",
+ )
+ if stop:
+ cmd += (
+ "; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
"--workers-only; sudo shutdown -h now")
- _exec(
- updater,
- cmd,
- screen,
- tmux,
- expect_error=stop,
- port_forward=port_forward)
-
- if tmux or screen:
- attach_command_parts = ["ray attach", config_file]
- if override_cluster_name is not None:
- attach_command_parts.append(
- "--cluster-name={}".format(override_cluster_name))
- if tmux:
- attach_command_parts.append("--tmux")
- elif screen:
- attach_command_parts.append("--screen")
-
- attach_command = " ".join(attach_command_parts)
- attach_info = "Use `{}` to check on command status.".format(
- attach_command)
- logger.info(attach_info)
-
- provider.cleanup()
+ _exec(
+ updater,
+ cmd,
+ screen,
+ tmux,
+ expect_error=stop,
+ port_forward=port_forward)
+
+ if tmux or screen:
+ attach_command_parts = ["ray attach", config_file]
+ if override_cluster_name is not None:
+ attach_command_parts.append(
+ "--cluster-name={}".format(override_cluster_name))
+ if tmux:
+ attach_command_parts.append("--tmux")
+ elif screen:
+ attach_command_parts.append("--screen")
+
+ attach_command = " ".join(attach_command_parts)
+ attach_info = "Use `{}` to check on command status.".format(
+ attach_command)
+ logger.info(attach_info)
+ finally:
+ provider.cleanup()
def _exec(updater, cmd, screen, tmux, expect_error=False, port_forward=None):
@@ -395,23 +403,24 @@ def rsync(config_file, source, target, override_cluster_name, down):
config, config_file, override_cluster_name, create_if_needed=False)
provider = get_node_provider(config["provider"], config["cluster_name"])
- updater = NodeUpdaterThread(
- head_node,
- config["provider"],
- provider,
- config["auth"],
- config["cluster_name"],
- config["file_mounts"],
- [],
- "",
- )
- if down:
- rsync = updater.rsync_down
- else:
- rsync = updater.rsync_up
- rsync(source, target, check_error=False)
-
- provider.cleanup()
+ try:
+ updater = NodeUpdaterThread(
+ head_node,
+ config["provider"],
+ provider,
+ config["auth"],
+ config["cluster_name"],
+ config["file_mounts"],
+ [],
+ "",
+ )
+ if down:
+ rsync = updater.rsync_down
+ else:
+ rsync = updater.rsync_up
+ rsync(source, target, check_error=False)
+ finally:
+ provider.cleanup()
def get_head_node_ip(config_file, override_cluster_name):
@@ -422,9 +431,11 @@ def get_head_node_ip(config_file, override_cluster_name):
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
- head_node = _get_head_node(config, config_file, override_cluster_name)
- ip = provider.external_ip(head_node)
- provider.cleanup()
+ try:
+ head_node = _get_head_node(config, config_file, override_cluster_name)
+ ip = provider.external_ip(head_node)
+ finally:
+ provider.cleanup()
return ip
@@ -445,11 +456,13 @@ def _get_head_node(config,
override_cluster_name,
create_if_needed=False):
provider = get_node_provider(config["provider"], config["cluster_name"])
- head_node_tags = {
- TAG_RAY_NODE_TYPE: "head",
- }
- nodes = provider.nodes(head_node_tags)
- provider.cleanup()
+ try:
+ head_node_tags = {
+ TAG_RAY_NODE_TYPE: "head",
+ }
+ nodes = provider.nodes(head_node_tags)
+ finally:
+ provider.cleanup()
if len(nodes) > 0:
head_node = nodes[0]
diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi
index a29476742163..1f31179e4fce 100644
--- a/python/ray/includes/unique_ids.pxi
+++ b/python/ray/includes/unique_ids.pxi
@@ -4,6 +4,8 @@ We define different types for different IDs for type safety.
See https://github.com/ray-project/ray/issues/3721.
"""
+# WARNING: Any additional ID types defined in this file must be added to the
+# _ID_TYPES list at the bottom of this file.
from ray.includes.common cimport (
CUniqueID, CTaskID, CObjectID, CFunctionID, CActorClassID, CActorID,
CActorHandleID, CWorkerID, CDriverID, CConfigID, CClientID,
@@ -278,3 +280,7 @@ cdef class ActorClassID(UniqueID):
def __repr__(self):
return "ActorClassID(" + self.hex() + ")"
+
+
+_ID_TYPES = [UniqueID, ObjectID, TaskID, ClientID, DriverID, ActorID,
+ ActorHandleID, FunctionID, ActorClassID]
diff --git a/python/ray/rllib/optimizers/async_samples_optimizer.py b/python/ray/rllib/optimizers/async_samples_optimizer.py
index 541bcc1fa1c1..60b4eb69176c 100644
--- a/python/ray/rllib/optimizers/async_samples_optimizer.py
+++ b/python/ray/rllib/optimizers/async_samples_optimizer.py
@@ -167,8 +167,7 @@ def _step(self):
for b in self.batch_buffer) >= self.train_batch_size:
train_batch = self.batch_buffer[0].concat_samples(
self.batch_buffer)
- # defensive copy against plasma ref count bugs, see #3884
- self.learner.inqueue.put(train_batch.copy())
+ self.learner.inqueue.put(train_batch)
self.batch_buffer = []
# If the batch was replayed, skip the update below.
diff --git a/python/ray/rllib/setup-rllib-dev.py b/python/ray/rllib/setup-rllib-dev.py
index 3876a83f7988..d85f048d561c 100755
--- a/python/ray/rllib/setup-rllib-dev.py
+++ b/python/ray/rllib/setup-rllib-dev.py
@@ -11,28 +11,38 @@
import ray
-if __name__ == "__main__":
- rllib_home = os.path.abspath(os.path.join(ray.__file__, "../rllib"))
- local_home = os.path.abspath(os.path.dirname(__file__))
- assert os.path.isdir(rllib_home), rllib_home
+
+def do_link(package):
+ package_home = os.path.abspath(
+ os.path.join(ray.__file__, "../{}".format(package)))
+ local_home = os.path.abspath(
+ os.path.join(__file__, "../../{}".format(package)))
+ assert os.path.isdir(package_home), package_home
assert os.path.isdir(local_home), local_home
- click.confirm(
- "This will replace:\n {}\nwith a symlink to:\n {}".format(
- rllib_home, local_home),
- abort=True)
- if os.access(os.path.dirname(rllib_home), os.W_OK):
- subprocess.check_call(["rm", "-rf", rllib_home])
- subprocess.check_call(["ln", "-s", local_home, rllib_home])
+ if not click.confirm(
+ "This will replace:\n {}\nwith a symlink to:\n {}".format(
+ package_home, local_home),
+ default=True):
+ return
+ if os.access(os.path.dirname(package_home), os.W_OK):
+ subprocess.check_call(["rm", "-rf", package_home])
+ subprocess.check_call(["ln", "-s", local_home, package_home])
else:
print("You don't have write permission to {}, using sudo:".format(
- rllib_home))
- subprocess.check_call(["sudo", "rm", "-rf", rllib_home])
- subprocess.check_call(["sudo", "ln", "-s", local_home, rllib_home])
+ package_home))
+ subprocess.check_call(["sudo", "rm", "-rf", package_home])
+ subprocess.check_call(["sudo", "ln", "-s", local_home, package_home])
+
+
+if __name__ == "__main__":
+ do_link("rllib")
+ do_link("tune")
+ do_link("autoscaler")
print("Created links.\n\nIf you run into issues initializing Ray, please "
- "ensure that your local repo and the installed Ray is in sync "
+ "ensure that your local repo and the installed Ray are in sync "
"(pip install -U the latest wheels at "
"https://ray.readthedocs.io/en/latest/installation.html, "
"and ensure you are up-to-date on the master branch on git).\n\n"
- "Note that you may need to delete the rllib symlink when pip "
+ "Note that you may need to delete the package symlinks when pip "
"installing new Ray versions to prevent pip from overwriting files "
"in your git repo.")
diff --git a/python/ray/tune/examples/hyperopt_example.py b/python/ray/tune/examples/hyperopt_example.py
index d70d16b9488e..0dce74d52401 100644
--- a/python/ray/tune/examples/hyperopt_example.py
+++ b/python/ray/tune/examples/hyperopt_example.py
@@ -43,6 +43,19 @@ def easy_objective(config, reporter):
'activation': hp.choice("activation", ["relu", "tanh"])
}
+ current_best_params = [
+ {
+ "width": 1,
+ "height": 2,
+ "activation": 0 # Activation will be relu
+ },
+ {
+ "width": 4,
+ "height": 2,
+ "activation": 1 # Activation will be tanh
+ }
+ ]
+
config = {
"my_exp": {
"run": "exp",
@@ -55,6 +68,10 @@ def easy_objective(config, reporter):
},
}
}
- algo = HyperOptSearch(space, max_concurrent=4, reward_attr="neg_mean_loss")
+ algo = HyperOptSearch(
+ space,
+ max_concurrent=4,
+ reward_attr="neg_mean_loss",
+ points_to_evaluate=current_best_params)
scheduler = AsyncHyperBandScheduler(reward_attr="neg_mean_loss")
run_experiments(config, search_alg=algo, scheduler=scheduler)
diff --git a/python/ray/tune/examples/mnist_pytorch.py b/python/ray/tune/examples/mnist_pytorch.py
index a5fe48e5d1ba..a1cb983b112c 100644
--- a/python/ray/tune/examples/mnist_pytorch.py
+++ b/python/ray/tune/examples/mnist_pytorch.py
@@ -8,7 +8,6 @@
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
-from torch.autograd import Variable
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
@@ -120,7 +119,6 @@ def train(epoch):
for batch_idx, (data, target) in enumerate(train_loader):
if args.cuda:
data, target = data.cuda(), target.cuda()
- data, target = Variable(data), Variable(target)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
@@ -131,16 +129,17 @@ def test():
model.eval()
test_loss = 0
correct = 0
- for data, target in test_loader:
- if args.cuda:
- data, target = data.cuda(), target.cuda()
- data, target = Variable(data, volatile=True), Variable(target)
- output = model(data)
- test_loss += F.nll_loss(
- output, target, size_average=False).item() # sum up batch loss
- pred = output.data.max(
- 1, keepdim=True)[1] # get the index of the max log-probability
- correct += pred.eq(target.data.view_as(pred)).long().cpu().sum()
+ with torch.no_grad():
+ for data, target in test_loader:
+ if args.cuda:
+ data, target = data.cuda(), target.cuda()
+ output = model(data)
+ # sum up batch loss
+ test_loss += F.nll_loss(output, target, reduction='sum').item()
+ # get the index of the max log-probability
+ pred = output.argmax(dim=1, keepdim=True)
+ correct += pred.eq(
+ target.data.view_as(pred)).long().cpu().sum()
test_loss = test_loss / len(test_loader.dataset)
accuracy = correct.item() / len(test_loader.dataset)
@@ -176,7 +175,8 @@ def test():
"training_iteration": 1 if args.smoke_test else 20
},
"resources_per_trial": {
- "cpu": 3
+ "cpu": 3,
+ "gpu": int(not args.no_cuda)
},
"run": "train_mnist",
"num_samples": 1 if args.smoke_test else 10,
diff --git a/python/ray/tune/examples/mnist_pytorch_trainable.py b/python/ray/tune/examples/mnist_pytorch_trainable.py
index 24fc4951dd37..52aed6596abf 100644
--- a/python/ray/tune/examples/mnist_pytorch_trainable.py
+++ b/python/ray/tune/examples/mnist_pytorch_trainable.py
@@ -9,7 +9,6 @@
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
-from torch.autograd import Variable
from ray.tune import Trainable
@@ -127,7 +126,6 @@ def _train_iteration(self):
for batch_idx, (data, target) in enumerate(self.train_loader):
if self.args.cuda:
data, target = data.cuda(), target.cuda()
- data, target = Variable(data), Variable(target)
self.optimizer.zero_grad()
output = self.model(data)
loss = F.nll_loss(output, target)
@@ -138,18 +136,17 @@ def _test(self):
self.model.eval()
test_loss = 0
correct = 0
- for data, target in self.test_loader:
- if self.args.cuda:
- data, target = data.cuda(), target.cuda()
- data, target = Variable(data, volatile=True), Variable(target)
- output = self.model(data)
-
- # sum up batch loss
- test_loss += F.nll_loss(output, target, size_average=False).item()
-
- # get the index of the max log-probability
- pred = output.data.max(1, keepdim=True)[1]
- correct += pred.eq(target.data.view_as(pred)).long().cpu().sum()
+ with torch.no_grad():
+ for data, target in self.test_loader:
+ if self.args.cuda:
+ data, target = data.cuda(), target.cuda()
+ output = self.model(data)
+ # sum up batch loss
+ test_loss += F.nll_loss(output, target, reduction='sum').item()
+ # get the index of the max log-probability
+ pred = output.argmax(dim=1, keepdim=True)
+ correct += pred.eq(
+ target.data.view_as(pred)).long().cpu().sum()
test_loss = test_loss / len(self.test_loader.dataset)
accuracy = correct.item() / len(self.test_loader.dataset)
@@ -188,7 +185,8 @@ def _restore(self, checkpoint_path):
"training_iteration": 1 if args.smoke_test else 20,
},
"resources_per_trial": {
- "cpu": 3
+ "cpu": 3,
+ "gpu": int(not args.no_cuda)
},
"run": TrainMNIST,
"num_samples": 1 if args.smoke_test else 20,
diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py
index ff672a329e3f..85fa86afd464 100644
--- a/python/ray/tune/ray_trial_executor.py
+++ b/python/ray/tune/ray_trial_executor.py
@@ -35,7 +35,9 @@ def __init__(self, queue_trials=False):
def _setup_runner(self, trial):
cls = ray.remote(
num_cpus=trial.resources.cpu,
- num_gpus=trial.resources.gpu)(trial._get_trainable_cls())
+ num_gpus=trial.resources.gpu,
+ resources=trial.resources.custom_resources)(
+ trial._get_trainable_cls())
trial.init_logger()
# We checkpoint metadata here to try mitigating logdir duplication
@@ -229,16 +231,37 @@ def fetch_result(self, trial):
return result
def _commit_resources(self, resources):
+ committed = self._committed_resources
+ all_keys = set(resources.custom_resources).union(
+ set(committed.custom_resources))
+
+ custom_resources = {
+ k: committed.get(k) + resources.get_res_total(k)
+ for k in all_keys
+ }
+
self._committed_resources = Resources(
- self._committed_resources.cpu + resources.cpu_total(),
- self._committed_resources.gpu + resources.gpu_total())
+ committed.cpu + resources.cpu_total(),
+ committed.gpu + resources.gpu_total(),
+ custom_resources=custom_resources)
def _return_resources(self, resources):
+ committed = self._committed_resources
+
+ all_keys = set(resources.custom_resources).union(
+ set(committed.custom_resources))
+
+ custom_resources = {
+ k: committed.get(k) - resources.get_res_total(k)
+ for k in all_keys
+ }
self._committed_resources = Resources(
- self._committed_resources.cpu - resources.cpu_total(),
- self._committed_resources.gpu - resources.gpu_total())
- assert self._committed_resources.cpu >= 0
- assert self._committed_resources.gpu >= 0
+ committed.cpu - resources.cpu_total(),
+ committed.gpu - resources.gpu_total(),
+ custom_resources=custom_resources)
+
+ assert self._committed_resources.is_nonnegative(), (
+ "Resource invalid: {}".format(resources))
def _update_avail_resources(self, num_retries=5):
for i in range(num_retries):
@@ -247,28 +270,37 @@ def _update_avail_resources(self, num_retries=5):
logger.warning("Cluster resources not detected. Retrying...")
time.sleep(0.5)
- num_cpus = resources["CPU"]
- num_gpus = resources["GPU"]
+ resources = resources.copy()
+ num_cpus = resources.pop("CPU")
+ num_gpus = resources.pop("GPU")
+ custom_resources = resources
- self._avail_resources = Resources(int(num_cpus), int(num_gpus))
+ self._avail_resources = Resources(
+ int(num_cpus), int(num_gpus), custom_resources=custom_resources)
self._resources_initialized = True
def has_resources(self, resources):
"""Returns whether this runner has at least the specified resources."""
self._update_avail_resources()
- cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu
- gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu
+ currently_available = Resources.subtract(self._avail_resources,
+ self._committed_resources)
- have_space = (resources.cpu_total() <= cpu_avail
- and resources.gpu_total() <= gpu_avail)
+ have_space = (
+ resources.cpu_total() <= currently_available.cpu
+ and resources.gpu_total() <= currently_available.gpu and all(
+ resources.get_res_total(res) <= currently_available.get(res)
+ for res in resources.custom_resources))
if have_space:
return True
can_overcommit = self._queue_trials
- if (resources.cpu_total() > 0 and cpu_avail <= 0) or \
- (resources.gpu_total() > 0 and gpu_avail <= 0):
+ if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \
+ (resources.gpu_total() > 0 and currently_available.gpu <= 0) or \
+ any((resources.get_res_total(res_name) > 0
+ and currently_available.get(res_name) <= 0)
+ for res_name in resources.custom_resources):
can_overcommit = False # requested resource is already saturated
if can_overcommit:
@@ -287,9 +319,18 @@ def debug_string(self):
"""Returns a human readable message for printing to the console."""
if self._resources_initialized:
- return "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
+ status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
self._committed_resources.cpu, self._avail_resources.cpu,
self._committed_resources.gpu, self._avail_resources.gpu)
+ customs = ", ".join([
+ "{}/{} {}".format(
+ self._committed_resources.get_res_total(name),
+ self._avail_resources.get_res_total(name), name)
+ for name in self._avail_resources.custom_resources
+ ])
+ if customs:
+ status += " ({})".format(customs)
+ return status
else:
return "Resources requested: ?"
@@ -297,8 +338,15 @@ def resource_string(self):
"""Returns a string describing the total resources available."""
if self._resources_initialized:
- return "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
- self._avail_resources.gpu)
+ res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
+ self._avail_resources.gpu)
+ if self._avail_resources.custom_resources:
+ custom = ", ".join(
+ "{} {}".format(
+ self._avail_resources.get_res_total(name), name)
+ for name in self._avail_resources.custom_resources)
+ res_str += " ({})".format(custom)
+ return res_str
else:
return "? CPUs, ? GPUs"
diff --git a/python/ray/tune/scripts.py b/python/ray/tune/scripts.py
new file mode 100644
index 000000000000..361dedfddffb
--- /dev/null
+++ b/python/ray/tune/scripts.py
@@ -0,0 +1,99 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import glob
+import json
+import os
+
+import click
+import pandas as pd
+
+
+def _flatten_dict(dt):
+ while any(type(v) is dict for v in dt.values()):
+ remove = []
+ add = {}
+ for key, value in dt.items():
+ if type(value) is dict:
+ for subkey, v in value.items():
+ add[":".join([key, subkey])] = v
+ remove.append(key)
+ dt.update(add)
+ for k in remove:
+ del dt[k]
+ return dt
+
+
+@click.group()
+def cli():
+ pass
+
+@cli.command()
+@click.argument("experiment_path", required=True, type=str)
+def list_trials(experiment_path):
+ _list_trials(experiment_path)
+
+def _list_trials(experiment_path):
+ experiment_path = os.path.expanduser(experiment_path)
+    globs = glob.glob(os.path.join(experiment_path, "experiment_state*.json"))
+    filename = max(list(globs))
+ with open(filename) as f:
+ experiment_state = json.load(f)
+
+ for trial_state in experiment_state["checkpoints"]:
+        print("{trial_name}\t{trial_id}\t{status}\t{num_failures}\t"
+              "{logdir}".format(**trial_state))
+
+@cli.command()
+@click.argument("project_path", required=True, type=str)
+def list_experiments(project_path):
+ _list_experiments(project_path)
+
+def _list_experiments(project_path):
+    base, experiment_paths, _ = list(os.walk(project_path))[0]  # clean this
+    experiment_collection = {}
+    for experiment_path in experiment_paths:
+        experiment_state_path = glob.glob(
+            os.path.join(base, experiment_path, "experiment_state*.json"))
+        if not experiment_state_path:
+            continue
+        with open(experiment_state_path[0]) as f:
+            experiment_state = json.load(f)
+        experiment_collection[experiment_state_path[0]] = (
+            pd.DataFrame(experiment_state["checkpoints"]),
+            experiment_state["runner_data"],
+            experiment_state["timestamp"])
+
+    from ray.tune.trial import Trial
+    all_values = []
+    for path, (df, data, timestamp) in experiment_collection.items():
+        status = {}
+        status["name"] = path
+        status["timestamp"] = timestamp
+        status["total_running"] = (df["status"] == Trial.RUNNING).sum()
+        status["total_terminated"] = (df["status"] == Trial.TERMINATED).sum()
+        status["total_errored"] = (df["status"] == Trial.ERROR).sum()
+        status["total_trials"] = df.shape[0]
+        all_values += [status]
+
+    final_dataframe = pd.DataFrame(all_values)
+    print(final_dataframe.to_string())
+
+
+cli.add_command(list_trials, name="ls")
+cli.add_command(list_experiments, name="lsx")
+
+
+def main():
+ return cli()
+
+
+if __name__ == "__main__":
+ main()
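An illustrative way to exercise the two commands above from Python without installing a console entry point (the result directories are hypothetical):

# Illustrative only: call the helpers behind the `ls` / `lsx` commands
# directly. The directories below are hypothetical.
import os

from ray.tune.scripts import _list_experiments, _list_trials

results_dir = os.path.expanduser("~/ray_results")
_list_trials(os.path.join(results_dir, "my_experiment"))  # one line per trial
_list_experiments(results_dir)  # one summary row per experiment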
diff --git a/python/ray/tune/suggest/hyperopt.py b/python/ray/tune/suggest/hyperopt.py
index 2c1c1317616d..2c32562505f9 100644
--- a/python/ray/tune/suggest/hyperopt.py
+++ b/python/ray/tune/suggest/hyperopt.py
@@ -10,6 +10,7 @@
hyperopt_logger = logging.getLogger("hyperopt")
hyperopt_logger.setLevel(logging.WARNING)
import hyperopt as hpo
+ from hyperopt.fmin import generate_trials_to_calculate
except Exception:
hpo = None
@@ -33,6 +34,13 @@ class HyperOptSearch(SuggestionAlgorithm):
to 10.
reward_attr (str): The training result objective value attribute.
This refers to an increasing value.
+ points_to_evaluate (list): Initial parameter suggestions to be run
+ first. This is for when you already have some good parameters
+ you want hyperopt to run first to help the TPE algorithm
+ make better suggestions for future parameters. Needs to be
+            a list of dicts of hyperopt-named variables.
+            Choice variables should be indicated by their index in the
+            list (see example).
Example:
>>> space = {
@@ -40,6 +48,11 @@ class HyperOptSearch(SuggestionAlgorithm):
>>> 'height': hp.uniform('height', -100, 100),
>>> 'activation': hp.choice("activation", ["relu", "tanh"])
>>> }
+ >>> current_best_params = [{
+ >>> 'width': 10,
+ >>> 'height': 0,
+ >>> 'activation': 0, # The index of "relu"
+ >>> }]
>>> config = {
>>> "my_exp": {
>>> "run": "exp",
@@ -50,13 +63,15 @@ class HyperOptSearch(SuggestionAlgorithm):
>>> }
>>> }
>>> algo = HyperOptSearch(
- >>> space, max_concurrent=4, reward_attr="neg_mean_loss")
+ >>> space, max_concurrent=4, reward_attr="neg_mean_loss",
+ >>> points_to_evaluate=current_best_params)
"""
def __init__(self,
space,
max_concurrent=10,
reward_attr="episode_reward_mean",
+ points_to_evaluate=None,
**kwargs):
assert hpo is not None, "HyperOpt must be installed!"
assert type(max_concurrent) is int and max_concurrent > 0
@@ -64,7 +79,15 @@ def __init__(self,
self._reward_attr = reward_attr
self.algo = hpo.tpe.suggest
self.domain = hpo.Domain(lambda spc: spc, space)
- self._hpopt_trials = hpo.Trials()
+ if points_to_evaluate is None:
+ self._hpopt_trials = hpo.Trials()
+ self._points_to_evaluate = 0
+ else:
+ assert type(points_to_evaluate) == list
+ self._hpopt_trials = generate_trials_to_calculate(
+ points_to_evaluate)
+ self._hpopt_trials.refresh()
+ self._points_to_evaluate = len(points_to_evaluate)
self._live_trial_mapping = {}
self.rstate = np.random.RandomState()
@@ -73,15 +96,20 @@ def __init__(self,
def _suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
- new_ids = self._hpopt_trials.new_trial_ids(1)
- self._hpopt_trials.refresh()
- # Get new suggestion from Hyperopt
- new_trials = self.algo(new_ids, self.domain, self._hpopt_trials,
- self.rstate.randint(2**31 - 1))
- self._hpopt_trials.insert_trial_docs(new_trials)
- self._hpopt_trials.refresh()
- new_trial = new_trials[0]
+ if self._points_to_evaluate > 0:
+ new_trial = self._hpopt_trials.trials[self._points_to_evaluate - 1]
+ self._points_to_evaluate -= 1
+ else:
+ new_ids = self._hpopt_trials.new_trial_ids(1)
+ self._hpopt_trials.refresh()
+
+ # Get new suggestion from Hyperopt
+ new_trials = self.algo(new_ids, self.domain, self._hpopt_trials,
+ self.rstate.randint(2**31 - 1))
+ self._hpopt_trials.insert_trial_docs(new_trials)
+ self._hpopt_trials.refresh()
+ new_trial = new_trials[0]
self._live_trial_mapping[trial_id] = (new_trial["tid"], new_trial)
# Taken from HyperOpt.base.evaluate
diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py
index 735b0659431f..f80479b7cf30 100644
--- a/python/ray/tune/test/trial_runner_test.py
+++ b/python/ray/tune/test/trial_runner_test.py
@@ -23,7 +23,8 @@
from ray.tune.logger import Logger
from ray.tune.util import pin_in_object_store, get_pinned_object
from ray.tune.experiment import Experiment
-from ray.tune.trial import Trial, Resources, ExportFormat
+from ray.tune.trial import (Trial, ExportFormat, Resources, resources_to_json,
+ json_to_resources)
from ray.tune.trial_runner import TrialRunner
from ray.tune.suggest import grid_search, BasicVariantGenerator
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
@@ -736,6 +737,28 @@ def _train(self):
for trial in trials:
self.assertEqual(trial.status, Trial.TERMINATED)
+ def testCustomResources(self):
+ ray.shutdown()
+ ray.init(resources={"hi": 3})
+
+ class train(Trainable):
+ def _train(self):
+ return {"timesteps_this_iter": 1, "done": True}
+
+ trials = run_experiments({
+ "foo": {
+ "run": train,
+ "resources_per_trial": {
+ "cpu": 1,
+ "custom_resources": {
+ "hi": 2
+ }
+ }
+ }
+ })
+ for trial in trials:
+ self.assertEqual(trial.status, Trial.TERMINATED)
+
def testCustomLogger(self):
class CustomLogger(Logger):
def on_result(self, result):
@@ -1083,6 +1106,62 @@ def testExtraResources(self):
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
+ def testCustomResources(self):
+ ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+ runner = TrialRunner(BasicVariantGenerator())
+ kwargs = {
+ "stopping_criterion": {
+ "training_iteration": 1
+ },
+ "resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}),
+ }
+ trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
+ for t in trials:
+ runner.add_trial(t)
+
+ runner.step()
+ self.assertEqual(trials[0].status, Trial.RUNNING)
+ self.assertEqual(trials[1].status, Trial.PENDING)
+
+ runner.step()
+ self.assertEqual(trials[0].status, Trial.TERMINATED)
+ self.assertEqual(trials[1].status, Trial.PENDING)
+
+ def testExtraCustomResources(self):
+ ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+ runner = TrialRunner(BasicVariantGenerator())
+ kwargs = {
+ "stopping_criterion": {
+ "training_iteration": 1
+ },
+ "resources": Resources(
+ cpu=1, gpu=0, extra_custom_resources={"a": 2}),
+ }
+ trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
+ for t in trials:
+ runner.add_trial(t)
+
+ runner.step()
+ self.assertEqual(trials[0].status, Trial.RUNNING)
+ self.assertEqual(trials[1].status, Trial.PENDING)
+
+ runner.step()
+ self.assertTrue(sum(t.status == Trial.RUNNING for t in trials) < 2)
+ self.assertEqual(trials[0].status, Trial.TERMINATED)
+ self.assertEqual(trials[1].status, Trial.PENDING)
+
+ def testCustomResources2(self):
+ ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+ runner = TrialRunner(BasicVariantGenerator())
+ resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2})
+ self.assertTrue(runner.has_resources(resource1))
+ resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2})
+ self.assertTrue(runner.has_resources(resource2))
+ resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3})
+ self.assertFalse(runner.has_resources(resource3))
+ resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3})
+ self.assertFalse(runner.has_resources(resource4))
+
def testFractionalGpus(self):
ray.init(num_cpus=4, num_gpus=1)
runner = TrialRunner(BasicVariantGenerator())
@@ -1292,6 +1371,7 @@ def testFailureRecoveryNodeRemoval(self):
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
+
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
@@ -1878,5 +1958,61 @@ def _suggest(self, trial_id):
self.assertTrue("d=4" in trial.experiment_tag)
+class ResourcesTest(unittest.TestCase):
+ def testSubtraction(self):
+ resource_1 = Resources(
+ 1,
+ 0,
+ 0,
+ 1,
+ custom_resources={
+ "a": 1,
+ "b": 2
+ },
+ extra_custom_resources={
+ "a": 1,
+ "b": 1
+ })
+ resource_2 = Resources(
+ 1,
+ 0,
+ 0,
+ 1,
+ custom_resources={
+ "a": 1,
+ "b": 2
+ },
+ extra_custom_resources={
+ "a": 1,
+ "b": 1
+ })
+ new_res = Resources.subtract(resource_1, resource_2)
+ self.assertTrue(new_res.cpu == 0)
+ self.assertTrue(new_res.gpu == 0)
+ self.assertTrue(new_res.extra_cpu == 0)
+ self.assertTrue(new_res.extra_gpu == 0)
+ self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
+ self.assertTrue(
+ all(k == 0 for k in new_res.extra_custom_resources.values()))
+
+ def testDifferentResources(self):
+ resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
+ resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
+ new_res = Resources.subtract(resource_1, resource_2)
+ assert "c" in new_res.custom_resources
+ assert "b" in new_res.custom_resources
+ self.assertTrue(new_res.cpu == 0)
+ self.assertTrue(new_res.gpu == 0)
+ self.assertTrue(new_res.extra_cpu == 0)
+ self.assertTrue(new_res.extra_gpu == 0)
+ self.assertTrue(new_res.get("a") == 0)
+
+ def testSerialization(self):
+ original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
+ jsoned = resources_to_json(original)
+ new_resource = json_to_resources(jsoned)
+        self.assertEqual(original, new_resource)
+
+
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py
index 36057ac2a4ed..4be88ad9b41b 100644
--- a/python/ray/tune/trial.py
+++ b/python/ray/tune/trial.py
@@ -11,10 +11,10 @@
import time
import tempfile
import os
+from numbers import Number
# For compatibility under py2 to consider unicode as str
from six import string_types
-from numbers import Number
import ray
from ray.tune import TuneError
@@ -38,11 +38,12 @@ def date_str():
class Resources(
- namedtuple("Resources", ["cpu", "gpu", "extra_cpu", "extra_gpu"])):
+ namedtuple("Resources", [
+ "cpu", "gpu", "extra_cpu", "extra_gpu", "custom_resources",
+ "extra_custom_resources"
+ ])):
"""Ray resources required to schedule a trial.
- TODO: Custom resources.
-
Attributes:
cpu (float): Number of CPUs to allocate to the trial.
gpu (float): Number of GPUs to allocate to the trial.
@@ -50,21 +51,51 @@ class Resources(
launch additional Ray actors that use CPUs.
extra_gpu (float): Extra GPUs to reserve in case the trial needs to
launch additional Ray actors that use GPUs.
+ custom_resources (dict): Mapping of resource to quantity to allocate
+ to the trial.
+ extra_custom_resources (dict): Extra custom resources to reserve in
+ case the trial needs to launch additional Ray actors that use
+ any of these custom resources.
"""
__slots__ = ()
- def __new__(cls, cpu, gpu, extra_cpu=0, extra_gpu=0):
- for entry in [cpu, gpu, extra_cpu, extra_gpu]:
+ def __new__(cls,
+ cpu,
+ gpu,
+ extra_cpu=0,
+ extra_gpu=0,
+ custom_resources=None,
+ extra_custom_resources=None):
+ custom_resources = custom_resources or {}
+ extra_custom_resources = extra_custom_resources or {}
+ leftovers = set(custom_resources) ^ set(extra_custom_resources)
+
+ for value in leftovers:
+ custom_resources.setdefault(value, 0)
+ extra_custom_resources.setdefault(value, 0)
+
+ all_values = [cpu, gpu, extra_cpu, extra_gpu]
+ all_values += list(custom_resources.values())
+ all_values += list(extra_custom_resources.values())
+ assert len(custom_resources) == len(extra_custom_resources)
+ for entry in all_values:
assert isinstance(entry, Number), "Improper resource value."
- assert entry >= 0, "Resource cannot be negative."
- return super(Resources, cls).__new__(cls, cpu, gpu, extra_cpu,
- extra_gpu)
+ return super(Resources,
+ cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu,
+ custom_resources, extra_custom_resources)
def summary_string(self):
- return "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
- self.gpu + self.extra_gpu)
+ summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
+ self.gpu + self.extra_gpu)
+ custom_summary = ", ".join([
+ "{} {}".format(self.get_res_total(res), res)
+ for res in self.custom_resources
+ ])
+ if custom_summary:
+ summary += " ({})".format(custom_summary)
+ return summary
def cpu_total(self):
return self.cpu + self.extra_cpu
@@ -72,6 +103,40 @@ def cpu_total(self):
def gpu_total(self):
return self.gpu + self.extra_gpu
+ def get_res_total(self, key):
+ return self.custom_resources.get(
+ key, 0) + self.extra_custom_resources.get(key, 0)
+
+ def get(self, key):
+ return self.custom_resources.get(key, 0)
+
+ def is_nonnegative(self):
+ all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu]
+ all_values += list(self.custom_resources.values())
+ all_values += list(self.extra_custom_resources.values())
+ return all(v >= 0 for v in all_values)
+
+ @classmethod
+ def subtract(cls, original, to_remove):
+ cpu = original.cpu - to_remove.cpu
+ gpu = original.gpu - to_remove.gpu
+ extra_cpu = original.extra_cpu - to_remove.extra_cpu
+ extra_gpu = original.extra_gpu - to_remove.extra_gpu
+ all_resources = set(original.custom_resources).union(
+ set(to_remove.custom_resources))
+ new_custom_res = {
+ k: original.custom_resources.get(k, 0) -
+ to_remove.custom_resources.get(k, 0)
+ for k in all_resources
+ }
+ extra_custom_res = {
+ k: original.extra_custom_resources.get(k, 0) -
+ to_remove.extra_custom_resources.get(k, 0)
+ for k in all_resources
+ }
+ return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res,
+ extra_custom_res)
+
def json_to_resources(data):
if data is None or data == "null":
@@ -84,12 +149,13 @@ def json_to_resources(data):
"The field `{}` is no longer supported. Use `extra_cpu` "
"or `extra_gpu` instead.".format(k))
if k not in Resources._fields:
- raise TuneError(
- "Unknown resource type {}, must be one of {}".format(
+ raise ValueError(
+ "Unknown resource field {}, must be one of {}".format(
k, Resources._fields))
return Resources(
data.get("cpu", 1), data.get("gpu", 0), data.get("extra_cpu", 0),
- data.get("extra_gpu", 0))
+ data.get("extra_gpu", 0), data.get("custom_resources"),
+ data.get("extra_custom_resources"))
def resources_to_json(resources):
@@ -100,6 +166,8 @@ def resources_to_json(resources):
"gpu": resources.gpu,
"extra_cpu": resources.extra_cpu,
"extra_gpu": resources.extra_gpu,
+ "custom_resources": resources.custom_resources.copy(),
+ "extra_custom_resources": resources.extra_custom_resources.copy()
}
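
For readers scanning the Resources changes above: the new custom_resources / extra_custom_resources fields are meant to behave like the existing cpu/gpu and extra_cpu/extra_gpu pairs. Below is a minimal Python sketch of that behavior, not the actual ray.tune.trial.Resources class; the ResourcesSketch namedtuple, the make_resources helper, and the "accelerator" resource name are illustrative assumptions.

import collections
from numbers import Number

# Hypothetical stand-in mirroring the fields added in the diff above.
ResourcesSketch = collections.namedtuple(
    "ResourcesSketch",
    ["cpu", "gpu", "extra_cpu", "extra_gpu",
     "custom_resources", "extra_custom_resources"])

def make_resources(cpu, gpu, extra_cpu=0, extra_gpu=0,
                   custom_resources=None, extra_custom_resources=None):
    custom_resources = custom_resources or {}
    extra_custom_resources = extra_custom_resources or {}
    # Give both dicts the same key set so per-key totals are well defined.
    for key in set(custom_resources) ^ set(extra_custom_resources):
        custom_resources.setdefault(key, 0)
        extra_custom_resources.setdefault(key, 0)
    for value in ([cpu, gpu, extra_cpu, extra_gpu]
                  + list(custom_resources.values())
                  + list(extra_custom_resources.values())):
        assert isinstance(value, Number), "Improper resource value."
    return ResourcesSketch(cpu, gpu, extra_cpu, extra_gpu,
                           custom_resources, extra_custom_resources)

r = make_resources(cpu=4, gpu=1, custom_resources={"accelerator": 2})
# Per-resource total, mirroring get_res_total() in the diff.
print(r.custom_resources["accelerator"] +
      r.extra_custom_resources["accelerator"])  # 2
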
@@ -452,8 +520,6 @@ def __getstate__(self):
state["runner"] = None
state["result_logger"] = None
- if self.status == Trial.RUNNING:
- state["status"] = Trial.PENDING
if self.result_logger:
self.result_logger.flush()
state["__logger_started__"] = True
@@ -464,6 +530,8 @@ def __getstate__(self):
def __setstate__(self, state):
logger_started = state.pop("__logger_started__")
state["resources"] = json_to_resources(state["resources"])
+ if state["status"] == Trial.RUNNING:
+ state["status"] = Trial.PENDING
for key in [
"_checkpoint", "config", "custom_loggers", "sync_function",
"last_result"
diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py
index 4f11d52267a0..54d463f0b729 100644
--- a/python/ray/tune/trial_runner.py
+++ b/python/ray/tune/trial_runner.py
@@ -112,7 +112,8 @@ def __init__(self,
self._stop_queue = []
self._metadata_checkpoint_dir = metadata_checkpoint_dir
- self._session = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
+ self._start_time = datetime.today()
+ self._session = self._start_time.strftime("%Y-%m-%d_%H-%M-%S")
@classmethod
def checkpoint_exists(cls, directory):
@@ -136,7 +137,8 @@ def checkpoint(self):
runner_state = {
"checkpoints": list(
self.trial_executor.get_checkpoints().values()),
- "runner_data": self.__getstate__()
+ "runner_data": self.__getstate__(),
+ "timestamp": datetime.now().timestamp()  # time at which this checkpoint was written
}
tmp_file_name = os.path.join(metadata_checkpoint_dir,
".tmp_checkpoint")
@@ -546,14 +548,21 @@ def __getstate__(self):
state = self.__dict__.copy()
for k in [
"_trials", "_stop_queue", "_server", "_search_alg",
- "_scheduler_alg", "trial_executor", "_session"
+ "_scheduler_alg", "trial_executor",
]:
del state[k]
+ state["_start_time"] = self._start_time.timestamp()
state["launch_web_server"] = bool(self._server)
return state
def __setstate__(self, state):
launch_web_server = state.pop("launch_web_server")
+
+ session = state.pop("_session")
+ self.__dict__.setdefault("_session", session)
+ start_time = state.pop("_start_time")
+ self.__dict__.setdefault("_start_time", datetime.fromtimestamp(start_time))
+
self.__dict__.update(state)
if launch_web_server:
self._server = TuneServer(self, self._server_port)
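
A hedged sketch of the __getstate__/__setstate__ pattern used in the trial_runner.py hunks above, with a simplified stand-in class rather than the real TrialRunner: the start time is checkpointed as a plain POSIX timestamp, and an already-populated _session or _start_time on the restoring instance takes precedence over the checkpointed value. The RunnerStateDemo class is purely illustrative.

from datetime import datetime
import pickle

class RunnerStateDemo(object):
    """Hypothetical stand-in showing the pattern, not the real TrialRunner."""

    def __init__(self):
        self._start_time = datetime.today()
        self._session = self._start_time.strftime("%Y-%m-%d_%H-%M-%S")

    def __getstate__(self):
        state = self.__dict__.copy()
        # Checkpoint a plain POSIX timestamp instead of a datetime object.
        state["_start_time"] = self._start_time.timestamp()
        return state

    def __setstate__(self, state):
        # Keep values already set on this instance; fall back to the checkpoint.
        self.__dict__.setdefault("_session", state.pop("_session"))
        self.__dict__.setdefault(
            "_start_time", datetime.fromtimestamp(state.pop("_start_time")))
        self.__dict__.update(state)

runner = RunnerStateDemo()
restored = pickle.loads(pickle.dumps(runner))
print(restored._session == runner._session)  # True
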
diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py
index 912c4a3130d1..0294497bf1f3 100644
--- a/python/ray/tune/tune.py
+++ b/python/ray/tune/tune.py
@@ -159,7 +159,7 @@ def run_experiments(experiments,
metadata_checkpoint_dir=checkpoint_dir,
launch_web_server=with_server,
server_port=server_port,
- verbose=int(verbose > 1),
+ verbose=bool(verbose > 1),
queue_trials=queue_trials,
trial_executor=trial_executor)
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 72198bcaf27c..afb3010e2738 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -1099,22 +1099,11 @@ def _initialize_serialization(driver_id, worker=global_worker):
serialization_context.set_pickle(pickle.dumps, pickle.loads)
pyarrow.register_torch_serialization_handlers(serialization_context)
- # Define a custom serializer and deserializer for handling Object IDs.
- def object_id_custom_serializer(obj):
- return obj.binary()
-
- def object_id_custom_deserializer(serialized_obj):
- return ObjectID(serialized_obj)
-
- # We register this serializer on each worker instead of calling
- # register_custom_serializer from the driver so that isinstance still
- # works.
- serialization_context.register_type(
- ObjectID,
- "ray.ObjectID",
- pickle=False,
- custom_serializer=object_id_custom_serializer,
- custom_deserializer=object_id_custom_deserializer)
+ for id_type in ray._ID_TYPES:
+ serialization_context.register_type(
+ id_type,
+ "{}.{}".format(id_type.__module__, id_type.__name__),
+ pickle=True)
def actor_handle_serializer(obj):
return obj._serialization_helper(True)
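
A hedged Python sketch of the registration loop above: each ID class is registered under a name derived from its module and class, with pickle-based (de)serialization. The DummyObjectID/DummyTaskID classes and the register_type stand-in are illustrative; the real code iterates over ray._ID_TYPES and calls pyarrow's SerializationContext.

# Purely illustrative stand-ins for the ID classes in ray._ID_TYPES.
class DummyObjectID(object):
    pass

class DummyTaskID(object):
    pass

_ID_TYPES = [DummyObjectID, DummyTaskID]
registered = {}

def register_type(id_type, type_name, pickle=True):
    """Hypothetical stand-in for SerializationContext.register_type."""
    registered[type_name] = (id_type, pickle)

for id_type in _ID_TYPES:
    register_type(id_type,
                  "{}.{}".format(id_type.__module__, id_type.__name__),
                  pickle=True)

print(sorted(registered))  # ['__main__.DummyObjectID', '__main__.DummyTaskID']
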
diff --git a/python/setup.py b/python/setup.py
index 56bd9d0caf7d..95ba3cbf0bb3 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -169,7 +169,8 @@ def find_version(*filepath):
entry_points={
"console_scripts": [
"ray=ray.scripts.scripts:main",
- "rllib=ray.rllib.scripts:cli [rllib]"
+ "rllib=ray.rllib.scripts:cli [rllib]",
+ "tune=ray.tune.scripts:cli"
]
},
include_package_data=True,
diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs
index 6522a334c156..7619349f9b2a 100644
--- a/src/ray/gcs/format/gcs.fbs
+++ b/src/ray/gcs/format/gcs.fbs
@@ -122,12 +122,20 @@ table FunctionTableData {
table ObjectTableData {
// The size of the object.
object_size: long;
+ // Is the object inlined? Inline objects are objects whose data and metadata are
+ // inlined in the GCS object table entry, which normally only specifies
+ // the object location.
+ inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// The number of times this object has been evicted from this node so far.
num_evictions: int;
+ // In-line object data.
+ inline_object_data: [ubyte];
+ // In-line object metadata.
+ inline_object_metadata: [ubyte];
}
table TaskReconstructionData {
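
To make the new ObjectTableData fields concrete, here is a small hedged sketch in Python of what an inlined entry carries compared to an ordinary location entry. Plain dicts stand in for the generated flatbuffer objects; the sizes and manager IDs are made up for illustration.

# Entry for a small object: data and metadata travel with the GCS entry itself.
inlined_entry = {
    "object_size": 24,
    "inline_object_flag": True,
    "manager": "client-1",
    "is_eviction": False,
    "num_evictions": 0,
    "inline_object_data": b"\x00" * 24,
    "inline_object_metadata": b"",
}

# Entry for a large object: only the location is tracked; data stays in Plasma.
location_entry = {
    "object_size": 10 * 1024 * 1024,
    "inline_object_flag": False,
    "manager": "client-2",
    "is_eviction": False,
    "num_evictions": 0,
    "inline_object_data": b"",
    "inline_object_metadata": b"",
}
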
diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc
index db2b4b1490de..ba928e4454f0 100644
--- a/src/ray/object_manager/object_directory.cc
+++ b/src/ray/object_manager/object_directory.cc
@@ -8,15 +8,21 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
namespace {
-/// Process a suffix of the object table log and store the result in
-/// client_ids. This assumes that client_ids already contains the result of the
-/// object table log up to but not including this suffix. This also stores a
-/// bool in has_been_created indicating whether the object has ever been
-/// created before.
+/// Process a suffix of the object table log.
+/// If the object is inlined (inline_object_flag = TRUE), its data and metadata
+/// are stored with the object's entry, so we read them into inline_object_data
+/// and inline_object_metadata, respectively.
+/// If the object is not inlined, store the result in client_ids.
+/// This assumes that client_ids already contains the result of the
+/// object table log up to but not including this suffix.
+/// This function also stores a bool in has_been_created indicating whether the
+/// object has ever been created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
- bool *has_been_created) {
+ bool *inline_object_flag,
+ std::vector<uint8_t> *inline_object_data,
+ std::string *inline_object_metadata, bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
@@ -24,6 +30,9 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
+ //
+ // If the object is inlined, each entry contains both the object's data and
+ // metadata, so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
@@ -31,18 +40,35 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
+ if (object_table_data.inline_object_flag) {
+ if (!*inline_object_flag) {
+ // This is the first time we're receiving the inline object data. Read
+ // object's data from the GCS entry.
+ *inline_object_flag = object_table_data.inline_object_flag;
+ inline_object_data->assign(object_table_data.inline_object_data.begin(),
+ object_table_data.inline_object_data.end());
+ inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
+ object_table_data.inline_object_metadata.end());
+ }
+ // We got the data and metadata of the object so exit the loop.
+ break;
+ }
+
if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}
- // Filter out the removed clients from the object locations.
- for (auto it = client_ids->begin(); it != client_ids->end();) {
- if (client_table.IsRemoved(*it)) {
- it = client_ids->erase(it);
- } else {
- it++;
+
+ if (!*inline_object_flag) {
+ // Filter out the removed clients from the object locations.
+ for (auto it = client_ids->begin(); it != client_ids->end();) {
+ if (client_table.IsRemoved(*it)) {
+ it = client_ids->erase(it);
+ } else {
+ it++;
+ }
}
}
}
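
The control flow of UpdateObjectLocations above, restated as a hedged Python sketch: an inline entry short-circuits the log replay, otherwise additions and evictions update the location set, which is then filtered against removed clients. The dict-based entries and the removed_clients set standing in for ClientTable::IsRemoved are illustrative assumptions, not the real GCS types.

def update_object_locations(location_history, removed_clients, client_ids):
    """Replay the object table log; return (inline_flag, data, metadata)."""
    inline_flag, data, metadata = False, b"", b""
    for entry in location_history:
        if entry["inline_object_flag"]:
            # First inline entry wins: take the payload and stop scanning.
            inline_flag = True
            data = entry["inline_object_data"]
            metadata = entry["inline_object_metadata"]
            break
        if entry["is_eviction"]:
            client_ids.discard(entry["manager"])
        else:
            client_ids.add(entry["manager"])
    if not inline_flag:
        # Drop locations on clients that have left the cluster.
        client_ids -= removed_clients
    return inline_flag, data, metadata

clients = set()
log = [{"inline_object_flag": False, "manager": "c1", "is_eviction": False,
        "inline_object_data": b"", "inline_object_metadata": b""},
       {"inline_object_flag": False, "manager": "c1", "is_eviction": True,
        "inline_object_data": b"", "inline_object_metadata": b""}]
print(update_object_locations(log, {"c2"}, clients), clients)  # (False, b'', b'') set()
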
@@ -62,6 +88,8 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
+ &it->second.inline_object_flag, &it->second.inline_object_data,
+ &it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
@@ -74,6 +102,8 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
+ it->second.inline_object_flag, it->second.inline_object_data,
+ it->second.inline_object_metadata,
it->second.has_been_created);
}
};
@@ -84,13 +114,24 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
- const object_manager::protocol::ObjectInfoT &object_info) {
+ const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata) {
+ RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
+ << inline_object_flag;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
+ data->inline_object_flag = inline_object_flag;
+ if (inline_object_flag) {
+ // Add object's data to its GCS entry.
+ data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
+ data->inline_object_metadata.assign(inline_object_metadata.begin(),
+ inline_object_metadata.end());
+ }
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
@@ -98,6 +139,7 @@ ray::Status ObjectDirectory::ReportObjectAdded(
ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
+ RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id;
// Append the eviction entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
@@ -147,16 +189,19 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
- UpdateObjectLocations({}, gcs_client_->client_table(),
- &listener.second.current_object_locations,
- &listener.second.has_been_created);
+ UpdateObjectLocations(
+ {}, gcs_client_->client_table(), &listener.second.current_object_locations,
+ &listener.second.inline_object_flag, &listener.second.inline_object_data,
+ &listener.second.inline_object_metadata, &listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
- callback_pair.second(object_id, listener.second.current_object_locations,
- listener.second.has_been_created);
+ callback_pair.second(
+ object_id, listener.second.current_object_locations,
+ listener.second.inline_object_flag, listener.second.inline_object_data,
+ listener.second.inline_object_metadata, listener.second.has_been_created);
}
}
}
@@ -182,8 +227,14 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
- io_service_.post([callback, locations, object_id]() {
- callback(object_id, locations, /*has_been_created=*/true);
+ auto inline_object_flag = listener_state.inline_object_flag;
+ const auto &inline_object_data = listener_state.inline_object_data;
+ const auto &inline_object_metadata = listener_state.inline_object_metadata;
+ io_service_.post([callback, locations, inline_object_flag, inline_object_data,
+ inline_object_metadata, object_id]() {
+ callback(object_id, locations, inline_object_flag, inline_object_data,
+ inline_object_metadata,
+ /*has_been_created=*/true);
});
}
return status;
@@ -216,20 +267,31 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
+ bool inline_object_flag = false;
+ std::vector<uint8_t> inline_object_data;
+ std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
- &client_ids, &has_been_created);
+ &client_ids, &inline_object_flag, &inline_object_data,
+ &inline_object_metadata, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
- callback(object_id, client_ids, has_been_created);
+ callback(object_id, client_ids, inline_object_flag, inline_object_data,
+ inline_object_metadata, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
+ // If the object is inlined, we already have its data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
- io_service_.post([callback, object_id, locations, has_been_created]() {
- callback(object_id, locations, has_been_created);
+ bool inline_object_flag = it->second.inline_object_flag;
+ const auto &inline_object_data = it->second.inline_object_data;
+ const auto &inline_object_metadata = it->second.inline_object_metadata;
+ io_service_.post([callback, object_id, locations, inline_object_flag,
+ inline_object_data, inline_object_metadata, has_been_created]() {
+ callback(object_id, locations, inline_object_flag, inline_object_data,
+ inline_object_metadata, has_been_created);
});
}
return status;
diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h
index b44197b639ef..f1634c0f4d0e 100644
--- a/src/ray/object_manager/object_directory.h
+++ b/src/ray/object_manager/object_directory.h
@@ -48,9 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
/// Callback for object location notifications.
- using OnLocationsFound = std::function<void(const ray::ObjectID &, const std::unordered_set<ray::ClientID> &,
- bool has_been_created)>;
+ using OnLocationsFound = std::function<void(const ray::ObjectID &, const std::unordered_set<ray::ClientID> &, bool,
+ const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;
/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
@@ -99,10 +99,15 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
+ /// \param inline_object_flag Flag specifying whether object is inlined.
+ /// \param inline_object_data Object data. Only for inlined objects.
+ /// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
- const object_manager::protocol::ObjectInfoT &object_info) = 0;
+ const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata) = 0;
/// Report objects removed from this client's store to the object directory.
///
@@ -154,9 +159,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;
- ray::Status ReportObjectAdded(
- const ObjectID &object_id, const ClientID &client_id,
- const object_manager::protocol::ObjectInfoT &object_info) override;
+ ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
+ const object_manager::protocol::ObjectInfoT &object_info,
+ bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata) override;
+
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
@@ -174,6 +182,15 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
+ /// Whether the object is inlined. The data and the metadata of an inlined
+ /// object are stored in the object's GCS entry. If this flag is set (i.e.,
+ /// the object is inlined), the content of current_object_locations can be
+ /// ignored.
+ bool inline_object_flag;
+ /// Inlined object data, if inline_object_flag == true.
+ std::vector<uint8_t> inline_object_data;
+ /// Inlined object metadata, if inline_object_flag == true.
+ std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc
index e9904d9603b9..241d1bf8634a 100644
--- a/src/ray/object_manager/object_manager.cc
+++ b/src/ray/object_manager/object_manager.cc
@@ -10,13 +10,15 @@ namespace ray {
ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
- std::shared_ptr<ObjectDirectoryInterface> object_directory)
+ std::shared_ptr<ObjectDirectoryInterface> object_directory,
+ plasma::PlasmaClient &store_client)
: config_(config),
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
send_work_(send_service_),
receive_work_(receive_service_),
+ store_client_(store_client),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
@@ -64,11 +66,39 @@ void ObjectManager::HandleObjectAdded(
const object_manager::protocol::ObjectInfoT &object_info) {
// Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
+ RAY_LOG(DEBUG) << "Object added " << object_id;
RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id].object_info = object_info;
- ray::Status status =
- object_directory_->ReportObjectAdded(object_id, client_id_, object_info);
+ // If this object was created from inlined data, this means it is already in GCS,
+ // so no need to write it again.
+ if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
+ std::vector<uint8_t> inline_object_data;
+ std::string inline_object_metadata;
+ bool inline_object_flag = false;
+ if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
+ // Inline object. Try to get the data from the object store.
+ plasma::ObjectBuffer object_buffer;
+ plasma::ObjectID plasma_id = object_id.to_plasma_id();
+ RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
+ if (object_buffer.data != nullptr) {
+ // The object exists. Store the object data in the GCS entry.
+ inline_object_flag = true;
+ inline_object_data.assign(
+ object_buffer.data->data(),
+ object_buffer.data->data() + object_buffer.data->size());
+ inline_object_metadata.assign(
+ object_buffer.metadata->data(),
+ object_buffer.metadata->data() + object_buffer.metadata->size());
+ // Mark this object as inlined, so that if this object is later
+ // evicted, we do not report it to the GCS.
+ local_inlined_objects_.insert(object_id);
+ }
+ }
+ RAY_CHECK_OK(object_directory_->ReportObjectAdded(
+ object_id, client_id_, object_info, inline_object_flag, inline_object_data,
+ inline_object_metadata));
+ }
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.
auto iter = unfulfilled_push_requests_.find(object_id);
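
The inline-or-not decision added to HandleObjectAdded, as a hedged Python sketch of the logic only: the Plasma Get and the GCS append are replaced by the illustrative read_from_plasma and report_to_gcs callables, and the 512-byte threshold mirrors the inline_object_max_size_bytes default introduced later in this diff.

INLINE_OBJECT_MAX_SIZE_BYTES = 512  # mirrors the RAY_CONFIG default added below

def handle_object_added(object_id, data_size, read_from_plasma, report_to_gcs,
                        local_inlined_objects):
    """Hypothetical stand-in for the inline branch of HandleObjectAdded."""
    if object_id in local_inlined_objects:
        # The object was created from inlined GCS data; its entry already exists.
        return
    inline_flag, data, metadata = False, b"", b""
    if data_size <= INLINE_OBJECT_MAX_SIZE_BYTES:
        payload = read_from_plasma(object_id)
        if payload is not None:
            inline_flag = True
            data, metadata = payload
            # Remember the object so a later eviction is not reported to the GCS.
            local_inlined_objects.add(object_id)
    report_to_gcs(object_id, inline_flag, data, metadata)

inlined = set()
handle_object_added("obj-1", 100, lambda _id: (b"payload", b""),
                    lambda *args: print("GCS append:", args), inlined)
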
@@ -90,10 +120,16 @@ void ObjectManager::HandleObjectAdded(
}
void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) {
+ RAY_LOG(DEBUG) << "Object removed " << object_id;
auto it = local_objects_.find(object_id);
RAY_CHECK(it != local_objects_.end());
+ if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
+ // Inline object data can be retrieved by any node by contacting the GCS,
+ // so only report that the object was evicted if it wasn't inlined.
+ RAY_CHECK_OK(object_directory_->ReportObjectRemoved(object_id, client_id_));
+ }
local_objects_.erase(it);
- ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_);
+ local_inlined_objects_.erase(object_id);
}
ray::Status ObjectManager::SubscribeObjAdded(
@@ -108,6 +144,26 @@ ray::Status ObjectManager::SubscribeObjDeleted(
return ray::Status::OK();
}
+void ObjectManager::PutInlineObject(const ObjectID &object_id,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata) {
+ if (local_objects_.find(object_id) == local_objects_.end()) {
+ // The inline object is not in the local object store. Create it from
+ // inline_object_data and inline_object_metadata.
+ //
+ // Since this function is called on notification or when reading the
+ // object's entry from GCS, we know this object's entry is already in GCS.
+ // Remember this by adding the object to local_inlined_objects_. This way
+ // we avoid writing another copy of this object to GCS in HandleObjectAdded().
+ local_inlined_objects_.insert(object_id);
+ auto status = store_client_.CreateAndSeal(
+ object_id.to_plasma_id(),
+ std::string(inline_object_data.begin(), inline_object_data.end()),
+ inline_object_metadata);
+ RAY_CHECK(status.IsPlasmaObjectExists() || status.ok()) << status.message();
+ }
+}
+
ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Pull on " << client_id_ << " of object " << object_id;
// Check if object is already local.
@@ -127,7 +183,13 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
- bool created) {
+ bool inline_object_flag, const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata, bool created) {
+ if (inline_object_flag) {
+ // This is an inlined object. Store it in the Plasma store and return.
+ PutInlineObject(object_id, inline_object_data, inline_object_metadata);
+ return;
+ }
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
@@ -169,7 +231,14 @@ void ObjectManager::TryPull(const ObjectID &object_id) {
RAY_CHECK(local_objects_.count(object_id) == 0);
// Make sure that there is at least one client which is not the local client.
// TODO(rkn): It may actually be possible for this check to fail.
- RAY_CHECK(client_vector.size() != 1 || client_vector[0] != client_id_);
+ if (client_vector.size() == 1 && client_vector[0] == client_id_) {
+ RAY_LOG(ERROR) << "The object manager with client ID " << client_id_
+ << " is trying to pull object " << object_id
+ << " but the object table suggests that this object manager "
+ << "already has the object. The object may have been evicted.";
+ it->second.timer_set = false;
+ return;
+ }
// Choose a random client to pull the object from.
// Generate a random index.
@@ -572,11 +641,19 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id,
[this, wait_id](const ObjectID &lookup_object_id,
- const std::unordered_set<ClientID> &client_ids, bool created) {
+ const std::unordered_set<ClientID> &client_ids,
+ bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata, bool created) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
- if (!client_ids.empty()) {
+ if (!client_ids.empty() || inline_object_flag) {
wait_state.remaining.erase(lookup_object_id);
wait_state.found.insert(lookup_object_id);
+ if (inline_object_flag) {
+ // This is an inlined object. Store it in the Plasma store and return.
+ PutInlineObject(lookup_object_id, inline_object_data,
+ inline_object_metadata);
+ }
}
RAY_LOG(DEBUG) << "Wait request " << wait_id << ": " << client_ids.size()
<< " locations found for object " << lookup_object_id;
@@ -610,8 +687,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id,
[this, wait_id](const ObjectID &subscribe_object_id,
- const std::unordered_set<ClientID> &client_ids, bool created) {
- if (!client_ids.empty()) {
+ const std::unordered_set<ClientID> &client_ids,
+ bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata, bool created) {
+ if (!client_ids.empty() || inline_object_flag) {
RAY_LOG(DEBUG) << "Wait request " << wait_id
<< ": subscription notification received for object "
<< subscribe_object_id;
@@ -623,6 +703,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
// notification.
return;
}
+ if (inline_object_flag) {
+ // This is an inlined object. Store it in the Plasma store.
+ PutInlineObject(subscribe_object_id, inline_object_data,
+ inline_object_metadata);
+ }
auto &wait_state = object_id_wait_state->second;
wait_state.remaining.erase(subscribe_object_id);
wait_state.found.insert(subscribe_object_id);
diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h
index 29c75c57a773..b170e072781f 100644
--- a/src/ray/object_manager/object_manager.h
+++ b/src/ray/object_manager/object_manager.h
@@ -76,9 +76,12 @@ class ObjectManager : public ObjectManagerInterface {
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param object_directory An object implementing the object directory interface.
+ /// \param store_client Reference to Plasma store. This is used to get and put
+ /// inlined objects in the local object store.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
- std::shared_ptr<ObjectDirectoryInterface> object_directory);
+ std::shared_ptr<ObjectDirectoryInterface> object_directory,
+ plasma::PlasmaClient &store_client);
~ObjectManager();
@@ -351,6 +354,12 @@ class ObjectManager : public ObjectManagerInterface {
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id);
+ /// Add inline object to object store. Called when reading the object entry
+ /// from GCS or upon receiving a notification about an inline object.
+ void PutInlineObject(const ObjectID &object_id,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata);
+
ClientID client_id_;
const ObjectManagerConfig config_;
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
@@ -380,6 +389,10 @@ class ObjectManager : public ObjectManagerInterface {
/// all incoming object transfers.
std::vector<std::thread> receive_threads_;
+ /// Reference to Plasma Store. This is used to get and put inlined objects in
+ /// the local object store.
+ plasma::PlasmaClient &store_client_;
+
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;
@@ -387,6 +400,12 @@ class ObjectManager : public ObjectManagerInterface {
/// including when the object was last pushed to other object managers.
std::unordered_map<ObjectID, LocalObjectInfo> local_objects_;
+ /// Set of objects created from inlined data whose locations and/or evictions
+ /// should not be reported to the GCS. This includes objects that were
+ /// created from data retrieved from the GCS, since a GCS entry with the
+ /// inlined data already exists.
+ std::unordered_set<ObjectID> local_inlined_objects_;
+
/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
/// subscribe multiple times to the same object during Pull.
diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc
index 91b0ffc3d576..e7092955cc1f 100644
--- a/src/ray/object_manager/test/object_manager_stress_test.cc
+++ b/src/ray/object_manager/test/object_manager_stress_test.cc
@@ -30,13 +30,16 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
- std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
+ std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
+ const std::string &store_name)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
- std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
+ std::make_shared<ObjectDirectory>(main_service, gcs_client_),
+ store_client_) {
+ RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@@ -88,6 +91,7 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
+ plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@@ -142,7 +146,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives_a;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
- server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
+ server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@@ -154,7 +158,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives_b;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
- server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
+ server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc
index 98ad9bbfbf97..904f1ed2a83c 100644
--- a/src/ray/object_manager/test/object_manager_test.cc
+++ b/src/ray/object_manager/test/object_manager_test.cc
@@ -21,13 +21,16 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
- std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
+ std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
+ const std::string &store_name)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
- std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
+ std::make_shared<ObjectDirectory>(main_service, gcs_client_),
+ store_client_) {
+ RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@@ -79,6 +82,7 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
+ plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@@ -127,7 +131,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
- server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
+ server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@@ -139,7 +143,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
- server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
+ server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
@@ -291,8 +295,10 @@ class TestObjectManager : public TestObjectManagerBase {
sub_id, object_1,
[this, sub_id, object_1, object_2](
const ray::ObjectID &object_id,
- const std::unordered_set<ClientID> &clients, bool created) {
- if (!clients.empty()) {
+ const std::unordered_set<ClientID> &clients, bool inline_object_flag,
+ const std::vector<uint8_t> inline_object_data,
+ const std::string inline_object_metadata, bool created) {
+ if (!clients.empty() || inline_object_flag) {
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}
}));
diff --git a/src/ray/ray_config_def.h b/src/ray/ray_config_def.h
index 894109f22af6..138d474bc9e3 100644
--- a/src/ray/ray_config_def.h
+++ b/src/ray/ray_config_def.h
@@ -130,6 +130,11 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000);
/// chunks exceeds the number of available sending threads.
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000);
+/// Maximum size of an inline object (bytes).
+/// Inline objects are objects whose data and metadata are inlined in the
+/// GCS object table entry, which normally only specifies the object locations.
+RAY_CONFIG(int64_t, inline_object_max_size_bytes, 512);
+
/// Number of workers per process
RAY_CONFIG(int, num_workers_per_process, 1);
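
A quick worked example of the new knob above, assuming the 512-byte default is left unchanged: objects at or below the threshold are written into the GCS entry, anything larger stays in Plasma only. The sizes are arbitrary illustrative values.

inline_object_max_size_bytes = 512
for size in (8, 512, 513, 4096):
    inlined = size <= inline_object_max_size_bytes
    print("{:>4} bytes -> {}".format(
        size, "inlined in GCS" if inlined else "Plasma only"))
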
diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc
index af1f42167587..93e56a93a81b 100644
--- a/src/ray/raylet/lineage_cache.cc
+++ b/src/ray/raylet/lineage_cache.cc
@@ -324,8 +324,8 @@ void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_f
}
}
-Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id,
- const ClientID &node_id) const {
+Lineage LineageCache::GetUncommittedLineageOrDie(const TaskID &task_id,
+ const ClientID &node_id) const {
Lineage uncommitted_lineage;
// Add all uncommitted ancestors from the lineage cache to the uncommitted
// lineage of the requested task.
@@ -445,7 +445,7 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
UnsubscribeTask(task_id);
}
-const Task &LineageCache::GetTask(const TaskID &task_id) const {
+const Task &LineageCache::GetTaskOrDie(const TaskID &task_id) const {
const auto &entries = lineage_.GetEntries();
auto it = entries.find(task_id);
RAY_CHECK(it != entries.end());
diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h
index 89ff84d6c278..1816d97f7cf2 100644
--- a/src/ray/raylet/lineage_cache.h
+++ b/src/ray/raylet/lineage_cache.h
@@ -263,11 +263,13 @@ class LineageCache {
/// The uncommitted lineage consists of all tasks in the given task's lineage
/// that have not been committed in the GCS, as far as we know.
///
- /// \param task_id The ID of the task to get the uncommitted lineage for.
+ /// \param task_id The ID of the task to get the uncommitted lineage for. It is
+ /// a fatal error if the task is not found.
/// \param node_id The ID of the receiving node.
/// \return The uncommitted, unforwarded lineage of the task. The returned lineage
/// includes the entry for the requested entry_id.
- Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const;
+ Lineage GetUncommittedLineageOrDie(const TaskID &task_id,
+ const ClientID &node_id) const;
/// Handle the commit of a task entry in the GCS. This attempts to evict the
/// task if possible.
@@ -279,7 +281,7 @@ class LineageCache {
///
/// \param task_id The ID of the task to get.
/// \return A const reference to the task data.
- const Task &GetTask(const TaskID &task_id) const;
+ const Task &GetTaskOrDie(const TaskID &task_id) const;
/// Get whether the lineage cache contains the task.
///
diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc
index 32a0e593268b..973483759e4b 100644
--- a/src/ray/raylet/lineage_cache_test.cc
+++ b/src/ray/raylet/lineage_cache_test.cc
@@ -140,7 +140,7 @@ std::vector InsertTaskChain(LineageCache &lineage_cache,
return arguments;
}
-TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
+TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) {
// Insert two independent chains of tasks.
std::vector<Task> tasks1;
auto return_values1 =
@@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
// Get the uncommitted lineage for the last task (the leaf) of one of the chains.
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(task_ids1.back(), ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil());
// Check that the uncommitted lineage is exactly equal to the first chain of tasks.
ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size());
for (auto &task_id : task_ids1) {
@@ -180,8 +180,8 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
}
// Get the uncommitted lineage for the inserted task.
- uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(combined_task_ids.back(), ClientID::nil());
+ uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(
+ combined_task_ids.back(), ClientID::nil());
// Check that the uncommitted lineage is exactly equal to the entire set of
// tasks inserted so far.
ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size());
@@ -207,9 +207,9 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id);
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id);
+ lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id);
auto uncommitted_lineage_all =
- lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id2);
+ lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id2);
ASSERT_EQ(1, uncommitted_lineage.GetEntries().size());
ASSERT_EQ(4, uncommitted_lineage_all.GetEntries().size());
@@ -218,7 +218,7 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
// Check that lineage of requested task includes itself, regardless of whether
// it has been forwarded before.
auto uncommitted_lineage_forwarded =
- lineage_cache_.GetUncommittedLineage(forwarded_task_id, node_id);
+ lineage_cache_.GetUncommittedLineageOrDie(forwarded_task_id, node_id);
ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size());
}
@@ -284,8 +284,8 @@ TEST_F(LineageCacheTest, TestEvictChain) {
// the flushed task, but its lineage should not be evicted yet.
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
- .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
- ClientID::nil())
+ .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
+ ClientID::nil())
.GetEntries()
.size(),
tasks.size());
@@ -297,8 +297,8 @@ TEST_F(LineageCacheTest, TestEvictChain) {
mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data));
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
- .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
- ClientID::nil())
+ .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
+ ClientID::nil())
.GetEntries()
.size(),
tasks.size());
@@ -334,8 +334,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
- .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
- ClientID::nil())
+ .GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(),
+ ClientID::nil())
.GetEntries()
.size(),
total_tasks);
@@ -350,8 +350,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
// since the parent tasks have no dependencies.
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
- .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
- ClientID::nil())
+ .GetUncommittedLineageOrDie(
+ child_task.GetTaskSpecification().TaskId(), ClientID::nil())
.GetEntries()
.size(),
total_tasks);
@@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) {
const auto task_id = it->GetTaskSpecification().TaskId();
// Simulate removing the task and forwarding it to another node.
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
// Simulate receiving the task again. Make sure we can add the task back.
flatbuffers::FlatBufferBuilder fbb;
@@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) {
tasks.erase(it);
auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId();
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove));
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3);
@@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
// Simulate executing the first task on a remote node and adding it to the
@@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
// The remaining task should have no uncommitted lineage.
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1);
@@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
- lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
+ lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size);
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a7f76edf37cf..d05f28dd0e80 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -42,9 +42,11 @@ namespace raylet {
NodeManager::NodeManager(boost::asio::io_service &io_service,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
- std::shared_ptr<ObjectDirectoryInterface> object_directory)
+ std::shared_ptr<ObjectDirectoryInterface> object_directory,
+ plasma::PlasmaClient &store_client)
: io_service_(io_service),
object_manager_(object_manager),
+ store_client_(store_client),
gcs_client_(std::move(gcs_client)),
object_directory_(std::move(object_directory)),
heartbeat_timer_(io_service),
@@ -89,8 +91,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
-
- RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
}
ray::Status NodeManager::RegisterGcs() {
@@ -1190,10 +1190,16 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
object_id,
[this, task_marked_as_failed, task](
const ray::ObjectID &object_id,
- const std::unordered_set<ClientID> &clients, bool has_been_created) {
+ const std::unordered_set<ClientID> &clients, bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata, bool has_been_created) {
if (!*task_marked_as_failed) {
// Only process the object locations if we haven't already marked the
// task as failed.
+ if (inline_object_flag) {
+ // If the object is inlined, we already have its data and metadata, so return.
+ return;
+ }
if (clients.empty() && has_been_created) {
// The object does not exist on any nodes but has been created
// before, so the object has been lost. Mark the task as failed to
@@ -1747,7 +1753,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
"allocation via "
<< "ray.init(redis_max_memory=).";
// Use a copy of the cached task spec to re-execute the task.
- const Task task = lineage_cache_.GetTask(task_id);
+ const Task task = lineage_cache_.GetTaskOrDie(task_id);
ResubmitTask(task);
}));
@@ -1800,7 +1806,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
// Notify the task dependency manager that this object is local.
const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id);
RAY_LOG(DEBUG) << "Object local " << object_id << ", "
- << " on " << gcs_client_->client_table().GetLocalClientId()
+ << "on " << gcs_client_->client_table().GetLocalClientId()
<< ready_task_ids.size() << " tasks ready";
// Transition the tasks whose dependencies are now fulfilled to the ready state.
if (ready_task_ids.size() > 0) {
@@ -1903,9 +1909,17 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
auto task_id = spec.TaskId();
// Get and serialize the task's unforwarded, uncommitted lineage.
- auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id);
- Task &lineage_cache_entry_task =
- uncommitted_lineage.GetEntryMutable(task_id)->TaskDataMutable();
+ Lineage uncommitted_lineage;
+ if (lineage_cache_.ContainsTask(task_id)) {
+ uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(task_id, node_id);
+ } else {
+ // TODO: We expected the lineage to be in cache, but it was evicted (#3813).
+ // This is a bug but is not fatal to the application.
+ RAY_DCHECK(false) << "No lineage cache entry found for task " << task_id;
+ uncommitted_lineage.SetEntry(task, GcsStatus::NONE);
+ }
+ auto entry = uncommitted_lineage.GetEntryMutable(task_id);
+ Task &lineage_cache_entry_task = entry->TaskDataMutable();
// Increment forward count for the forwarded task.
lineage_cache_entry_task.IncrementNumForwards();
@@ -1940,10 +1954,11 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
if (!lineage_cache_.RemoveWaitingTask(task_id)) {
RAY_LOG(WARNING) << "Task " << task_id << " already removed from the lineage"
<< " cache. This is most likely due to reconstruction.";
+ } else {
+ // Mark as forwarded so that the task and its lineage are not
+ // re-forwarded in the future to the receiving node.
+ lineage_cache_.MarkTaskAsForwarded(task_id, node_id);
}
- // Mark as forwarded so that the task and its lineage is not re-forwarded
- // in the future to the receiving node.
- lineage_cache_.MarkTaskAsForwarded(task_id, node_id);
// Notify the task dependency manager that we are no longer responsible
// for executing this task.
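
A hedged Python sketch of the fallback added to ForwardTask above: use the cached uncommitted lineage when the task is still in the cache, otherwise build a one-entry lineage so forwarding can proceed. FakeLineageCache and the dict-based lineage are illustrative stand-ins for the C++ LineageCache and Lineage types.

class FakeLineageCache(object):
    """Illustrative stand-in for the C++ LineageCache."""

    def __init__(self, entries):
        self._entries = entries

    def contains_task(self, task_id):
        return task_id in self._entries

    def get_uncommitted_lineage_or_die(self, task_id, node_id):
        assert task_id in self._entries, "fatal: task not in lineage cache"
        return dict(self._entries)

def lineage_for_forwarding(task, task_id, node_id, cache):
    if cache.contains_task(task_id):
        return cache.get_uncommitted_lineage_or_die(task_id, node_id)
    # The entry was unexpectedly evicted (see #3813): not fatal, forward a
    # lineage that contains only this task.
    return {task_id: task}

print(lineage_for_forwarding("task-spec", "t1", "node-A", FakeLineageCache({})))
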
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index e60dc4e2ad66..4d9de0c63019 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -56,10 +56,12 @@ class NodeManager {
///
/// \param resource_config The initial set of node resources.
/// \param object_manager A reference to the local object manager.
+ /// \param store_client A reference to the local object store.
NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config,
ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
- std::shared_ptr<ObjectDirectoryInterface> object_directory_);
+ std::shared_ptr<ObjectDirectoryInterface> object_directory_,
+ plasma::PlasmaClient &store_client);
/// Process a new client connection.
///
@@ -400,7 +402,7 @@ class NodeManager {
/// A Plasma object store client. This is used exclusively for creating new
/// objects in the object store (e.g., for actor tasks that can't be run
/// because the actor died).
- plasma::PlasmaClient store_client_;
+ plasma::PlasmaClient &store_client_;
/// A client connection to the GCS.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// The object table. This is shared with the object manager.
diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc
index 288f0a80b481..ea4bd3fec6a2 100644
--- a/src/ray/raylet/raylet.cc
+++ b/src/ray/raylet/raylet.cc
@@ -41,9 +41,10 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: gcs_client_(gcs_client),
object_directory_(std::make_shared<ObjectDirectory>(main_service, gcs_client_)),
- object_manager_(main_service, object_manager_config, object_directory_),
+ object_manager_(main_service, object_manager_config, object_directory_,
+ store_client_),
node_manager_(main_service, node_manager_config, object_manager_, gcs_client_,
- object_directory_),
+ object_directory_, store_client_),
socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
@@ -56,6 +57,8 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
boost::asio::ip::tcp::v4(),
node_manager_config.node_manager_port)),
node_manager_socket_(main_service) {
+ RAY_ARROW_CHECK_OK(
+ store_client_.Connect(node_manager_config.store_socket_name.c_str()));
// Start listening for clients.
DoAccept();
DoAcceptObjectManager();
diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h
index 84274ea6ecfe..8f010ed512a6 100644
--- a/src/ray/raylet/raylet.h
+++ b/src/ray/raylet/raylet.h
@@ -73,6 +73,10 @@ class Raylet {
/// The object table. This is shared between the object manager and node
/// manager.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
+ /// A connection to the Plasma store. This is shared between the node manager
+ /// and the main thread of the object manager.
+ plasma::PlasmaClient store_client_;
/// Manages client requests for object transfers and availability.
ObjectManager object_manager_;
/// Manages client requests for task submission and execution.
diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc
index d698402994a4..a98df6d493d3 100644
--- a/src/ray/raylet/reconstruction_policy.cc
+++ b/src/ray/raylet/reconstruction_policy.cc
@@ -145,8 +145,10 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
created_object_id,
[this, task_id, reconstruction_attempt](
const ray::ObjectID &object_id,
- const std::unordered_set<ClientID> &clients, bool created) {
- if (clients.empty()) {
+ const std::unordered_set<ClientID> &clients, bool inline_object_flag,
+ const std::vector<uint8_t> &inline_object_data,
+ const std::string &inline_object_metadata, bool created) {
+ if (clients.empty() && !inline_object_flag) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
AttemptReconstruction(task_id, object_id, reconstruction_attempt, created);
diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc
index 5e9ae6d7e521..9adbc1e893b9 100644
--- a/src/ray/raylet/reconstruction_policy_test.cc
+++ b/src/ray/raylet/reconstruction_policy_test.cc
@@ -29,10 +29,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const ObjectID object_id = callback.first;
auto it = locations_.find(object_id);
if (it == locations_.end()) {
- callback.second(object_id, std::unordered_set<ClientID>(),
+ callback.second(object_id, std::unordered_set<ClientID>(), false, {}, "",
/*created=*/false);
} else {
- callback.second(object_id, it->second, /*created=*/true);
+ callback.second(object_id, it->second, false, {}, "", /*created=*/true);
}
}
callbacks_.clear();
@@ -60,9 +60,11 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
- MOCK_METHOD3(ReportObjectAdded,
+ MOCK_METHOD6(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
- const object_manager::protocol::ObjectInfoT &));
+ const object_manager::protocol::ObjectInfoT &, bool,
+ const std::vector<uint8_t> &, const std::string &));
+
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));
private:
diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc
index e0d447824276..fe4364c4491f 100644
--- a/src/ray/raylet/task_dependency_manager.cc
+++ b/src/ray/raylet/task_dependency_manager.cc
@@ -304,28 +304,35 @@ void TaskDependencyManager::TaskCanceled(const TaskID &task_id) {
void TaskDependencyManager::RemoveTasksAndRelatedObjects(
const std::unordered_set<TaskID> &task_ids) {
- if (task_ids.empty()) {
- return;
- }
-
+ // Collect a list of all the unique objects that these tasks were subscribed
+ // to.
+ std::unordered_set<ObjectID> required_objects;
for (auto it = task_ids.begin(); it != task_ids.end(); it++) {
+ auto task_it = task_dependencies_.find(*it);
+ if (task_it != task_dependencies_.end()) {
+ // Add the objects that this task was subscribed to.
+ required_objects.insert(task_it->second.object_dependencies.begin(),
+ task_it->second.object_dependencies.end());
+ }
+ // The task no longer depends on anything.
task_dependencies_.erase(*it);
- required_tasks_.erase(*it);
+ // The task is no longer pending execution.
pending_tasks_.erase(*it);
}
- // TODO: the size of required_objects_ could be large, consider to add
- // an index if this turns out to be a perf problem.
- for (auto it = required_objects_.begin(); it != required_objects_.end();) {
- const auto object_id = *it;
+ // Cancel all of the objects that were required by the removed tasks.
+ for (const auto &object_id : required_objects) {
TaskID creating_task_id = ComputeTaskId(object_id);
- if (task_ids.find(creating_task_id) != task_ids.end()) {
- object_manager_.CancelPull(object_id);
- reconstruction_policy_.Cancel(object_id);
- it = required_objects_.erase(it);
- } else {
- it++;
- }
+ required_tasks_.erase(creating_task_id);
+ HandleRemoteDependencyCanceled(object_id);
+ }
+
+ // Make sure that the tasks in task_ids no longer have tasks dependent on
+ // them.
+ for (const auto &task_id : task_ids) {
+ RAY_CHECK(required_tasks_.find(task_id) == required_tasks_.end())
+ << "RemoveTasksAndRelatedObjects was called on " << task_id
+ << ", but another task that depends on it was not included in the argument";
}
}
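
The rewritten RemoveTasksAndRelatedObjects, restated as a hedged Python sketch: collect every object the removed tasks were subscribed to, drop the tasks, cancel those objects, then assert that nothing outside the removed set still depends on a removed task. The dict/set containers, compute_task_id, and cancel_object are illustrative stand-ins for the C++ members and ComputeTaskId.

def remove_tasks_and_related_objects(task_ids, task_dependencies, pending_tasks,
                                     required_tasks, compute_task_id, cancel_object):
    # Collect every object the removed tasks were subscribed to.
    required_objects = set()
    for task_id in task_ids:
        deps = task_dependencies.pop(task_id, None)
        if deps is not None:
            required_objects.update(deps)
        pending_tasks.discard(task_id)
    # Cancel those objects and drop the tasks that create them.
    for object_id in required_objects:
        required_tasks.pop(compute_task_id(object_id), None)
        cancel_object(object_id)
    # Invariant: nothing left may still depend on a removed task.
    for task_id in task_ids:
        assert task_id not in required_tasks, (
            "another task still depends on {}".format(task_id))

task_deps = {"t2": {"t1.out"}, "t3": {"t2.out"}}
remove_tasks_and_related_objects(
    {"t2", "t3"}, task_deps, {"t2", "t3"}, {"t2": {"t3"}},
    compute_task_id=lambda obj: obj.split(".")[0],
    cancel_object=lambda obj: print("cancel", obj))
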
diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h
index bb49f4bc182a..afb146af6a8f 100644
--- a/src/ray/raylet/task_dependency_manager.h
+++ b/src/ray/raylet/task_dependency_manager.h
@@ -106,10 +106,12 @@ class TaskDependencyManager {
/// \return Return a vector of TaskIDs for tasks registered as pending.
std::vector<TaskID> GetPendingTasks() const;
- /// Remove all of the tasks specified, and all the objects created by
- /// these tasks from task dependency manager.
+ /// Remove all of the tasks specified. These tasks will no longer be
+ /// considered pending and the objects they depend on will no longer be
+ /// required.
///
- /// \param task_ids The collection of task IDs.
+ /// \param task_ids The collection of task IDs. For a given task in this set,
+ /// all tasks that depend on the task must also be included in the set.
void RemoveTasksAndRelatedObjects(const std::unordered_set<TaskID> &task_ids);
/// Returns debug string for class.
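
The updated contract requires the caller to pass a closed set: for every task in task_ids, every task that depends on it must be included too. A small sketch of how such a closure could be computed before calling RemoveTasksAndRelatedObjects, assuming the caller keeps (or can build) a map from a task to its direct dependents; the dependents map and DownstreamClosure helper are illustrative, not part of the patch:

    #include <deque>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    using TaskID = std::string;  // stand-in for ray::TaskID

    // Hypothetical bookkeeping: for each task, the tasks whose arguments
    // include one of its outputs.
    std::unordered_map<TaskID, std::vector<TaskID>> dependents;

    std::unordered_set<TaskID> DownstreamClosure(
        const std::unordered_set<TaskID> &seeds) {
      std::unordered_set<TaskID> closure(seeds.begin(), seeds.end());
      std::deque<TaskID> queue(seeds.begin(), seeds.end());
      while (!queue.empty()) {
        const TaskID task_id = queue.front();
        queue.pop_front();
        auto it = dependents.find(task_id);
        if (it == dependents.end()) {
          continue;
        }
        for (const auto &dependent : it->second) {
          // Only enqueue tasks we have not seen yet.
          if (closure.insert(dependent).second) {
            queue.push_back(dependent);
          }
        }
      }
      return closure;
    }

Passing the result of DownstreamClosure to RemoveTasksAndRelatedObjects would satisfy the RAY_CHECK added in the .cc change above.
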
diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc
index 1e0528317232..f414d7469565 100644
--- a/src/ray/raylet/task_dependency_manager_test.cc
+++ b/src/ray/raylet/task_dependency_manager_test.cc
@@ -415,6 +415,62 @@ TEST_F(TaskDependencyManagerTest, TestTaskLeaseRenewal) {
Run(sleep_time);
}
+TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) {
+ // Create 3 tasks, each dependent on the previous. The first task has no
+ // arguments.
+ int num_tasks = 3;
+ auto tasks = MakeTaskChain(num_tasks, {}, 1);
+ // No objects should be remote or canceled since each task depends on a
+ // locally queued task.
+ EXPECT_CALL(object_manager_mock_, Pull(_)).Times(0);
+ EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)).Times(0);
+ EXPECT_CALL(object_manager_mock_, CancelPull(_)).Times(0);
+ EXPECT_CALL(reconstruction_policy_mock_, Cancel(_)).Times(0);
+ for (const auto &task : tasks) {
+ // Subscribe to each of the tasks' arguments.
+ const auto &arguments = task.GetDependencies();
+ task_dependency_manager_.SubscribeDependencies(task.GetTaskSpecification().TaskId(),
+ arguments);
+ // Mark each task as pending. A lease entry should be added to the GCS for
+ // each task.
+ EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _));
+ task_dependency_manager_.TaskPending(task);
+ }
+
+ // Simulate executing the first task. This should make the second task
+ // runnable.
+ auto task = tasks.front();
+ TaskID task_id = task.GetTaskSpecification().TaskId();
+ auto return_id = task.GetTaskSpecification().ReturnId(0);
+ task_dependency_manager_.UnsubscribeDependencies(task_id);
+ // Simulate the object notifications for the task's return values.
+ auto ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id);
+ // The second task should be ready to run.
+ ASSERT_EQ(ready_tasks.size(), 1);
+ // Simulate the task finishing execution.
+ task_dependency_manager_.TaskCanceled(task_id);
+
+ // Remove all tasks from the manager except the first task, which already
+ // finished executing.
+ std::unordered_set<TaskID> task_ids;
+ for (const auto &task : tasks) {
+ task_ids.insert(task.GetTaskSpecification().TaskId());
+ }
+ task_ids.erase(task_id);
+ task_dependency_manager_.RemoveTasksAndRelatedObjects(task_ids);
+ // Simulate evicting the return value of the first task. Make sure that this
+ // does not return the second task, which should have been removed.
+ auto waiting_tasks = task_dependency_manager_.HandleObjectMissing(return_id);
+ ASSERT_TRUE(waiting_tasks.empty());
+
+ // Simulate the object notifications for the second task's return values.
+ // Make sure that this does not return the third task, which should have been
+ // removed.
+ return_id = tasks[1].GetTaskSpecification().ReturnId(0);
+ ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id);
+ ASSERT_TRUE(ready_tasks.empty());
+}
+
} // namespace raylet
} // namespace ray
diff --git a/src/ray/util/logging.h b/src/ray/util/logging.h
index 6f657142efd7..d37ab9a73897 100644
--- a/src/ray/util/logging.h
+++ b/src/ray/util/logging.h
@@ -24,10 +24,11 @@ enum class RayLogLevel { DEBUG = -1, INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3
#ifdef NDEBUG
-#define RAY_DCHECK(condition) \
- RAY_IGNORE_EXPR(condition); \
- while (false) ::ray::RayLogBase()
-
+#define RAY_DCHECK(condition) \
+ (condition) ? RAY_IGNORE_EXPR(0) \
+ : ::ray::Voidify() & \
+ ::ray::RayLog(__FILE__, __LINE__, ray::RayLogLevel::ERROR) \
+ << " Debug check failed: " #condition " "
#else
#define RAY_DCHECK(condition) RAY_CHECK(condition)
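
The NDEBUG branch of RAY_DCHECK is now a single ?: expression instead of a statement ending in `while (false)`, so the failure log is only constructed when the condition is false and the macro stays well-formed inside an unbraced if/else. A standalone sketch of the same pattern with minimal stand-ins for RAY_IGNORE_EXPR, RayLog, and Voidify (the stand-in names are not from the Ray sources):

    #include <iostream>
    #include <sstream>

    // Minimal stand-in for the logging stream object.
    struct MiniLog {
      std::ostringstream stream;
      ~MiniLog() { std::cerr << stream.str() << std::endl; }
      template <typename T>
      MiniLog &operator<<(const T &value) {
        stream << value;
        return *this;
      }
    };

    struct MiniVoidify {
      // operator& binds more loosely than <<, so the whole stream expression
      // is built first and then discarded as void, matching the other branch.
      void operator&(MiniLog &) {}
    };

    #define MINI_DCHECK(condition) \
      (condition) ? (void)0        \
                  : MiniVoidify() & MiniLog() << " Debug check failed: " #condition " "

    int main() {
      int x = 1;
      // The macro expands to one expression, so it is safe in an unbraced
      // if/else, where the old trailing `while (false)` statement form could
      // change which branch the else attaches to.
      if (x > 0)
        MINI_DCHECK(x == 1);
      else
        MINI_DCHECK(x == 2);
      return 0;
    }
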
diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh
index 492f49e73d56..ce96887bbdc9 100755
--- a/test/jenkins_tests/run_multi_node_tests.sh
+++ b/test/jenkins_tests/run_multi_node_tests.sh
@@ -367,12 +367,11 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
--smoke-test
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
- python /ray/python/ray/tune/examples/mnist_pytorch.py \
- --smoke-test
+ python /ray/python/ray/tune/examples/mnist_pytorch.py --smoke-test --no-cuda
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/python/ray/tune/examples/mnist_pytorch_trainable.py \
- --smoke-test
+ --smoke-test --no-cuda
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/python/ray/tune/examples/genetic_example.py \
diff --git a/test/object_manager_test.py b/test/object_manager_test.py
index 9d67c6b61e4d..afab0292120d 100644
--- a/test/object_manager_test.py
+++ b/test/object_manager_test.py
@@ -210,7 +210,7 @@ def set_weights(self, x):
def test_object_transfer_retry(ray_start_empty_cluster):
cluster = ray_start_empty_cluster
- repeated_push_delay = 4
+ repeated_push_delay = 10
# Force the sending object manager to allow duplicate pushes again sooner.
# Also, force the receiving object manager to retry the Pull sooner.
@@ -246,6 +246,7 @@ def f(size):
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
end_time = time.time()
+ print(end_time - start_time)
# Make sure that the first time the objects get transferred, it happens
# quickly.
assert end_time - start_time < repeated_push_delay
diff --git a/test/runtest.py b/test/runtest.py
index b280d7fc0e68..1e35a08e5844 100644
--- a/test/runtest.py
+++ b/test/runtest.py
@@ -1059,8 +1059,14 @@ def test_object_transfer_dump(ray_start_cluster):
cluster = ray_start_cluster
num_nodes = 3
+ # Set the inline object size to 0 to force all objects to be written to
+ # plasma.
+ config = json.dumps({"inline_object_max_size_bytes": 0})
for i in range(num_nodes):
- cluster.add_node(resources={str(i): 1}, object_store_memory=10**9)
+ cluster.add_node(
+ resources={str(i): 1},
+ object_store_memory=10**9,
+ _internal_config=config)
ray.init(redis_address=cluster.redis_address)
@ray.remote
@@ -2518,6 +2524,56 @@ def f():
assert len(ready_ids) == 1
+def test_inline_objects(shutdown_only):
+ config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200})
+ ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config)
+
+ @ray.remote
+ class Actor(object):
+ def create_inline_object(self):
+ return "inline"
+
+ def create_non_inline_object(self):
+ return 10000 * [1]
+
+ def get(self):
+ return
+
+ a = Actor.remote()
+ # Count the number of objects that were successfully inlined.
+ inlined = 0
+ for _ in range(100):
+ inline_object = a.create_inline_object.remote()
+ ray.get(inline_object)
+ plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary())
+ ray.worker.global_worker.plasma_client.delete([plasma_id])
+ # Make sure we can still get an inlined object created by an actor even
+ # after it has been evicted.
+ try:
+ value = ray.get(inline_object)
+ assert value == "inline"
+ inlined += 1
+ except ray.worker.RayTaskError:
+ pass
+ # Make sure some objects were inlined. Some of them may not get inlined
+ # because we evict the object soon after creating it.
+ assert inlined > 0
+
+ # Non-inlined objects are not able to be recreated after eviction.
+ for _ in range(10):
+ non_inline_object = a.create_non_inline_object.remote()
+ ray.get(non_inline_object)
+ plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary())
+ # This while loop is necessary because sometimes the object is still
+ # there immediately after plasma_client.delete.
+ while ray.worker.global_worker.plasma_client.contains(plasma_id):
+ ray.worker.global_worker.plasma_client.delete([plasma_id])
+ # Objects created by an actor that were evicted and larger than the
+ # maximum inline object size cannot be retrieved or reconstructed.
+ with pytest.raises(ray.worker.RayTaskError):
+ ray.get(non_inline_object)
+
+
def test_ray_setproctitle(shutdown_only):
ray.init(num_cpus=2)
diff --git a/test/stress_tests.py b/test/stress_tests.py
index 8c4f20d98bcf..311fbd48eb8a 100644
--- a/test/stress_tests.py
+++ b/test/stress_tests.py
@@ -197,7 +197,7 @@ def g(x):
def ray_start_reconstruction(request):
num_nodes = request.param
- plasma_store_memory = 10**9
+ plasma_store_memory = int(0.5 * 10**9)
cluster = Cluster(
initialize_head=True,