Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions python/ray/autoscaler/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import subprocess
import sys
import time
import json

from ray.autoscaler.docker import check_docker_running_cmd, \
check_docker_image, \
Expand Down Expand Up @@ -81,18 +82,9 @@ def _with_environment_variables(cmd: str,
automatically be converted to a one line yaml string.
"""

def dict_as_one_line_yaml(d):
items = []
for key, val in d.items():
item_str = "{}: {}".format(quote(str(key)), quote(str(val)))
items.append(item_str)

return "{" + ",".join(items) + "}"

as_strings = []
for key, val in environment_variables.items():
if isinstance(val, dict):
val = dict_as_one_line_yaml(val)
val = json.dumps(val, separators=(",", ":"))
s = "export {}={};".format(key, quote(val))
as_strings.append(s)
all_vars = "".join(as_strings)
Expand All @@ -102,7 +94,6 @@ def dict_as_one_line_yaml(d):
def _with_interactive(cmd):
force_interactive = ("true && source ~/.bashrc && "
"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ")

return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)]


Expand Down
25 changes: 18 additions & 7 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,15 @@ def get_resource_spec(self):
"""Resolve and return the current resource spec for the node."""

def merge_resources(env_dict, params_dict):
"""Merge two dictionaries, picking from the second in the event of a conflict.
Also emit a warning on every conflict.
"""Separates special case params and merges two dictionaries, picking from the
first in the event of a conflict. Also emit a warning on every
conflict.
"""
num_cpus = env_dict.pop("CPU", None)
num_gpus = env_dict.pop("GPU", None)
memory = env_dict.pop("memory", None)
object_store_memory = env_dict.pop("object_store_memory", None)

result = params_dict.copy()
result.update(env_dict)

Expand All @@ -268,19 +274,24 @@ def merge_resources(env_dict, params_dict):
logger.warning("Autoscaler is overriding your resource:"
"{}: {} with {}.".format(
key, params_dict[key], env_dict[key]))
return result
return num_cpus, num_gpus, memory, object_store_memory, result

env_resources = {}
env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
if env_string:
env_resources = json.loads(env_string)
logger.info(f"Autosaler overriding resources: {env_resources}.")

if not self._resource_spec:
resources = merge_resources(env_resources,
self._ray_params.resources)
num_cpus, num_gpus, memory, object_store_memory, resources = \
merge_resources(env_resources, self._ray_params.resources)
self._resource_spec = ResourceSpec(
self._ray_params.num_cpus, self._ray_params.num_gpus,
self._ray_params.memory, self._ray_params.object_store_memory,
self._ray_params.num_cpus
if num_cpus is None else num_cpus, self._ray_params.num_gpus
if num_gpus is None else num_gpus, self._ray_params.memory
if memory is None else memory,
self._ray_params.object_store_memory
if object_store_memory is None else object_store_memory,
resources, self._ray_params.redis_max_memory).resolve(
is_head=self.head, node_ip_address=self.node_ip_address)
return self._resource_spec
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tests/test_advanced_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,14 +656,16 @@ def test_ray_address_environment_variable(ray_start_cluster):
def test_ray_resources_environment_variable(ray_start_cluster):
address = ray_start_cluster.address

os.environ["RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2}"
os.environ[
"RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2, \"CPU\":3}"
ray.init(address=address, resources={"custom1": 3, "custom3": 3})

cluster_resources = ray.cluster_resources()
print(cluster_resources)
assert cluster_resources["custom1"] == 1
assert cluster_resources["custom2"] == 2
assert cluster_resources["custom3"] == 3
assert cluster_resources["CPU"] == 3


def test_gpu_info_parsing():
Expand Down
10 changes: 6 additions & 4 deletions python/ray/tests/test_command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
def test_environment_variable_encoder_strings():
env_vars = {"var1": "quote between this \" and this", "var2": "123"}
res = _with_environment_variables("echo hello", env_vars)
expected = """export var1='quote between this " and this';export var2=123;echo hello""" # noqa: E501
expected = """export var1='"quote between this \\" and this"';export var2='"123"';echo hello""" # noqa: E501
assert res == expected


def test_environment_variable_encoder_dict():
env_vars = {"value1": "string1", "value2": {"a": "b", "c": 2}}
res = _with_environment_variables("echo hello", env_vars)

expected = """export value1=string1;export value2='{a: b,c: 2}';echo hello""" # noqa: E501
expected = """export value1='"string1"';export value2='{"a":"b","c":2}';echo hello""" # noqa: E501
assert res == expected


Expand Down Expand Up @@ -84,7 +84,7 @@ def test_ssh_command_runner():
"--login",
"-c",
"-i",
"""'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'quote between this " and this'"'"';export var2=123;echo helloo'""" # noqa: E501
"""'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"quote between this \\" and this"'"'"';export var2='"'"'"123"'"'"';echo helloo'""" # noqa: E501
]

# Much easier to debug this loop than the function call.
Expand Down Expand Up @@ -122,7 +122,7 @@ def test_docker_command_runner():
# This string is insane because there are an absurd number of embedded
# quotes. While this is a ridiculous string, the escape behavior is
# important and somewhat difficult to get right for environment variables.
cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'quote between this " and this'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2=123;echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501
cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"quote between this \\" and this"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"123"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501

expected = [
"ssh", "-tt", "-i", "8265.pem", "-o", "StrictHostKeyChecking=no", "-o",
Expand All @@ -135,6 +135,8 @@ def test_docker_command_runner():
]
# Much easier to debug this loop than the function call.
for x, y in zip(process_runner.calls[0], expected):
print(f"expeted:\t{y}")
print(f"actual: \t{x}")
assert x == y
process_runner.assert_has_call("1.2.3.4", exact=expected)

Expand Down
6 changes: 3 additions & 3 deletions python/ray/tests/test_resource_demand_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,10 @@ def testResourcePassing(self):
# These checks are done separately because we have no guarantees on the
# order the dict is serialized in.
runner.assert_has_call("172.0.0.0", "RAY_OVERRIDE_RESOURCES=")
runner.assert_has_call("172.0.0.0", "CPU: 2")
runner.assert_has_call("172.0.0.0", "\"CPU\":2")
runner.assert_has_call("172.0.0.1", "RAY_OVERRIDE_RESOURCES=")
runner.assert_has_call("172.0.0.1", "CPU: 32")
runner.assert_has_call("172.0.0.1", "GPU: 8")
runner.assert_has_call("172.0.0.1", "\"CPU\":32")
runner.assert_has_call("172.0.0.1", "\"GPU\":8")

def testScaleUpLoadMetrics(self):
config = MULTI_WORKER_CLUSTER.copy()
Expand Down