Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
394c85d
use one memory mapped file for plasma
pcmoritz Jan 26, 2019
c03992f
rebase
pcmoritz Jan 30, 2019
5986a49
Update pom.xml for arrow
Feb 2, 2019
05580d7
Fix java test by copy part code of https://github.com/ray-project/ray…
Feb 2, 2019
97209a4
temp commit: try increased timeout for valgrind
pcmoritz Feb 2, 2019
523b3a3
update
pcmoritz Feb 3, 2019
c4818b5
update
pcmoritz Feb 4, 2019
e2beda8
update
pcmoritz Feb 4, 2019
05ae552
fix
pcmoritz Feb 4, 2019
bafcbac
fix deserialization codepath too
pcmoritz Feb 4, 2019
a6ab285
update
pcmoritz Feb 4, 2019
dc9bc58
update
pcmoritz Feb 4, 2019
cb7ed7c
debug
pcmoritz Feb 4, 2019
3a48208
debug 2
pcmoritz Feb 4, 2019
1c6142a
update
pcmoritz Feb 4, 2019
b4e7edf
update
pcmoritz Feb 4, 2019
3801737
debug
pcmoritz Feb 4, 2019
e9ee35b
update
pcmoritz Feb 4, 2019
b38626d
update
pcmoritz Feb 4, 2019
a50e6af
debug upstream
pcmoritz Feb 4, 2019
bfa3e33
update
pcmoritz Feb 4, 2019
ae5992d
update
pcmoritz Feb 4, 2019
b31009a
Merge branch 'master' into ray-plasma-one-file
pcmoritz Feb 4, 2019
978d660
update
pcmoritz Feb 4, 2019
3061f92
Merge branch 'ray-plasma-one-file' of github.com:pcmoritz/ray-1 into …
pcmoritz Feb 4, 2019
3583ad1
update
pcmoritz Feb 5, 2019
420a6b9
update
pcmoritz Feb 5, 2019
ea4ab5a
update
pcmoritz Feb 5, 2019
3f6b241
update
pcmoritz Feb 5, 2019
414736f
update
pcmoritz Feb 5, 2019
e9bc574
update
pcmoritz Feb 6, 2019
513ad14
update
pcmoritz Feb 6, 2019
ce71019
update
pcmoritz Feb 6, 2019
32cdc25
update
pcmoritz Feb 6, 2019
a0b6179
update
pcmoritz Feb 6, 2019
90fe323
update
pcmoritz Feb 6, 2019
d3ab3d7
roll back
pcmoritz Feb 6, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmake/Modules/ArrowExternalProject.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
# - PLASMA_SHARED_LIB

set(arrow_URL https://github.com/ray-project/arrow.git)
# This commit is based on https://github.com/apache/arrow/pull/3410. We
# This commit is based on https://github.com/apache/arrow/pull/3526. We
# include the link here to make it easier to find the right commit because
# Arrow often rewrites git history and invalidates certain commits.
# It has been patched to fix an upstream symbol clash with TensorFlow;
# the patch is available at
# https://github.com/ray-project/arrow/commit/511dae1149e3656bbf84f461729f2306d2ebf2e5
# https://github.com/ray-project/arrow/commit/007e1ca289e979bac80231fa9ee7510be744b60b
# See the discussion in https://github.com/apache/arrow/pull/3177
# WARNING: If the arrow version is updated, you need to also update the
# SETUPTOOLS_SCM_PRETEND_VERSION version string in the ThirdpartyToolchain.cmake
# file
set(arrow_TAG 511dae1149e3656bbf84f461729f2306d2ebf2e5)
set(arrow_TAG 007e1ca289e979bac80231fa9ee7510be744b60b)

set(ARROW_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}/external/arrow-install)
set(ARROW_HOME ${ARROW_INSTALL_PREFIX})
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-plasma</artifactId>
<version>0.10.0</version>
<version>0.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.ray.runtime.objectstore;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.arrow.plasma.ObjectStoreLink.ObjectStoreData;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayDevRuntime;
import org.ray.runtime.raylet.MockRayletClient;
Expand Down Expand Up @@ -57,12 +59,18 @@ public List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata) {
}

@Override
public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
ArrayList<byte[]> rets = new ArrayList<>();
public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
ArrayList<ObjectStoreData> rets = new ArrayList<>();
// TODO(yuhguo): make ObjectStoreData's constructor public.
for (byte[] objId : objectIds) {
//tod test
if (data.containsKey(new UniqueId(objId))) {
rets.add(objId);
UniqueId uniqueId = new UniqueId(objId);
try {
Constructor<ObjectStoreData> constructor = ObjectStoreData.class.getConstructor(
byte[].class, byte[].class);
constructor.setAccessible(true);
rets.add(constructor.newInstance(metadata.get(uniqueId), data.get(uniqueId)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return rets;
Expand All @@ -73,11 +81,6 @@ public byte[] hash(byte[] objectId) {
return null;
}

@Override
public void fetch(byte[][] objectIds) {

}

@Override
public long evict(long numBytes) {
return 0;
Expand All @@ -89,8 +92,12 @@ public void release(byte[] objectId) {
}

@Override
public boolean contains(byte[] objectId) {
public void delete(byte[] objectId) {
return;
}

@Override
public boolean contains(byte[] objectId) {
return data.containsKey(new UniqueId(objectId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.arrow.plasma.PlasmaClient;
import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
Expand All @@ -12,13 +13,17 @@
import org.ray.runtime.config.RunMode;
import org.ray.runtime.util.Serializer;
import org.ray.runtime.util.UniqueIdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Object store proxy, which handles serialization and deserialization, and utilizes a {@code
* org.ray.spi.ObjectStoreLink} to actually store data.
*/
public class ObjectStoreProxy {

private static final Logger LOGGER = LoggerFactory.getLogger(ObjectStoreProxy.class);

private static final int GET_TIMEOUT_MS = 1000;

private final AbstractRayRuntime runtime;
Expand Down Expand Up @@ -82,11 +87,19 @@ public <T> List<Pair<T, GetStatus>> get(List<UniqueId> ids, int timeoutMs, boole
}

public void put(UniqueId id, Object obj, Object metadata) {
objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
try {
objectStore.get().put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
} catch (DuplicateObjectException e) {
LOGGER.warn(e.getMessage());
}
}

public void putSerialized(UniqueId id, byte[] obj, byte[] metadata) {
objectStore.get().put(id.getBytes(), obj, metadata);
try {
objectStore.get().put(id.getBytes(), obj, metadata);
} catch (DuplicateObjectException e) {
LOGGER.warn(e.getMessage());
}
}

public enum GetStatus {
Expand Down
2 changes: 1 addition & 1 deletion test/stress_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def g(x):
def ray_start_reconstruction(request):
num_nodes = request.param

plasma_store_memory = 10**9
plasma_store_memory = int(0.5 * 10**9)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was needed because the space calculations in plasma are now done slightly differently: there can now be a discrepancy between the plasma store memory and the size of the memory-mapped files (in practice I have seen less than 10%).

This change is to be on the safe side to prevent flakiness. With 0.8 here I was still seeing test_simple and test_recursive in stress_tests.py failing occasionally.


cluster = Cluster(
initialize_head=True,
Expand Down