diff --git a/.travis.yml b/.travis.yml index 6e48d406ce28..1d6d61de018d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,9 +26,11 @@ matrix: - JDK='Oracle JDK 8' - PYTHON=3.5 PYTHONWARNINGS=ignore - RAY_USE_CMAKE=1 + - RAY_INSTALL_JAVA=1 install: - ./ci/travis/install-dependencies.sh - export PATH="$HOME/miniconda/bin:$PATH" + - ./ci/travis/install-ray.sh script: - ./java/test.sh diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 7dffd3fd54c5..e5d7b20b1d64 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -28,4 +28,9 @@ public void shutdown() { public MockObjectStore getObjectStore() { return store; } + + @Override + public Worker getWorker() { + return ((MockRayletClient) rayletClient).getCurrentWorker(); + } } diff --git a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java index f0780cc2d8cd..b0ba67a4c3f2 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java @@ -4,7 +4,6 @@ import org.ray.api.RuntimeContext; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RunMode; -import org.ray.runtime.config.WorkerMode; import org.ray.runtime.task.TaskSpec; public class RuntimeContextImpl implements RuntimeContext { @@ -22,8 +21,10 @@ public UniqueId getCurrentDriverId() { @Override public UniqueId getCurrentActorId() { - Preconditions.checkState(runtime.rayConfig.workerMode == WorkerMode.WORKER); - return runtime.getWorker().getCurrentActorId(); + Worker worker = runtime.getWorker(); + Preconditions.checkState(worker != null && !worker.getCurrentActorId().isNil(), + "This method should only be called from an actor."); + return worker.getCurrentActorId(); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index e6a069efce76..ef319ea20233 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -79,7 +79,6 @@ public void loop() { * Execute a task. */ public void execute(TaskSpec spec) { - LOGGER.info("Executing task {}", spec.taskId); LOGGER.debug("Executing task {}", spec); UniqueId returnId = spec.returnIds[0]; ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); @@ -123,7 +122,7 @@ public void execute(TaskSpec spec) { maybeLoadCheckpoint(result, returnId); currentActor = result; } - LOGGER.info("Finished executing task {}", spec.taskId); + LOGGER.debug("Finished executing task {}", spec.taskId); } catch (Exception e) { LOGGER.error("Error executing task " + spec, e); if (!spec.isActorCreationTask()) { diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java index 3470840826d2..d81c566bf822 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java @@ -95,11 +95,10 @@ public List get(byte[][] objectIds, int timeoutMs) { ArrayList rets = new ArrayList<>(); for (byte[] id : objectIds) { try { - Constructor constructor = ObjectStoreData.class.getConstructor( - byte[].class, byte[].class); + Constructor constructor = ObjectStoreData.class.getDeclaredConstructors()[0]; constructor.setAccessible(true); - rets.add(constructor.newInstance(metadata.get(new UniqueId(id)), - data.get(new UniqueId(id)))); + rets.add((ObjectStoreData) constructor.newInstance(data.get(new UniqueId(id)), + metadata.get(new UniqueId(id)))); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java index f16c8f9f8cfc..e44fd1014a63 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java @@ -39,6 +39,7 @@ public class MockRayletClient implements RayletClient { private final ExecutorService exec; private final Deque idleWorkers; private final Map actorWorkers; + private final ThreadLocal currentWorker; public MockRayletClient(RayDevRuntime runtime, int numberThreads) { this.runtime = runtime; @@ -48,6 +49,7 @@ public MockRayletClient(RayDevRuntime runtime, int numberThreads) { exec = Executors.newFixedThreadPool(numberThreads); idleWorkers = new LinkedList<>(); actorWorkers = new HashMap<>(); + currentWorker = new ThreadLocal<>(); } public synchronized void onObjectPut(UniqueId id) { @@ -60,22 +62,28 @@ public synchronized void onObjectPut(UniqueId id) { } } + public Worker getCurrentWorker() { + return currentWorker.get(); + } + /** * Get a worker from the worker pool to run the given task. */ private Worker getWorker(TaskSpec task) { - if (task.isActorTask()) { - return actorWorkers.get(task.actorId); - } Worker worker; - if (idleWorkers.size() > 0) { - worker = idleWorkers.pop(); + if (task.isActorTask()) { + worker = actorWorkers.get(task.actorId); } else { - worker = new Worker(runtime); - } - if (task.isActorCreationTask()) { - actorWorkers.put(task.actorCreationId, worker); + if (idleWorkers.size() > 0) { + worker = idleWorkers.pop(); + } else { + worker = new Worker(runtime); + } + if (task.isActorCreationTask()) { + actorWorkers.put(task.actorCreationId, worker); + } } + currentWorker.set(worker); return worker; } @@ -83,6 +91,7 @@ private Worker getWorker(TaskSpec task) { * Return the worker to the worker pool. */ private void returnWorker(Worker worker) { + currentWorker.remove(); idleWorkers.push(worker); } @@ -105,9 +114,7 @@ public synchronized void submitTask(TaskSpec task) { new byte[]{}, new byte[]{}); } } finally { - if (!task.isActorCreationTask() && !task.isActorTask()) { - returnWorker(worker); - } + returnWorker(worker); } }); } else { diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 81dab4d3d017..5faeda7cfedf 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -100,7 +100,7 @@ ray { // ---------------------------- dev-runtime { // Number of threads that you process tasks - execution-parallelism: 5 + execution-parallelism: 10 } } diff --git a/java/test.sh b/java/test.sh index 1c6370d1fe8e..b3e889371bcd 100755 --- a/java/test.sh +++ b/java/test.sh @@ -2,36 +2,25 @@ # Cause the script to exit if a single command fails. set -e - # Show explicitly which commands are currently running. set -x ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) -$ROOT_DIR/../build.sh -l java pushd $ROOT_DIR/../java +echo "Compiling Java code." mvn clean install -Dmaven.test.skip -check_style=$(mvn checkstyle:check) -echo "${check_style}" -[[ ${check_style} =~ "BUILD FAILURE" ]] && exit 1 - -# test raylet -mvn test | tee mvn_test -if [ `grep -c "BUILD FAILURE" mvn_test` -eq '0' ]; then - rm mvn_test - echo "Tests passed under CLUSTER mode!" -else - rm mvn_test - exit 1 -fi -# test raylet under SINGLE_PROCESS mode -mvn test -Dray.run-mode=SINGLE_PROCESS | tee dev_mvn_test -if [ `grep -c "BUILD FAILURE" dev_mvn_test` -eq '0' ]; then - rm dev_mvn_test - echo "Tests passed under SINGLE_PROCESS mode!" -else - rm dev_mvn_test - exit 1 -fi + +echo "Checking code format." +mvn checkstyle:check + +echo "Running tests under cluster mode." +ENABLE_MULTI_LANGUAGE_TESTS=1 mvn test + +echo "Running tests under single-process mode." +mvn test -Dray.run-mode=SINGLE_PROCESS + +set +x +set +e popd diff --git a/java/test/src/main/java/org/ray/api/TestUtils.java b/java/test/src/main/java/org/ray/api/TestUtils.java index 18b7230eec79..9b1ea915b409 100644 --- a/java/test/src/main/java/org/ray/api/TestUtils.java +++ b/java/test/src/main/java/org/ray/api/TestUtils.java @@ -9,7 +9,7 @@ public class TestUtils { public static void skipTestUnderSingleProcess() { AbstractRayRuntime runtime = (AbstractRayRuntime)Ray.internal(); if (runtime.getRayConfig().runMode == RunMode.SINGLE_PROCESS) { - throw new SkipException("Skip case."); + throw new SkipException("This test doesn't work under single-process mode."); } } } diff --git a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java index 12d7d1a8a931..e575daa84f13 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java @@ -44,13 +44,9 @@ public int getPid() { } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testActorReconstruction() throws InterruptedException, IOException { + TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1); RayActor actor = Ray.createActor(Counter::new, options); // Call increase 3 times. @@ -130,6 +126,8 @@ public void checkpointExpired(UniqueId actorId, UniqueId checkpointId) { @Test public void testActorCheckpointing() throws IOException, InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1); RayActor actor = Ray.createActor(CheckpointableCounter::new, options); // Call increase 3 times. @@ -138,8 +136,6 @@ public void testActorCheckpointing() throws IOException, InterruptedException { } // Assert that the actor wasn't resumed from a checkpoint. Assert.assertFalse(Ray.call(CheckpointableCounter::wasResumedFromCheckpoint, actor).get()); - - // Kill the actor process. int pid = Ray.call(CheckpointableCounter::getPid, actor).get(); Runtime.getRuntime().exec("kill -9 " + pid); // Wait for the actor to be killed. diff --git a/java/test/src/main/java/org/ray/api/test/ActorTest.java b/java/test/src/main/java/org/ray/api/test/ActorTest.java index 96be700b9002..876ab322d66d 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorTest.java @@ -5,6 +5,7 @@ import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; +import org.ray.api.TestUtils; import org.ray.api.annotation.RayRemote; import org.ray.api.exception.UnreconstructableException; import org.ray.api.id.UniqueId; @@ -90,6 +91,7 @@ public void testForkingActorHandle() { @Test public void testUnreconstructableActorObject() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); RayActor counter = Ray.createActor(Counter::new, 100); // Call an actor method. RayObject value = Ray.call(Counter::getValue, counter); diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java index e84e8fadf8ea..b67a8f64c7ce 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java @@ -1,27 +1,31 @@ package org.ray.api.test; +import java.lang.reflect.Method; import org.ray.api.Ray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; public class BaseTest { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); + @BeforeMethod - public void setUp() { + public void setUpBase(Method method) { + LOGGER.info("===== Running test: " + + method.getDeclaringClass().getName() + "." + method.getName()); System.setProperty("ray.home", "../.."); System.setProperty("ray.resources", "CPU:4,RES-A:4"); - beforeInitRay(); Ray.init(); - beforeEachCase(); } @AfterMethod - public void tearDown() { + public void tearDownBase() { // TODO(qwang): This is double check to check that the socket file is removed actually. // We could not enable this until `systemInfo` enabled. //File rayletSocketFIle = new File(Ray.systemInfo().rayletSocketName()); Ray.shutdown(); - afterShutdownRay(); //remove raylet socket file //rayletSocketFIle.delete(); @@ -31,15 +35,4 @@ public void tearDown() { System.clearProperty("ray.resources"); } - protected void beforeInitRay() { - - } - - protected void afterShutdownRay() { - - } - - protected void beforeEachCase() { - - } } diff --git a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java index 0c0433299386..e9f53dddd794 100644 --- a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java @@ -17,13 +17,9 @@ public class ClientExceptionTest extends BaseTest { private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionTest.class); - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testWaitAndCrash() { + TestUtils.skipTestUnderSingleProcess(); UniqueId randomId = UniqueId.randomId(); RayObject notExisting = new RayObjectImpl(randomId); diff --git a/java/test/src/main/java/org/ray/api/test/FailureTest.java b/java/test/src/main/java/org/ray/api/test/FailureTest.java index f74860177909..6d47a2fc99fa 100644 --- a/java/test/src/main/java/org/ray/api/test/FailureTest.java +++ b/java/test/src/main/java/org/ray/api/test/FailureTest.java @@ -55,30 +55,29 @@ private static void assertTaskFailedWithRayTaskException(RayObject rayObject) } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testNormalTaskFailure() { + TestUtils.skipTestUnderSingleProcess(); assertTaskFailedWithRayTaskException(Ray.call(FailureTest::badFunc)); } @Test public void testActorCreationFailure() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, true); assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor)); } @Test public void testActorTaskFailure() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, false); assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor)); } @Test public void testWorkerProcessDying() { + TestUtils.skipTestUnderSingleProcess(); try { Ray.call(FailureTest::badFunc2).get(); Assert.fail("This line shouldn't be reached."); @@ -90,6 +89,7 @@ public void testWorkerProcessDying() { @Test public void testActorProcessDying() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, false); try { Ray.call(BadActor::badMethod2, actor).get(); diff --git a/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java b/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java index b3a8e87b7326..c81a148980f9 100644 --- a/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java +++ b/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java @@ -2,7 +2,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; -import java.lang.ProcessBuilder.Redirect; +import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.TimeUnit; import org.ray.api.Ray; @@ -33,13 +33,13 @@ public static String echo(String word) { /** * Execute an external command. + * * @return Whether the command succeeded. */ private boolean executeCommand(List command, int waitTimeoutSeconds) { try { LOGGER.info("Executing command: {}", String.join(" ", command)); - Process process = new ProcessBuilder(command).redirectOutput(Redirect.INHERIT) - .redirectError(Redirect.INHERIT).start(); + Process process = new ProcessBuilder(command).inheritIO().start(); process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS); return process.exitValue() == 0; } catch (Exception e) { @@ -48,11 +48,12 @@ private boolean executeCommand(List command, int waitTimeoutSeconds) { } @BeforeMethod - public void setUp() { - // Check whether 'ray' command is installed. - boolean rayCommandExists = executeCommand(ImmutableList.of("which", "ray"), 5); - if (!rayCommandExists) { - throw new SkipException("Skipping test, because ray command doesn't exist."); + public void setUp(Method method) { + String testName = method.getName(); + if (!"1".equals(System.getenv("ENABLE_MULTI_LANGUAGE_TESTS"))) { + LOGGER.info("Skip " + testName + + " because env variable ENABLE_MULTI_LANGUAGE_TESTS isn't set"); + throw new SkipException("Skip test."); } // Delete existing socket files. @@ -64,15 +65,20 @@ public void setUp() { } // Start ray cluster. + String testDir = System.getProperty("user.dir"); + String workerOptions = String.format("-Dray.home=%s/../../", testDir); + workerOptions += + " -classpath " + String.format("%s/../../build/java/*:%s/target/*", testDir, testDir); final List startCommand = ImmutableList.of( "ray", "start", "--head", "--redis-port=6379", - "--include-java", String.format("--plasma-store-socket-name=%s", PLASMA_STORE_SOCKET_NAME), String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME), - "--java-worker-options=-classpath ../../build/java/*:../../java/test/target/*" + "--load-code-from-local", + "--include-java", + "--java-worker-options=" + workerOptions ); if (!executeCommand(startCommand, 10)) { throw new RuntimeException("Couldn't start ray cluster."); diff --git a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java index 6bbd39ffa20b..6289d1cd7170 100644 --- a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java +++ b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java @@ -12,6 +12,7 @@ import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; +import org.ray.api.TestUtils; import org.ray.api.WaitResult; import org.ray.api.annotation.RayRemote; import org.testng.Assert; @@ -73,11 +74,15 @@ public static String testMultiThreading() { @Test public void testInDriver() { + // TODO(hchen): Fix this test under single-process mode. + TestUtils.skipTestUnderSingleProcess(); testMultiThreading(); } @Test public void testInWorker() { + // Single-process mode doesn't have real workers. + TestUtils.skipTestUnderSingleProcess(); RayObject obj = Ray.call(MultiThreadingTest::testMultiThreading); Assert.assertEquals("ok", obj.get()); } diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java index 726bad3da97c..7abc3f421f97 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java @@ -4,6 +4,7 @@ import org.apache.arrow.plasma.exceptions.DuplicateObjectException; import org.ray.api.Ray; +import org.ray.api.TestUtils; import org.ray.api.id.UniqueId; import org.ray.runtime.AbstractRayRuntime; import org.testng.Assert; @@ -13,6 +14,7 @@ public class PlasmaStoreTest extends BaseTest { @Test public void testPutWithDuplicateId() { + TestUtils.skipTestUnderSingleProcess(); UniqueId objectId = UniqueId.randomId(); AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); PlasmaClient store = new PlasmaClient(runtime.getRayConfig().objectStoreSocketName, "", 0); diff --git a/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java b/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java index 210a4a045540..114ef7498a77 100644 --- a/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java +++ b/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java @@ -4,18 +4,20 @@ import org.ray.api.RayObject; import org.ray.api.annotation.RayRemote; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class RedisPasswordTest extends BaseTest { - @Override - public void beforeInitRay() { + @BeforeClass + public void setUp() { System.setProperty("ray.redis.head-password", "12345678"); System.setProperty("ray.redis.password", "12345678"); } - @Override - public void afterShutdownRay() { + @AfterClass + public void tearDown() { System.clearProperty("ray.redis.head-password"); System.clearProperty("ray.redis.password"); } diff --git a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java index 5d021d0cb1ad..114dfd3960ce 100644 --- a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java +++ b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java @@ -25,18 +25,15 @@ public static Integer echo(Integer number) { @RayRemote public static class Echo { + public Integer echo(Integer number) { return number; } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testMethods() { + TestUtils.skipTestUnderSingleProcess(); CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0)); // This is a case that can satisfy required resources. @@ -57,6 +54,7 @@ public void testMethods() { @Test public void testActors() { + TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions actorCreationOptions1 = new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0)); diff --git a/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java b/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java index b6fdca32f170..512519bce02a 100644 --- a/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java +++ b/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java @@ -5,6 +5,8 @@ import org.ray.api.annotation.RayRemote; import org.ray.api.id.UniqueId; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class RuntimeContextTest extends BaseTest { @@ -14,13 +16,20 @@ public class RuntimeContextTest extends BaseTest { private static String RAYLET_SOCKET_NAME = "/tmp/ray/test/raylet_socket"; private static String OBJECT_STORE_SOCKET_NAME = "/tmp/ray/test/object_store_socket"; - @Override - public void beforeInitRay() { + @BeforeClass + public void setUp() { System.setProperty("ray.driver.id", DRIVER_ID.toString()); System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME); System.setProperty("ray.object-store.socket-name", OBJECT_STORE_SOCKET_NAME); } + @AfterClass + public void tearDown() { + System.clearProperty("ray.driver.id"); + System.clearProperty("ray.raylet.socket-name"); + System.clearProperty("ray.object-store.socket-name"); + } + @Test public void testRuntimeContextInDriver() { Assert.assertEquals(DRIVER_ID, Ray.getRuntimeContext().getCurrentDriverId()); diff --git a/java/test/src/main/java/org/ray/api/test/StressTest.java b/java/test/src/main/java/org/ray/api/test/StressTest.java index 24bc467db0ff..b5bf1356ea4f 100644 --- a/java/test/src/main/java/org/ray/api/test/StressTest.java +++ b/java/test/src/main/java/org/ray/api/test/StressTest.java @@ -17,13 +17,9 @@ public static int echo(int x) { return x; } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testSubmittingTasks() { + TestUtils.skipTestUnderSingleProcess(); for (int numIterations : ImmutableList.of(1, 10, 100, 1000)) { int numTasks = 1000 / numIterations; for (int i = 0; i < numIterations; i++) { @@ -40,6 +36,7 @@ public void testSubmittingTasks() { @Test public void testDependency() { + TestUtils.skipTestUnderSingleProcess(); RayObject x = Ray.call(StressTest::echo, 1); for (int i = 0; i < 1000; i++) { x = Ray.call(StressTest::echo, x); @@ -77,6 +74,7 @@ public int ping(int n) { @Test public void testSubmittingManyTasksToOneActor() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(Actor::new); List objectIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -90,6 +88,7 @@ public void testSubmittingManyTasksToOneActor() { @Test public void testPuttingAndGettingManyObjects() { + TestUtils.skipTestUnderSingleProcess(); Integer objectToPut = 1; List> objects = new ArrayList<>(); for (int i = 0; i < 100_000; i++) { diff --git a/python/ray/services.py b/python/ray/services.py index 76be5bc9d63f..42b28581b60a 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1242,7 +1242,7 @@ def build_java_worker_command( """ assert java_worker_options is not None - command = "java {} ".format(java_worker_options) + command = "java ".format(java_worker_options) if redis_address is not None: command += "-Dray.redis.address={} ".format(redis_address) @@ -1259,6 +1259,11 @@ def build_java_worker_command( command += "-Dray.home={} ".format(RAY_HOME) # TODO(suquark): We should use temp_dir as the input of a java worker. command += "-Dray.log-dir={} ".format(os.path.join(temp_dir, "sockets")) + + if java_worker_options: + # Put `java_worker_options` in the last, so it can overwrite the + # above options. + command += java_worker_options + " " command += "org.ray.runtime.runner.worker.DefaultWorker" return command diff --git a/python/setup.py b/python/setup.py index 2177e0c6b5a4..1b36b0ffaf11 100644 --- a/python/setup.py +++ b/python/setup.py @@ -80,7 +80,11 @@ def run(self): # version of Python to build pyarrow inside the build.sh script. Note # that certain flags will not be passed along such as --user or sudo. # TODO(rkn): Fix this. - subprocess.check_call(["../build.sh", "-p", sys.executable]) + command = ["../build.sh", "-p", sys.executable] + if os.getenv("RAY_INSTALL_JAVA") == "1": + # Also build binaries for Java if the above env variable exists. + command += ["-l", "python,java"] + subprocess.check_call(command) # We also need to install pyarrow along with Ray, so make sure that the # relevant non-Python pyarrow files get copied.