 import os
 import shutil
 import signal
+import subprocess
 import tempfile
 import time
 import unittest
 from contextlib import contextmanager
+from dataclasses import dataclass
 from datetime import datetime
 from os.path import join
 from typing import Callable, Generator, Optional
     join_PATH,
     make_unique,
 )
-from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros
+from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros, Resource
 
 from .test_util import write_shell_script
 
@@ -828,6 +830,114 @@ def test_close_twice(self) -> None:
         self.scheduler.close()
         # nothing to validate; just make sure no errors are raised
 
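+    # The ProcResult dataclass below is a minimal stand-in for the object returned
+    # by subprocess.run; patching subprocess.run lets _get_gpu_device_count parse
+    # canned `nvidia-smi -L`-style output (four GPU lines) without a real GPU host.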
+    def test_get_gpu_device_count(self) -> None:
+        @dataclass
+        class ProcResult:
+            stdout: str
+
+        nvidia_smi_out = (
+            "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-196a22c5-717b-66db-0acc-58cde6f3df85)\n"
+            "GPU 1: Tesla V100-SXM2-16GB (UUID: GPU-45e9165d-4f7e-d954-7ff5-481bc2c0ec7b)\n"
+            "GPU 2: Tesla V100-SXM2-16GB (UUID: GPU-26e22503-5fd5-8f55-d068-e1714fbb6fd6)\n"
+            "GPU 3: Tesla V100-SXM2-16GB (UUID: GPU-ebfc20c7-5f1a-1bc9-0d98-601cbe21fc2d)\n"
+        )
+
+        result = ProcResult(stdout=nvidia_smi_out)
+        with patch("subprocess.run", return_value=result):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(4, gpu_count)
+
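+    # When the underlying command fails, _get_gpu_device_count is expected to
+    # swallow the CalledProcessError and report zero GPUs rather than raise.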
+    def test_get_gpu_device_count_error(self) -> None:
+        error = subprocess.CalledProcessError(
+            returncode=2,
+            cmd="",
+            output="",
+            stderr="",
+        )
+        with patch("subprocess.run", side_effect=error):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(0, gpu_count)
+
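+    # Judging by the calls and assertions below, the positional args are
+    # (replica_param, replica_id, gpus_per_replica, request_gpu_start), and the
+    # assigned range starts at request_gpu_start + replica_id * gpus_per_replica.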
+    def test_set_cuda_visible_devices_for_role_replica(self) -> None:
+        replica_param1 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        replica_param2 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param1, 0, 4, 0
+        )
+        self.assertEqual("0,1,2,3", replica_param1.env["CUDA_VISIBLE_DEVICES"])
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param2, 1, 8, 4
+        )
+        # start GPU is 12: request_gpu_start (4) + replica_id (1) * gpus_per_replica (8)
+        self.assertEqual(
+            "12,13,14,15,16,17,18,19", replica_param2.env["CUDA_VISIBLE_DEVICES"]
+        )
+
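+    # With auto_set_cuda_visible_devices=True and 16 devices reported, replicas
+    # should get contiguous, non-overlapping ranges across roles: role1
+    # (2 GPUs x 2 replicas) takes devices 0-3, role2 (3 GPUs x 2 replicas) takes 4-9.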
+    def test_get_cuda_devices_is_set(self) -> None:
+        with patch.object(self.scheduler, "_get_gpu_device_count", return_value=16):
+            appdef = AppDef(
+                name="role1",
+                roles=[
+                    Role(
+                        name="role1",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=2, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                    Role(
+                        name="role2",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=3, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                ],
+            )
+            popen_req = self.scheduler._to_popen_request(
+                appdef, {"auto_set_cuda_visible_devices": True}
+            )
+            role1_params = popen_req.role_params["role1"]
+            self.assertEqual(2, len(role1_params))
+            self.assertEqual("0,1", role1_params[0].env["CUDA_VISIBLE_DEVICES"])
+            self.assertEqual("2,3", role1_params[1].env["CUDA_VISIBLE_DEVICES"])
+            role2_params = popen_req.role_params["role2"]
+            self.assertEqual(2, len(role2_params))
+            self.assertEqual("4,5,6", role2_params[0].env["CUDA_VISIBLE_DEVICES"])
+            self.assertEqual("7,8,9", role2_params[1].env["CUDA_VISIBLE_DEVICES"])
+
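+    # Without the auto_set_cuda_visible_devices cfg flag, CUDA_VISIBLE_DEVICES
+    # must stay unset even though the role requests GPUs.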
+    def test_get_cuda_devices_not_set(self) -> None:
+        with patch.object(self.scheduler, "_get_gpu_device_count", return_value=8):
+            trainer1 = AppDef(
+                name="trainer1",
+                roles=[
+                    Role(
+                        name="trainer1",
+                        image=self.test_dir,
+                        entrypoint="trainer1.sh",
+                        resource=Resource(gpu=4, cpu=0, memMB=0),
+                        num_replicas=4,
+                    )
+                ],
+            )
+
+            popen_req = self.scheduler._to_popen_request(trainer1, {})
+            role_params = popen_req.role_params["trainer1"]
+            self.assertEqual(4, len(role_params))
+            for replica_param in role_params:
+                self.assertNotIn("CUDA_VISIBLE_DEVICES", replica_param.env)
+
     def test_no_orphan_process_function(self) -> None:
         self._test_orphan_workflow()
 
@@ -839,6 +949,9 @@ def _test_orphan_workflow(self) -> None:
             target=start_sleep_processes, args=(self.test_dir, mp_queue, child_nproc)
         )
         proc.start()
+        # Give the child processes time to spawn before querying the queue;
+        # otherwise the query fails with `FileNotFoundError: [Errno 2] No such file or directory`.
+        time.sleep(10)
         total_processes = child_nproc + 1
         pids = []
         for _ in range(total_processes):