diff --git a/java/test.sh b/java/test.sh index 70ef6ebbbbfd..36a92f259462 100755 --- a/java/test.sh +++ b/java/test.sh @@ -52,7 +52,7 @@ case "${OSTYPE}" in darwin*) ip=$(ipconfig getifaddr en0);; *) echo "Can't get ip address for ${OSTYPE}"; exit 1;; esac -RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --include-java --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" +RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.redis.address="$ip:6379"\ -Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest ray stop diff --git a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java index 2aeb909887a1..bfe8daa4d9f4 100644 --- a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java +++ b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java @@ -82,7 +82,6 @@ public void setUp() { String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME), String.format("--node-manager-port=%s", nodeManagerPort), "--load-code-from-local", - "--include-java", "--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters), "--code-search-path=" + String.join(":", classpath) ); diff --git a/python/ray/node.py b/python/ray/node.py index a65cc1e876a8..a5370b2e3436 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -178,8 +178,6 @@ def __init__(self, else: self._webui_url = ( ray.services.get_webui_url_from_redis(redis_client)) - ray_params.include_java = ( - ray.services.include_java_from_redis(redis_client)) if head or not connect_only: # We need to start a local raylet. @@ -576,7 +574,6 @@ def start_redis(self): redis_max_clients=self._ray_params.redis_max_clients, redirect_worker_output=True, password=self._ray_params.redis_password, - include_java=self._ray_params.include_java, fate_share=self.kernel_fate_share) assert ( ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes) @@ -720,7 +717,6 @@ def start_raylet(self, use_valgrind=False, use_profiler=False): stdout_file=stdout_file, stderr_file=stderr_file, config=self._config, - include_java=self._ray_params.include_java, java_worker_options=self._ray_params.java_worker_options, load_code_from_local=self._ray_params.load_code_from_local, plasma_directory=self._ray_params.plasma_directory, diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 4a4ec4e85d63..811d9539ac35 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -84,8 +84,6 @@ class RayParams: monitor the log files for all processes on this node and push their contents to Redis. autoscaling_config: path to autoscaling config file. - include_java (bool): If True, the raylet backend can also support - Java worker. java_worker_options (list): The command options for Java worker. load_code_from_local: Whether load code from local file or from GCS. metrics_agent_port(int): The port to bind metrics agent. @@ -138,7 +136,6 @@ def __init__(self, temp_dir=None, include_log_monitor=None, autoscaling_config=None, - include_java=False, java_worker_options=None, load_code_from_local=False, start_initial_python_workers_for_first_job=False, @@ -183,7 +180,6 @@ def __init__(self, self.temp_dir = temp_dir self.include_log_monitor = include_log_monitor self.autoscaling_config = autoscaling_config - self.include_java = include_java self.java_worker_options = java_worker_options self.load_code_from_local = load_code_from_local self.metrics_agent_port = metrics_agent_port diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 1d5a0bd053eb..6dfc78deecf0 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -343,11 +343,6 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): "--temp-dir", default=None, help="manually specify the root temporary dir of the Ray process") -@click.option( - "--include-java", - is_flag=True, - default=None, - help="Enable Java worker support.") @click.option( "--java-worker-options", required=False, @@ -397,7 +392,7 @@ def start(node_ip_address, redis_address, address, redis_port, port, head, include_webui, webui_host, include_dashboard, dashboard_host, dashboard_port, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, + plasma_store_socket_name, raylet_socket_name, temp_dir, java_worker_options, code_search_path, load_code_from_local, system_config, lru_evict, enable_object_reconstruction, metrics_export_port, log_style, log_color, verbose): @@ -505,7 +500,6 @@ def start(node_ip_address, redis_address, address, redis_port, port, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, temp_dir=temp_dir, - include_java=include_java, include_dashboard=include_dashboard, dashboard_host=dashboard_host, dashboard_port=dashboard_port, @@ -564,7 +558,6 @@ def start(node_ip_address, redis_address, address, redis_port, port, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, autoscaling_config=autoscaling_config, - include_java=False, ) node = ray.node.Node( @@ -671,12 +664,6 @@ def start(node_ip_address, redis_address, address, redis_port, port, raise ValueError( "If --head is not passed in, the --include-dashboard" "flag is not relevant.") - if include_java is not None: - cli_logger.abort("`{}` should not be specified without `{}`.", - cf.bold("--include-java"), cf.bold("--head")) - - raise ValueError("--include-java should only be set for the head " - "node.") # Wait for the Redis server to be started. And throw an exception if we # can't connect to it. diff --git a/python/ray/services.py b/python/ray/services.py index b2b1380a83fa..04ad2d9e6812 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -123,18 +123,6 @@ def new_port(): return random.randint(10000, 65535) -def include_java_from_redis(redis_client): - """This is used for query include_java bool from redis. - - Args: - redis_client (StrictRedis): The redis client to GCS. - - Returns: - True if this cluster backend enables Java worker. - """ - return redis_client.get("INCLUDE_JAVA") == b"1" - - def find_redis_address_or_die(): pids = psutil.pids() redis_addresses = set() @@ -683,7 +671,6 @@ def start_redis(node_ip_address, redirect_worker_output=False, password=None, use_credis=None, - include_java=False, fate_share=None): """Start the Redis global state store. @@ -709,8 +696,6 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. - include_java (bool): If True, the raylet backend can also support - Java worker. Returns: A tuple of the address for the primary Redis shard, a list of @@ -784,10 +769,6 @@ def start_redis(node_ip_address, primary_redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0) - # put the include_java bool to primary redis-server, so that other nodes - # can access it and know whether or not to enable cross-languages. - primary_redis_client.set("INCLUDE_JAVA", 1 if include_java else 0) - # Init job counter to GCS. primary_redis_client.set("JobCounter", 0) @@ -1267,7 +1248,6 @@ def start_raylet(redis_address, stdout_file=None, stderr_file=None, config=None, - include_java=False, java_worker_options=None, load_code_from_local=False, plasma_directory=None, @@ -1312,8 +1292,6 @@ def start_raylet(redis_address, no redirection should happen, then this should be None. config (dict|None): Optional Raylet configuration that will override defaults in RayConfig. - include_java (bool): If True, the raylet backend can also support - Java worker. java_worker_options (list): The command options for Java worker. code_search_path (list): Code search path for worker. code_search_path is added to worker command in non-multi-tenancy mode and job_config @@ -1345,6 +1323,26 @@ def start_raylet(redis_address, gcs_ip_address, gcs_port = redis_address.split(":") + has_java_command = False + try: + java_proc = subprocess.run( + ["java", "-version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if java_proc.returncode == 0: + has_java_command = True + except OSError: + pass + + ray_java_installed = False + try: + jars_dir = get_ray_jars_dir() + if os.path.exists(jars_dir): + ray_java_installed = True + except Exception: + pass + + include_java = has_java_command and ray_java_installed if include_java is True: java_worker_command = build_java_worker_command( json.loads(java_worker_options) if java_worker_options else [], diff --git a/python/ray/tests/test_cross_language.py b/python/ray/tests/test_cross_language.py index 9ba24a980628..3904f63df135 100644 --- a/python/ray/tests/test_cross_language.py +++ b/python/ray/tests/test_cross_language.py @@ -6,7 +6,7 @@ def test_cross_language_raise_kwargs(shutdown_only): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) with pytest.raises(Exception, match="kwargs"): ray.java_function("a", "b").remote(x="arg1") @@ -16,7 +16,7 @@ def test_cross_language_raise_kwargs(shutdown_only): def test_cross_language_raise_exception(shutdown_only): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) class PythonObject(object): pass diff --git a/python/ray/worker.py b/python/ray/worker.py index 851f4933e655..536f9d7a13ad 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -491,7 +491,6 @@ def init( _driver_object_store_memory=None, _memory=None, _redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, - _include_java=False, _java_worker_options=None, _code_search_path=None, _temp_dir=None, @@ -580,8 +579,6 @@ def init( _memory: Amount of reservable memory resource to create. _redis_password (str): Prevents external clients without the password from connecting to Redis if provided. - _include_java: Boolean flag indicating whether or not to enable java - workers. _temp_dir (str): If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., "/tmp/ray". @@ -673,7 +670,6 @@ def init( redis_password=_redis_password, plasma_directory=None, huge_pages=None, - include_java=_include_java, include_dashboard=include_dashboard, dashboard_host=dashboard_host, dashboard_port=dashboard_port, diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index caf4f67e8d9c..eff4d4ffebef 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -133,6 +133,10 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, void WorkerPool::Start(int num_workers) { RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); for (auto &entry : states_by_lang_) { + if (entry.first == Language::JAVA) { + // Disable initial workers for Java. + continue; + } auto &state = entry.second; int num_worker_processes = static_cast( std::ceil(static_cast(num_workers) / state.num_workers_per_process)); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 56b251e66069..2d1a831d389f 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -273,13 +273,10 @@ TEST_P(WorkerPoolTest, StartupJavaWorkerProcessCount) { TEST_P(WorkerPoolTest, InitialWorkerProcessCount) { if (!RayConfig::instance().enable_multi_tenancy()) { worker_pool_->Start(1); - // Here we try to start only 1 worker for each worker language. But since each Java - // worker process contains exactly NUM_WORKERS_PER_PROCESS_JAVA (3) workers here, - // it's expected to see 3 workers for Java and 1 worker for Python, instead of 1 for - // each worker language. - ASSERT_NE(worker_pool_->NumWorkersStarting(), 1 * LANGUAGES.size()); - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1 + NUM_WORKERS_PER_PROCESS_JAVA); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), LANGUAGES.size()); + // Here we try to start only 1 worker for each worker language. But since we disabled + // initial workers for Java, we expect to see only 1 worker which is a Python worker. + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1); } else { ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0); ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0); diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java index 1c820e6f23c6..e331208247ed 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java @@ -68,7 +68,6 @@ static synchronized void startCluster(boolean isCrossLanguage, boolean isLocal) String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME), String.format("--node-manager-port=%s", nodeManagerPort), "--load-code-from-local", - "--include-java", "--java-worker-options=" + workerOptions, "--system-config=" + new Gson().toJson(config) ); diff --git a/streaming/python/examples/wordcount.py b/streaming/python/examples/wordcount.py index 66b1a811272d..2f62b19dad54 100644 --- a/streaming/python/examples/wordcount.py +++ b/streaming/python/examples/wordcount.py @@ -65,7 +65,7 @@ def splitter(line): args = parser.parse_args() titles_file = str(args.titles_file) - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder() \ .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \ diff --git a/streaming/python/tests/test_failover.py b/streaming/python/tests/test_failover.py index def93f43edc2..adab217e09b3 100644 --- a/streaming/python/tests/test_failover.py +++ b/streaming/python/tests/test_failover.py @@ -8,7 +8,7 @@ def test_word_count(): try: - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) # time.sleep(10) # for gdb to attach ctx = StreamingContext.Builder() \ .option("streaming.context-backend.type", "local_file") \ diff --git a/streaming/python/tests/test_hybrid_stream.py b/streaming/python/tests/test_hybrid_stream.py index 7d79b9a0ef4d..e257f0d9fd5a 100644 --- a/streaming/python/tests/test_hybrid_stream.py +++ b/streaming/python/tests/test_hybrid_stream.py @@ -35,7 +35,6 @@ def test_hybrid_stream(): assert not ray.is_initialized() ray.init( _load_code_from_local=True, - _include_java=True, _java_worker_options=java_worker_options, _system_config={"num_workers_per_process_java": 1}) diff --git a/streaming/python/tests/test_stream.py b/streaming/python/tests/test_stream.py index 06dbeba850a5..f99033d19959 100644 --- a/streaming/python/tests/test_stream.py +++ b/streaming/python/tests/test_stream.py @@ -3,7 +3,7 @@ def test_data_stream(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder().build() stream = ctx.from_values(1, 2, 3) java_stream = stream.as_java_stream() @@ -17,7 +17,7 @@ def test_data_stream(): def test_key_data_stream(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder().build() key_stream = ctx.from_values( "a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0]) @@ -32,7 +32,7 @@ def test_key_data_stream(): def test_stream_config(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder().build() stream = ctx.from_values(1, 2, 3) stream.with_config("k1", "v1") diff --git a/streaming/python/tests/test_union_stream.py b/streaming/python/tests/test_union_stream.py index 4f24226c4b9f..0c655b1d03d7 100644 --- a/streaming/python/tests/test_union_stream.py +++ b/streaming/python/tests/test_union_stream.py @@ -5,7 +5,7 @@ def test_union_stream(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder() \ .option("streaming.metrics.reporters", "") \ .build() diff --git a/streaming/python/tests/test_word_count.py b/streaming/python/tests/test_word_count.py index 07127b96ed10..372ae3e1e44e 100644 --- a/streaming/python/tests/test_word_count.py +++ b/streaming/python/tests/test_word_count.py @@ -4,7 +4,7 @@ def test_word_count(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder() \ .build() ctx.read_text_file(__file__) \ @@ -23,7 +23,7 @@ def test_word_count(): def test_simple_word_count(): - ray.init(_load_code_from_local=True, _include_java=True) + ray.init(_load_code_from_local=True) ctx = StreamingContext.Builder() \ .build() sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"