Skip to content
Merged
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ matrix:
- JDK='Oracle JDK 8'
- PYTHON=3.5 PYTHONWARNINGS=ignore
- RAY_USE_CMAKE=1
- RAY_INSTALL_JAVA=1
install:
- ./ci/travis/install-dependencies.sh
- export PATH="$HOME/miniconda/bin:$PATH"
- ./ci/travis/install-ray.sh
script:
- ./java/test.sh

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public void shutdown() {
public MockObjectStore getObjectStore() {
return store;
}

@Override
public Worker getWorker() {
return ((MockRayletClient) rayletClient).getCurrentWorker();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.ray.api.RuntimeContext;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.task.TaskSpec;

public class RuntimeContextImpl implements RuntimeContext {
Expand All @@ -22,8 +21,10 @@ public UniqueId getCurrentDriverId() {

@Override
public UniqueId getCurrentActorId() {
Preconditions.checkState(runtime.rayConfig.workerMode == WorkerMode.WORKER);
return runtime.getWorker().getCurrentActorId();
Worker worker = runtime.getWorker();
Preconditions.checkState(worker != null && !worker.getCurrentActorId().isNil(),
"This method should only be called from an actor.");
return worker.getCurrentActorId();
}

@Override
Expand Down
3 changes: 1 addition & 2 deletions java/runtime/src/main/java/org/ray/runtime/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void loop() {
* Execute a task.
*/
public void execute(TaskSpec spec) {
LOGGER.info("Executing task {}", spec.taskId);
LOGGER.debug("Executing task {}", spec);
UniqueId returnId = spec.returnIds[0];
ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
Expand Down Expand Up @@ -123,7 +122,7 @@ public void execute(TaskSpec spec) {
maybeLoadCheckpoint(result, returnId);
currentActor = result;
}
LOGGER.info("Finished executing task {}", spec.taskId);
LOGGER.debug("Finished executing task {}", spec.taskId);
} catch (Exception e) {
LOGGER.error("Error executing task " + spec, e);
if (!spec.isActorCreationTask()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
ArrayList<ObjectStoreData> rets = new ArrayList<>();
for (byte[] id : objectIds) {
try {
Constructor<ObjectStoreData> constructor = ObjectStoreData.class.getConstructor(
byte[].class, byte[].class);
Constructor<?> constructor = ObjectStoreData.class.getDeclaredConstructors()[0];
constructor.setAccessible(true);
rets.add(constructor.newInstance(metadata.get(new UniqueId(id)),
data.get(new UniqueId(id))));
rets.add((ObjectStoreData) constructor.newInstance(data.get(new UniqueId(id)),
metadata.get(new UniqueId(id))));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MockRayletClient implements RayletClient {
private final ExecutorService exec;
private final Deque<Worker> idleWorkers;
private final Map<UniqueId, Worker> actorWorkers;
private final ThreadLocal<Worker> currentWorker;

public MockRayletClient(RayDevRuntime runtime, int numberThreads) {
this.runtime = runtime;
Expand All @@ -48,6 +49,7 @@ public MockRayletClient(RayDevRuntime runtime, int numberThreads) {
exec = Executors.newFixedThreadPool(numberThreads);
idleWorkers = new LinkedList<>();
actorWorkers = new HashMap<>();
currentWorker = new ThreadLocal<>();
}

public synchronized void onObjectPut(UniqueId id) {
Expand All @@ -60,29 +62,36 @@ public synchronized void onObjectPut(UniqueId id) {
}
}

public Worker getCurrentWorker() {
return currentWorker.get();
}

/**
* Get a worker from the worker pool to run the given task.
*/
private Worker getWorker(TaskSpec task) {
if (task.isActorTask()) {
return actorWorkers.get(task.actorId);
}
Worker worker;
if (idleWorkers.size() > 0) {
worker = idleWorkers.pop();
if (task.isActorTask()) {
worker = actorWorkers.get(task.actorId);
} else {
worker = new Worker(runtime);
}
if (task.isActorCreationTask()) {
actorWorkers.put(task.actorCreationId, worker);
if (idleWorkers.size() > 0) {
worker = idleWorkers.pop();
} else {
worker = new Worker(runtime);
}
if (task.isActorCreationTask()) {
actorWorkers.put(task.actorCreationId, worker);
}
}
currentWorker.set(worker);
return worker;
}

/**
* Return the worker to the worker pool.
*/
private void returnWorker(Worker worker) {
currentWorker.remove();
idleWorkers.push(worker);
}

Expand All @@ -105,9 +114,7 @@ public synchronized void submitTask(TaskSpec task) {
new byte[]{}, new byte[]{});
}
} finally {
if (!task.isActorCreationTask() && !task.isActorTask()) {
returnWorker(worker);
}
returnWorker(worker);
}
});
} else {
Expand Down
2 changes: 1 addition & 1 deletion java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ ray {
// ----------------------------
dev-runtime {
// Number of threads that you process tasks
execution-parallelism: 5
execution-parallelism: 10
}

}
37 changes: 13 additions & 24 deletions java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,25 @@

# Cause the script to exit if a single command fails.
set -e

# Show explicitly which commands are currently running.
set -x

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
$ROOT_DIR/../build.sh -l java

pushd $ROOT_DIR/../java
echo "Compiling Java code."
mvn clean install -Dmaven.test.skip
check_style=$(mvn checkstyle:check)
echo "${check_style}"
[[ ${check_style} =~ "BUILD FAILURE" ]] && exit 1

# test raylet
mvn test | tee mvn_test
if [ `grep -c "BUILD FAILURE" mvn_test` -eq '0' ]; then
rm mvn_test
echo "Tests passed under CLUSTER mode!"
else
rm mvn_test
exit 1
fi
# test raylet under SINGLE_PROCESS mode
mvn test -Dray.run-mode=SINGLE_PROCESS | tee dev_mvn_test
if [ `grep -c "BUILD FAILURE" dev_mvn_test` -eq '0' ]; then
rm dev_mvn_test
echo "Tests passed under SINGLE_PROCESS mode!"
else
rm dev_mvn_test
exit 1
fi

echo "Checking code format."
mvn checkstyle:check

echo "Running tests under cluster mode."
ENABLE_MULTI_LANGUAGE_TESTS=1 mvn test

echo "Running tests under single-process mode."
mvn test -Dray.run-mode=SINGLE_PROCESS

set +x
set +e

popd
2 changes: 1 addition & 1 deletion java/test/src/main/java/org/ray/api/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class TestUtils {
public static void skipTestUnderSingleProcess() {
AbstractRayRuntime runtime = (AbstractRayRuntime)Ray.internal();
if (runtime.getRayConfig().runMode == RunMode.SINGLE_PROCESS) {
throw new SkipException("Skip case.");
throw new SkipException("This test doesn't work under single-process mode.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@ public int getPid() {
}
}

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testActorReconstruction() throws InterruptedException, IOException {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1);
RayActor<Counter> actor = Ray.createActor(Counter::new, options);
// Call increase 3 times.
Expand Down Expand Up @@ -130,6 +126,8 @@ public void checkpointExpired(UniqueId actorId, UniqueId checkpointId) {

@Test
public void testActorCheckpointing() throws IOException, InterruptedException {
TestUtils.skipTestUnderSingleProcess();

ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1);
RayActor<CheckpointableCounter> actor = Ray.createActor(CheckpointableCounter::new, options);
// Call increase 3 times.
Expand All @@ -138,8 +136,6 @@ public void testActorCheckpointing() throws IOException, InterruptedException {
}
// Assert that the actor wasn't resumed from a checkpoint.
Assert.assertFalse(Ray.call(CheckpointableCounter::wasResumedFromCheckpoint, actor).get());

// Kill the actor process.
int pid = Ray.call(CheckpointableCounter::getPid, actor).get();
Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
Expand Down
2 changes: 2 additions & 0 deletions java/test/src/main/java/org/ray/api/test/ActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.UniqueId;
Expand Down Expand Up @@ -90,6 +91,7 @@ public void testForkingActorHandle() {

@Test
public void testUnreconstructableActorObject() throws InterruptedException {
TestUtils.skipTestUnderSingleProcess();
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
// Call an actor method.
RayObject value = Ray.call(Counter::getValue, counter);
Expand Down
25 changes: 9 additions & 16 deletions java/test/src/main/java/org/ray/api/test/BaseTest.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package org.ray.api.test;

import java.lang.reflect.Method;
import org.ray.api.Ray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

public class BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class);

@BeforeMethod
public void setUp() {
public void setUpBase(Method method) {
LOGGER.info("===== Running test: "
+ method.getDeclaringClass().getName() + "." + method.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also log the current mode in this log?
==== Running test: A.b [Cluster]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably not very useful. we can also find the mode from the command.

System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
beforeInitRay();
Ray.init();
beforeEachCase();
}

@AfterMethod
public void tearDown() {
public void tearDownBase() {
// TODO(qwang): This is double check to check that the socket file is removed actually.
// We could not enable this until `systemInfo` enabled.
//File rayletSocketFIle = new File(Ray.systemInfo().rayletSocketName());
Ray.shutdown();
afterShutdownRay();

//remove raylet socket file
//rayletSocketFIle.delete();
Expand All @@ -31,15 +35,4 @@ public void tearDown() {
System.clearProperty("ray.resources");
}

protected void beforeInitRay() {

}

protected void afterShutdownRay() {

}

protected void beforeEachCase() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ public class ClientExceptionTest extends BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionTest.class);

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testWaitAndCrash() {
TestUtils.skipTestUnderSingleProcess();
UniqueId randomId = UniqueId.randomId();
RayObject<String> notExisting = new RayObjectImpl(randomId);

Expand Down
10 changes: 5 additions & 5 deletions java/test/src/main/java/org/ray/api/test/FailureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,29 @@ private static void assertTaskFailedWithRayTaskException(RayObject<?> rayObject)
}
}

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testNormalTaskFailure() {
TestUtils.skipTestUnderSingleProcess();
assertTaskFailedWithRayTaskException(Ray.call(FailureTest::badFunc));
}

@Test
public void testActorCreationFailure() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, true);
assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor));
}

@Test
public void testActorTaskFailure() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, false);
assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor));
}

@Test
public void testWorkerProcessDying() {
TestUtils.skipTestUnderSingleProcess();
try {
Ray.call(FailureTest::badFunc2).get();
Assert.fail("This line shouldn't be reached.");
Expand All @@ -90,6 +89,7 @@ public void testWorkerProcessDying() {

@Test
public void testActorProcessDying() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, false);
try {
Ray.call(BadActor::badMethod2, actor).get();
Expand Down
Loading