Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ case "${OSTYPE}" in
darwin*) ip=$(ipconfig getifaddr en0);;
*) echo "Can't get ip address for ${OSTYPE}"; exit 1;;
esac
RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --include-java --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar"
RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar"
RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.redis.address="$ip:6379"\
-Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest
ray stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public void setUp() {
String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME),
String.format("--node-manager-port=%s", nodeManagerPort),
"--load-code-from-local",
"--include-java",
"--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters),
"--code-search-path=" + String.join(":", classpath)
);
Expand Down
4 changes: 0 additions & 4 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ def __init__(self,
else:
self._webui_url = (
ray.services.get_webui_url_from_redis(redis_client))
ray_params.include_java = (
ray.services.include_java_from_redis(redis_client))

if head or not connect_only:
# We need to start a local raylet.
Expand Down Expand Up @@ -576,7 +574,6 @@ def start_redis(self):
redis_max_clients=self._ray_params.redis_max_clients,
redirect_worker_output=True,
password=self._ray_params.redis_password,
include_java=self._ray_params.include_java,
fate_share=self.kernel_fate_share)
assert (
ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes)
Expand Down Expand Up @@ -720,7 +717,6 @@ def start_raylet(self, use_valgrind=False, use_profiler=False):
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config,
include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
plasma_directory=self._ray_params.plasma_directory,
Expand Down
4 changes: 0 additions & 4 deletions python/ray/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ class RayParams:
monitor the log files for all processes on this node and push their
contents to Redis.
autoscaling_config: path to autoscaling config file.
include_java (bool): If True, the raylet backend can also support
Java worker.
java_worker_options (list): The command options for Java worker.
load_code_from_local: Whether load code from local file or from GCS.
metrics_agent_port(int): The port to bind metrics agent.
Expand Down Expand Up @@ -138,7 +136,6 @@ def __init__(self,
temp_dir=None,
include_log_monitor=None,
autoscaling_config=None,
include_java=False,
java_worker_options=None,
load_code_from_local=False,
start_initial_python_workers_for_first_job=False,
Expand Down Expand Up @@ -183,7 +180,6 @@ def __init__(self,
self.temp_dir = temp_dir
self.include_log_monitor = include_log_monitor
self.autoscaling_config = autoscaling_config
self.include_java = include_java
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self.metrics_agent_port = metrics_agent_port
Expand Down
15 changes: 1 addition & 14 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,6 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
"--temp-dir",
default=None,
help="manually specify the root temporary dir of the Ray process")
@click.option(
"--include-java",
is_flag=True,
default=None,
help="Enable Java worker support.")
@click.option(
"--java-worker-options",
required=False,
Expand Down Expand Up @@ -397,7 +392,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
head, include_webui, webui_host, include_dashboard, dashboard_host,
dashboard_port, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
plasma_store_socket_name, raylet_socket_name, temp_dir,
java_worker_options, code_search_path, load_code_from_local,
system_config, lru_evict, enable_object_reconstruction,
metrics_export_port, log_style, log_color, verbose):
Expand Down Expand Up @@ -505,7 +500,6 @@ def start(node_ip_address, redis_address, address, redis_port, port,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
include_java=include_java,
include_dashboard=include_dashboard,
dashboard_host=dashboard_host,
dashboard_port=dashboard_port,
Expand Down Expand Up @@ -564,7 +558,6 @@ def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
autoscaling_config=autoscaling_config,
include_java=False,
)

node = ray.node.Node(
Expand Down Expand Up @@ -671,12 +664,6 @@ def start(node_ip_address, redis_address, address, redis_port, port,
raise ValueError(
"If --head is not passed in, the --include-dashboard"
"flag is not relevant.")
if include_java is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--include-java"), cf.bold("--head"))

raise ValueError("--include-java should only be set for the head "
"node.")

# Wait for the Redis server to be started. And throw an exception if we
# can't connect to it.
Expand Down
42 changes: 20 additions & 22 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,6 @@ def new_port():
return random.randint(10000, 65535)


def include_java_from_redis(redis_client):
"""This is used for query include_java bool from redis.

Args:
redis_client (StrictRedis): The redis client to GCS.

Returns:
True if this cluster backend enables Java worker.
"""
return redis_client.get("INCLUDE_JAVA") == b"1"


def find_redis_address_or_die():
pids = psutil.pids()
redis_addresses = set()
Expand Down Expand Up @@ -683,7 +671,6 @@ def start_redis(node_ip_address,
redirect_worker_output=False,
password=None,
use_credis=None,
include_java=False,
fate_share=None):
"""Start the Redis global state store.

Expand All @@ -709,8 +696,6 @@ def start_redis(node_ip_address,
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
include_java (bool): If True, the raylet backend can also support
Java worker.

Returns:
A tuple of the address for the primary Redis shard, a list of
Expand Down Expand Up @@ -784,10 +769,6 @@ def start_redis(node_ip_address,
primary_redis_client.set("RedirectOutput", 1
if redirect_worker_output else 0)

# put the include_java bool to primary redis-server, so that other nodes
# can access it and know whether or not to enable cross-languages.
primary_redis_client.set("INCLUDE_JAVA", 1 if include_java else 0)

# Init job counter to GCS.
primary_redis_client.set("JobCounter", 0)

Expand Down Expand Up @@ -1267,7 +1248,6 @@ def start_raylet(redis_address,
stdout_file=None,
stderr_file=None,
config=None,
include_java=False,
java_worker_options=None,
load_code_from_local=False,
plasma_directory=None,
Expand Down Expand Up @@ -1312,8 +1292,6 @@ def start_raylet(redis_address,
no redirection should happen, then this should be None.
config (dict|None): Optional Raylet configuration that will
override defaults in RayConfig.
include_java (bool): If True, the raylet backend can also support
Java worker.
java_worker_options (list): The command options for Java worker.
code_search_path (list): Code search path for worker. code_search_path
is added to worker command in non-multi-tenancy mode and job_config
Expand Down Expand Up @@ -1345,6 +1323,26 @@ def start_raylet(redis_address,

gcs_ip_address, gcs_port = redis_address.split(":")

has_java_command = False
try:
java_proc = subprocess.run(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this command output be printed to console?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. subprocess.PIPE means capturing the output.

["java", "-version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if java_proc.returncode == 0:
has_java_command = True
except OSError:
pass

ray_java_installed = False
try:
jars_dir = get_ray_jars_dir()
if os.path.exists(jars_dir):
ray_java_installed = True
except Exception:
pass

include_java = has_java_command and ray_java_installed
if include_java is True:
java_worker_command = build_java_worker_command(
json.loads(java_worker_options) if java_worker_options else [],
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_cross_language.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


def test_cross_language_raise_kwargs(shutdown_only):
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)

with pytest.raises(Exception, match="kwargs"):
ray.java_function("a", "b").remote(x="arg1")
Expand All @@ -16,7 +16,7 @@ def test_cross_language_raise_kwargs(shutdown_only):


def test_cross_language_raise_exception(shutdown_only):
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)

class PythonObject(object):
pass
Expand Down
4 changes: 0 additions & 4 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,6 @@ def init(
_driver_object_store_memory=None,
_memory=None,
_redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
_include_java=False,
_java_worker_options=None,
_code_search_path=None,
_temp_dir=None,
Expand Down Expand Up @@ -580,8 +579,6 @@ def init(
_memory: Amount of reservable memory resource to create.
_redis_password (str): Prevents external clients without the password
from connecting to Redis if provided.
_include_java: Boolean flag indicating whether or not to enable java
workers.
_temp_dir (str): If provided, specifies the root temporary
directory for the Ray process. Defaults to an OS-specific
conventional location, e.g., "/tmp/ray".
Expand Down Expand Up @@ -673,7 +670,6 @@ def init(
redis_password=_redis_password,
plasma_directory=None,
huge_pages=None,
include_java=_include_java,
include_dashboard=include_dashboard,
dashboard_host=dashboard_host,
dashboard_port=dashboard_port,
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
void WorkerPool::Start(int num_workers) {
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
for (auto &entry : states_by_lang_) {
if (entry.first == Language::JAVA) {
// Disable initial workers for Java.
continue;
}
auto &state = entry.second;
int num_worker_processes = static_cast<int>(
std::ceil(static_cast<double>(num_workers) / state.num_workers_per_process));
Expand Down
11 changes: 4 additions & 7 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,10 @@ TEST_P(WorkerPoolTest, StartupJavaWorkerProcessCount) {
TEST_P(WorkerPoolTest, InitialWorkerProcessCount) {
if (!RayConfig::instance().enable_multi_tenancy()) {
worker_pool_->Start(1);
// Here we try to start only 1 worker for each worker language. But since each Java
// worker process contains exactly NUM_WORKERS_PER_PROCESS_JAVA (3) workers here,
// it's expected to see 3 workers for Java and 1 worker for Python, instead of 1 for
// each worker language.
ASSERT_NE(worker_pool_->NumWorkersStarting(), 1 * LANGUAGES.size());
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1 + NUM_WORKERS_PER_PROCESS_JAVA);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), LANGUAGES.size());
// Here we try to start only 1 worker for each worker language. But since we disabled
// initial workers for Java, we expect to see only 1 worker which is a Python worker.
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1);
} else {
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ static synchronized void startCluster(boolean isCrossLanguage, boolean isLocal)
String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME),
String.format("--node-manager-port=%s", nodeManagerPort),
"--load-code-from-local",
"--include-java",
"--java-worker-options=" + workerOptions,
"--system-config=" + new Gson().toJson(config)
);
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/examples/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def splitter(line):
args = parser.parse_args()
titles_file = str(args.titles_file)

ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)

ctx = StreamingContext.Builder() \
.option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/tests/test_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def test_word_count():
try:
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
# time.sleep(10) # for gdb to attach
ctx = StreamingContext.Builder() \
.option("streaming.context-backend.type", "local_file") \
Expand Down
1 change: 0 additions & 1 deletion streaming/python/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def test_hybrid_stream():
assert not ray.is_initialized()
ray.init(
_load_code_from_local=True,
_include_java=True,
_java_worker_options=java_worker_options,
_system_config={"num_workers_per_process_java": 1})

Expand Down
6 changes: 3 additions & 3 deletions streaming/python/tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


def test_data_stream():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder().build()
stream = ctx.from_values(1, 2, 3)
java_stream = stream.as_java_stream()
Expand All @@ -17,7 +17,7 @@ def test_data_stream():


def test_key_data_stream():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder().build()
key_stream = ctx.from_values(
"a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])
Expand All @@ -32,7 +32,7 @@ def test_key_data_stream():


def test_stream_config():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder().build()
stream = ctx.from_values(1, 2, 3)
stream.with_config("k1", "v1")
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/tests/test_union_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


def test_union_stream():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder() \
.option("streaming.metrics.reporters", "") \
.build()
Expand Down
4 changes: 2 additions & 2 deletions streaming/python/tests/test_word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


def test_word_count():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder() \
.build()
ctx.read_text_file(__file__) \
Expand All @@ -23,7 +23,7 @@ def test_word_count():


def test_simple_word_count():
ray.init(_load_code_from_local=True, _include_java=True)
ray.init(_load_code_from_local=True)
ctx = StreamingContext.Builder() \
.build()
sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"
Expand Down