diff --git a/cmake/Modules/ArrowExternalProject.cmake b/cmake/Modules/ArrowExternalProject.cmake index f691d990b283..b6cba2e0072c 100644 --- a/cmake/Modules/ArrowExternalProject.cmake +++ b/cmake/Modules/ArrowExternalProject.cmake @@ -15,17 +15,17 @@ # - PLASMA_SHARED_LIB set(arrow_URL https://github.com/ray-project/arrow.git) -# This commit is based on https://github.com/apache/arrow/pull/3410. We +# This commit is based on https://github.com/apache/arrow/pull/3526. We # include the link here to make it easier to find the right commit because # Arrow often rewrites git history and invalidates certain commits. # It has been patched to fix an upstream symbol clash with TensorFlow, # the patch is available at -# https://github.com/ray-project/arrow/commit/511dae1149e3656bbf84f461729f2306d2ebf2e5 +# https://github.com/ray-project/arrow/commit/007e1ca289e979bac80231fa9ee7510be744b60b # See the discussion in https://github.com/apache/arrow/pull/3177 # WARNING: If the arrow version is updated, you need to also update the # SETUPTOOLS_SCM_PRETEND_VERSION version string in the ThirdpartyToolchain.cmake # file -set(arrow_TAG 511dae1149e3656bbf84f461729f2306d2ebf2e5) +set(arrow_TAG 007e1ca289e979bac80231fa9ee7510be744b60b) set(ARROW_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}/external/arrow-install) set(ARROW_HOME ${ARROW_INSTALL_PREFIX}) diff --git a/java/pom.xml b/java/pom.xml index b5b13d417703..0ccb62a5811d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -29,7 +29,7 @@ org.apache.arrow arrow-plasma - 0.10.0 + 0.13.0-SNAPSHOT de.ruedigermoeller diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java index 466233e9b32f..3f67a0b827dd 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java @@ -1,11 +1,13 @@ package org.ray.runtime.objectstore; +import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.arrow.plasma.ObjectStoreLink; +import org.apache.arrow.plasma.ObjectStoreLink.ObjectStoreData; import org.ray.api.id.UniqueId; import org.ray.runtime.RayDevRuntime; import org.ray.runtime.raylet.MockRayletClient; @@ -57,12 +59,18 @@ public List get(byte[][] objectIds, int timeoutMs, boolean isMetadata) { } @Override - public List wait(byte[][] objectIds, int timeoutMs, int numReturns) { - ArrayList rets = new ArrayList<>(); + public List get(byte[][] objectIds, int timeoutMs) { + ArrayList rets = new ArrayList<>(); + // TODO(yuhguo): make ObjectStoreData's constructor public. for (byte[] objId : objectIds) { - //tod test - if (data.containsKey(new UniqueId(objId))) { - rets.add(objId); + UniqueId uniqueId = new UniqueId(objId); + try { + Constructor constructor = ObjectStoreData.class.getConstructor( + byte[].class, byte[].class); + constructor.setAccessible(true); + rets.add(constructor.newInstance(metadata.get(uniqueId), data.get(uniqueId))); + } catch (Exception e) { + throw new RuntimeException(e); } } return rets; @@ -73,11 +81,6 @@ public byte[] hash(byte[] objectId) { return null; } - @Override - public void fetch(byte[][] objectIds) { - - } - @Override public long evict(long numBytes) { return 0; @@ -89,8 +92,12 @@ public void release(byte[] objectId) { } @Override - public boolean contains(byte[] objectId) { + public void delete(byte[] objectId) { + return; + } + @Override + public boolean contains(byte[] objectId) { return data.containsKey(new UniqueId(objectId)); } diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java index 2bbb457dd870..31026930f347 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java @@ -4,6 +4,7 @@ import java.util.List; import org.apache.arrow.plasma.ObjectStoreLink; import org.apache.arrow.plasma.PlasmaClient; +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; import org.apache.commons.lang3.tuple.Pair; import org.ray.api.exception.RayException; import org.ray.api.id.UniqueId; @@ -12,6 +13,8 @@ import org.ray.runtime.config.RunMode; import org.ray.runtime.util.Serializer; import org.ray.runtime.util.UniqueIdUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Object store proxy, which handles serialization and deserialization, and utilize a {@code @@ -19,6 +22,8 @@ */ public class ObjectStoreProxy { + private static final Logger LOGGER = LoggerFactory.getLogger(ObjectStoreProxy.class); + private static final int GET_TIMEOUT_MS = 1000; private final AbstractRayRuntime runtime; @@ -82,11 +87,19 @@ public List> get(List ids, int timeoutMs, boole } public void put(UniqueId id, Object obj, Object metadata) { - objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata)); + try { + objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata)); + } catch (DuplicateObjectException e) { + LOGGER.warn(e.getMessage()); + } } public void putSerialized(UniqueId id, byte[] obj, byte[] metadata) { - objectStore.get().put(id.getBytes(), obj, metadata); + try { + objectStore.get().put(id.getBytes(), obj, metadata); + } catch (DuplicateObjectException e) { + LOGGER.warn(e.getMessage()); + } } public enum GetStatus { diff --git a/test/stress_tests.py b/test/stress_tests.py index 8c4f20d98bcf..311fbd48eb8a 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -197,7 +197,7 @@ def g(x): def ray_start_reconstruction(request): num_nodes = request.param - plasma_store_memory = 10**9 + plasma_store_memory = int(0.5 * 10**9) cluster = Cluster( initialize_head=True,