diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index cd646be1fa37..5492964f9763 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -8,6 +8,7 @@ import subprocess import sys import time +import json from ray.autoscaler.docker import check_docker_running_cmd, \ check_docker_image, \ @@ -81,18 +82,9 @@ def _with_environment_variables(cmd: str, automatically be converted to a one line yaml string. """ - def dict_as_one_line_yaml(d): - items = [] - for key, val in d.items(): - item_str = "{}: {}".format(quote(str(key)), quote(str(val))) - items.append(item_str) - - return "{" + ",".join(items) + "}" - as_strings = [] for key, val in environment_variables.items(): - if isinstance(val, dict): - val = dict_as_one_line_yaml(val) + val = json.dumps(val, separators=(",", ":")) s = "export {}={};".format(key, quote(val)) as_strings.append(s) all_vars = "".join(as_strings) @@ -102,7 +94,6 @@ def dict_as_one_line_yaml(d): def _with_interactive(cmd): force_interactive = ("true && source ~/.bashrc && " "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") - return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)] diff --git a/python/ray/node.py b/python/ray/node.py index 45bc3c8b10d2..fdc59330de9a 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -257,9 +257,15 @@ def get_resource_spec(self): """Resolve and return the current resource spec for the node.""" def merge_resources(env_dict, params_dict): - """Merge two dictionaries, picking from the second in the event of a conflict. - Also emit a warning on every conflict. + """Separates special case params and merges two dictionaries, picking from the + first in the event of a conflict. Also emit a warning on every + conflict. """ + num_cpus = env_dict.pop("CPU", None) + num_gpus = env_dict.pop("GPU", None) + memory = env_dict.pop("memory", None) + object_store_memory = env_dict.pop("object_store_memory", None) + result = params_dict.copy() result.update(env_dict) @@ -268,19 +274,24 @@ def merge_resources(env_dict, params_dict): logger.warning("Autoscaler is overriding your resource:" "{}: {} with {}.".format( key, params_dict[key], env_dict[key])) - return result + return num_cpus, num_gpus, memory, object_store_memory, result env_resources = {} env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE) if env_string: env_resources = json.loads(env_string) + logger.info(f"Autosaler overriding resources: {env_resources}.") if not self._resource_spec: - resources = merge_resources(env_resources, - self._ray_params.resources) + num_cpus, num_gpus, memory, object_store_memory, resources = \ + merge_resources(env_resources, self._ray_params.resources) self._resource_spec = ResourceSpec( - self._ray_params.num_cpus, self._ray_params.num_gpus, - self._ray_params.memory, self._ray_params.object_store_memory, + self._ray_params.num_cpus + if num_cpus is None else num_cpus, self._ray_params.num_gpus + if num_gpus is None else num_gpus, self._ray_params.memory + if memory is None else memory, + self._ray_params.object_store_memory + if object_store_memory is None else object_store_memory, resources, self._ray_params.redis_max_memory).resolve( is_head=self.head, node_ip_address=self.node_ip_address) return self._resource_spec diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 1ec1d78218ff..0f1de8666abe 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -656,7 +656,8 @@ def test_ray_address_environment_variable(ray_start_cluster): def test_ray_resources_environment_variable(ray_start_cluster): address = ray_start_cluster.address - os.environ["RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2}" + os.environ[ + "RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2, \"CPU\":3}" ray.init(address=address, resources={"custom1": 3, "custom3": 3}) cluster_resources = ray.cluster_resources() @@ -664,6 +665,7 @@ def test_ray_resources_environment_variable(ray_start_cluster): assert cluster_resources["custom1"] == 1 assert cluster_resources["custom2"] == 2 assert cluster_resources["custom3"] == 3 + assert cluster_resources["CPU"] == 3 def test_gpu_info_parsing(): diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index c540a6989412..164ee32aa96a 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -14,7 +14,7 @@ def test_environment_variable_encoder_strings(): env_vars = {"var1": "quote between this \" and this", "var2": "123"} res = _with_environment_variables("echo hello", env_vars) - expected = """export var1='quote between this " and this';export var2=123;echo hello""" # noqa: E501 + expected = """export var1='"quote between this \\" and this"';export var2='"123"';echo hello""" # noqa: E501 assert res == expected @@ -22,7 +22,7 @@ def test_environment_variable_encoder_dict(): env_vars = {"value1": "string1", "value2": {"a": "b", "c": 2}} res = _with_environment_variables("echo hello", env_vars) - expected = """export value1=string1;export value2='{a: b,c: 2}';echo hello""" # noqa: E501 + expected = """export value1='"string1"';export value2='{"a":"b","c":2}';echo hello""" # noqa: E501 assert res == expected @@ -84,7 +84,7 @@ def test_ssh_command_runner(): "--login", "-c", "-i", - """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'quote between this " and this'"'"';export var2=123;echo helloo'""" # noqa: E501 + """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"quote between this \\" and this"'"'"';export var2='"'"'"123"'"'"';echo helloo'""" # noqa: E501 ] # Much easier to debug this loop than the function call. @@ -122,7 +122,7 @@ def test_docker_command_runner(): # This string is insane because there are an absurd number of embedded # quotes. While this is a ridiculous string, the escape behavior is # important and somewhat difficult to get right for environment variables. - cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'quote between this " and this'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2=123;echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501 + cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"quote between this \\" and this"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"123"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501 expected = [ "ssh", "-tt", "-i", "8265.pem", "-o", "StrictHostKeyChecking=no", "-o", @@ -135,6 +135,8 @@ def test_docker_command_runner(): ] # Much easier to debug this loop than the function call. for x, y in zip(process_runner.calls[0], expected): + print(f"expeted:\t{y}") + print(f"actual: \t{x}") assert x == y process_runner.assert_has_call("1.2.3.4", exact=expected) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index fbc731654b77..5d913c9a5e09 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -366,10 +366,10 @@ def testResourcePassing(self): # These checks are done separately because we have no guarantees on the # order the dict is serialized in. runner.assert_has_call("172.0.0.0", "RAY_OVERRIDE_RESOURCES=") - runner.assert_has_call("172.0.0.0", "CPU: 2") + runner.assert_has_call("172.0.0.0", "\"CPU\":2") runner.assert_has_call("172.0.0.1", "RAY_OVERRIDE_RESOURCES=") - runner.assert_has_call("172.0.0.1", "CPU: 32") - runner.assert_has_call("172.0.0.1", "GPU: 8") + runner.assert_has_call("172.0.0.1", "\"CPU\":32") + runner.assert_has_call("172.0.0.1", "\"GPU\":8") def testScaleUpLoadMetrics(self): config = MULTI_WORKER_CLUSTER.copy()