From 3d88a79494ac2cef5461cce2cef0cab376d8bf0d Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Thu, 23 Jan 2025 20:52:27 +0100 Subject: [PATCH 01/11] fix tp --- src/transformers/modeling_utils.py | 88 ++++++++++++++++-------------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index 9c99a6d770d1..731f7661f4e2 100755 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -3442,6 +3442,26 @@ def from_pretrained( if tp_plan is not None and tp_plan != "auto": # TODO: we can relax this check when we support taking tp_plan from a json file, for example. raise ValueError(f"tp_plan supports 'auto' only for now but got {tp_plan}.") + + if tp_plan is not None and device_map is not None: + raise ValueError(f"`tp_plan` and `device_map` are mutually exclusive. Choose either one for parallelization.") + + # We need to correctly dispatch the model on the current process device. The easiest way for this is to use a simple + # `device_map` pointing to the correct device. If we don't, torch will use the default device (index 0) for all + # childs processes at parallelization time, resulting in excessive memory usage on device 0 and OOMs. + # And temporarily setting the default device to current process rank result in the following error + # `torch.distributed.DistBackendError: Attempt to perform collective on tensor not on device passed to init_process_group` + if tp_plan is not None: + if not torch.distributed.is_initialized(): + raise ValueError("Tensor Parallel requires torch.distributed to be initialized first.") + + # Detect the accelerator on the machine. If no accelerator is available, it returns CPU. 
+ device_type = torch._C._get_accelerator().type + device_module = torch.get_device_module(device_type) + # Get device with index assuming equal number of devices per host + tp_device = torch.device(device_type, torch.distributed.get_rank() % device_module.device_count()) + # This is the easiest way to dispatch to the current process device + device_map = tp_device if is_fsdp_enabled(): low_cpu_mem_usage = True @@ -4106,16 +4126,6 @@ def from_pretrained( f"Using `low_cpu_mem_usage=True` or a `device_map` requires Accelerate: `pip install 'accelerate>={ACCELERATE_MIN_VERSION}'`" ) init_contexts.append(init_empty_weights()) - elif tp_plan is not None: - if not torch.distributed.is_initialized(): - raise ValueError("Tensor Parallel requires torch.distributed to be initialized first.") - - # Detect the accelerator on the machine. If no accelerator is available, it returns CPU. - device_type = torch._C._get_accelerator().type - device_module = torch.get_device_module(device_type) - # Get device with index assuming equal number of devices per host - tp_device = torch.device(device_type, torch.distributed.get_rank() % device_module.device_count()) - init_contexts.append(tp_device) if is_deepspeed_zero3_enabled() and is_quantized: init_contexts.append(set_quantized_state()) @@ -4249,38 +4259,32 @@ def from_pretrained( if dtype_orig is not None: torch.set_default_dtype(dtype_orig) - load_contexts = [] - # Make sure we load onto targeted device - if tp_device is not None: - load_contexts.append(tp_device) - - with ContextManagers(load_contexts): - ( - model, - missing_keys, - unexpected_keys, - mismatched_keys, - offload_index, - error_msgs, - ) = cls._load_pretrained_model( - model, - state_dict, - loaded_state_dict_keys, # XXX: rename? 
- resolved_archive_file, - pretrained_model_name_or_path, - ignore_mismatched_sizes=ignore_mismatched_sizes, - sharded_metadata=sharded_metadata, - _fast_init=_fast_init, - low_cpu_mem_usage=low_cpu_mem_usage, - device_map=device_map, - offload_folder=offload_folder, - offload_state_dict=offload_state_dict, - dtype=torch_dtype, - hf_quantizer=hf_quantizer, - keep_in_fp32_modules=keep_in_fp32_modules, - gguf_path=gguf_path, - weights_only=weights_only, - ) + ( + model, + missing_keys, + unexpected_keys, + mismatched_keys, + offload_index, + error_msgs, + ) = cls._load_pretrained_model( + model, + state_dict, + loaded_state_dict_keys, # XXX: rename? + resolved_archive_file, + pretrained_model_name_or_path, + ignore_mismatched_sizes=ignore_mismatched_sizes, + sharded_metadata=sharded_metadata, + _fast_init=_fast_init, + low_cpu_mem_usage=low_cpu_mem_usage, + device_map=device_map, + offload_folder=offload_folder, + offload_state_dict=offload_state_dict, + dtype=torch_dtype, + hf_quantizer=hf_quantizer, + keep_in_fp32_modules=keep_in_fp32_modules, + gguf_path=gguf_path, + weights_only=weights_only, + ) # make sure token embedding weights are still tied if needed model.tie_weights() From 0d142d91715cc56de8d591bcc9fe8d42cafc5d4f Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Thu, 23 Jan 2025 20:57:04 +0100 Subject: [PATCH 02/11] Update modeling_utils.py --- src/transformers/modeling_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index 731f7661f4e2..eeb38fe8e320 100755 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -3451,6 +3451,7 @@ def from_pretrained( # childs processes at parallelization time, resulting in excessive memory usage on device 0 and OOMs. 
# And temporarily setting the default device to current process rank result in the following error # `torch.distributed.DistBackendError: Attempt to perform collective on tensor not on device passed to init_process_group` + tp_device = None if tp_plan is not None: if not torch.distributed.is_initialized(): raise ValueError("Tensor Parallel requires torch.distributed to be initialized first.") @@ -4110,7 +4111,6 @@ def from_pretrained( # Instantiate model. init_contexts = [no_init_weights(_enable=_fast_init)] - tp_device = None if is_deepspeed_zero3_enabled() and not is_quantized and not _is_ds_init_called: import deepspeed From 83207e9282eb4b8ee3024bc909b9f36a3a17c91e Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Thu, 23 Jan 2025 21:05:26 +0100 Subject: [PATCH 03/11] style --- src/transformers/modeling_utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index eeb38fe8e320..b2564cbeacf0 100755 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -3442,10 +3442,11 @@ def from_pretrained( if tp_plan is not None and tp_plan != "auto": # TODO: we can relax this check when we support taking tp_plan from a json file, for example. raise ValueError(f"tp_plan supports 'auto' only for now but got {tp_plan}.") - + if tp_plan is not None and device_map is not None: - raise ValueError(f"`tp_plan` and `device_map` are mutually exclusive. Choose either one for parallelization.") - + "`tp_plan` and `device_map` are mutually exclusive. Choose either one for parallelization." + ) + # We need to correctly dispatch the model on the current process device. The easiest way for this is to use a simple # `device_map` pointing to the correct device. If we don't, torch will use the default device (index 0) for all # childs processes at parallelization time, resulting in excessive memory usage on device 0 and OOMs. 
From daa4a386941ad631cc2cf86f9bac57374d8b24ed Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Thu, 23 Jan 2025 21:05:41 +0100 Subject: [PATCH 04/11] style --- src/transformers/modeling_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index b2564cbeacf0..b5df36e12a94 100755 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -3444,6 +3444,7 @@ def from_pretrained( raise ValueError(f"tp_plan supports 'auto' only for now but got {tp_plan}.") if tp_plan is not None and device_map is not None: + raise ValueError( "`tp_plan` and `device_map` are mutually exclusive. Choose either one for parallelization." ) From ee0ca561e876424f23d96b2ff45d745fd56c210b Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 11:34:28 +0100 Subject: [PATCH 05/11] Update test_tp.py --- tests/tp/test_tp.py | 46 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 2139a648867b..401ff811c03c 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -13,6 +13,8 @@ # limitations under the License. 
import os +import textwrap +import tempfile from transformers import is_torch_available from transformers.models.llama.configuration_llama import LlamaConfig @@ -43,6 +45,50 @@ def test_tp(self): execute_subprocess_async(cmd, env=self.get_env()) # successful return here == success - any errors would have caused an error in the sub-call + @require_torch_multi_gpu + def test_memory_consumptiom_loading(self): + script_to_run = textwrap.dedent( + """ + import torch + import os + from transformers import AutoModelForCausalLM + + model_id = "meta-llama/Meta-Llama-3-8B-Instruct" + + rank = int(os.environ["RANK"]) + world_size = int(os.environ["WORLD_SIZE"]) + device = torch.device(f"cuda:{rank}") + torch.distributed.init_process_group("nccl", device_id=device) + device_mesh = torch.distributed.init_device_mesh("cuda", (world_size,)) + + model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16, tp_plan="auto") + torch.distributed.barrier() + + # The expected full model memory footprint + expected_model_memory = 16 + overhead_factor = 1.2 + + # Assert we did not use more than the full model expected memory (with some overhead) + if not torch.cuda.max_memory_allocated(device) / 1024**3 < expected_model_memory * overhead_factor: + raise ValueError("Loading the model used more than the full model size") + + # Assert we correctly handled the sharding (we use 20 GB) + if not torch.cuda.memory_allocated(device) / 1024**3 < (expected_model_memory / world_size) * overhead_factor: + raise ValueError("Each model shard is larger than what is expected.") + """ + ) + + with tempfile.NamedTemporaryFile(suffix=".py") as tmp: + tmp.write(script_to_run) + distributed_args = f"--nproc_per_node={torch.cuda.device_count()} --master_port={get_torch_dist_unique_port()} {tmp.name}".split() + output_dir = self.get_auto_remove_tmp_dir() + args = f"--output_dir {output_dir} --report_to none".split() + cmd = ["torchrun"] + distributed_args + args + print(cmd) + 
execute_subprocess_async(cmd, env=self.get_env()) + # successful return here == success - any errors would have caused an error in the sub-call + + if __name__ == "__main__": # The script below is meant to be run under torch.distributed, on a machine with multiple GPUs: From ca3828af8a59856b554354839e8f0e1fa57dbf09 Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:12:59 +0100 Subject: [PATCH 06/11] Update test_tp.py --- tests/tp/test_tp.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 401ff811c03c..88f801472630 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -15,6 +15,7 @@ import os import textwrap import tempfile +import subprocess from transformers import is_torch_available from transformers.models.llama.configuration_llama import LlamaConfig @@ -46,7 +47,7 @@ def test_tp(self): # successful return here == success - any errors would have caused an error in the sub-call @require_torch_multi_gpu - def test_memory_consumptiom_loading(self): + def test_loading_memory_consumption(self): script_to_run = textwrap.dedent( """ import torch @@ -59,7 +60,6 @@ def test_memory_consumptiom_loading(self): world_size = int(os.environ["WORLD_SIZE"]) device = torch.device(f"cuda:{rank}") torch.distributed.init_process_group("nccl", device_id=device) - device_mesh = torch.distributed.init_device_mesh("cuda", (world_size,)) model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16, tp_plan="auto") torch.distributed.barrier() @@ -75,17 +75,24 @@ def test_memory_consumptiom_loading(self): # Assert we correctly handled the sharding (we use 20 GB) if not torch.cuda.memory_allocated(device) / 1024**3 < (expected_model_memory / world_size) * overhead_factor: raise ValueError("Each model shard is larger than what is expected.") + + torch.distributed.barrier() + torch.utils.distributed.destroy_process_group() """ ) - with 
tempfile.NamedTemporaryFile(suffix=".py") as tmp: + with tempfile.NamedTemporaryFile(mode="w+", suffix=".py") as tmp: tmp.write(script_to_run) - distributed_args = f"--nproc_per_node={torch.cuda.device_count()} --master_port={get_torch_dist_unique_port()} {tmp.name}".split() - output_dir = self.get_auto_remove_tmp_dir() - args = f"--output_dir {output_dir} --report_to none".split() - cmd = ["torchrun"] + distributed_args + args - print(cmd) - execute_subprocess_async(cmd, env=self.get_env()) + tmp.flush() + tmp.seek(0) + cmd = ( + f"torchrun --nproc_per_node {torch.cuda.device_count()} --master_port {get_torch_dist_unique_port()} {tmp.name}" + ).split() + + # Note that the subprocess will be waited for here + sub = subprocess.run(cmd, capture_output=True, env=self.get_env(), text=True) + print(sub.stdout) + print(sub.stderr) # successful return here == success - any errors would have caused an error in the sub-call From 9422935b93a4c27d1ada3c5646d1b5d4c8e3b22d Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:14:07 +0100 Subject: [PATCH 07/11] style --- tests/tp/test_tp.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 88f801472630..64dae9e8eae5 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -13,9 +13,9 @@ # limitations under the License. 
import os -import textwrap -import tempfile import subprocess +import tempfile +import textwrap from transformers import is_torch_available from transformers.models.llama.configuration_llama import LlamaConfig @@ -64,7 +64,7 @@ def test_loading_memory_consumption(self): model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16, tp_plan="auto") torch.distributed.barrier() - # The expected full model memory footprint + # The expected full model memory footprint expected_model_memory = 16 overhead_factor = 1.2 @@ -96,7 +96,6 @@ def test_loading_memory_consumption(self): # successful return here == success - any errors would have caused an error in the sub-call - if __name__ == "__main__": # The script below is meant to be run under torch.distributed, on a machine with multiple GPUs: # CUDA_VISIBLE_DEVICES=0,1 RUN_SLOW=1 pytest -sv tests/tp/test_tp.py From c9537f5ddef6db3b223e9fc6a0057de5728ad9d6 Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:15:11 +0100 Subject: [PATCH 08/11] Update test_tp.py --- tests/tp/test_tp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 64dae9e8eae5..859e4f36ffa3 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -72,7 +72,7 @@ def test_loading_memory_consumption(self): if not torch.cuda.max_memory_allocated(device) / 1024**3 < expected_model_memory * overhead_factor: raise ValueError("Loading the model used more than the full model size") - # Assert we correctly handled the sharding (we use 20 GB) + # Assert we correctly handled the sharding between devices if not torch.cuda.memory_allocated(device) / 1024**3 < (expected_model_memory / world_size) * overhead_factor: raise ValueError("Each model shard is larger than what is expected.") From 4c45e390506146276f0464e21de4f10240191443 Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:39:42 +0100 Subject: [PATCH 09/11] Update test_tp.py --- 
tests/tp/test_tp.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 859e4f36ffa3..5775dac83414 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -33,6 +33,22 @@ class TestTensorParallel(TestCasePlus): + def torchrun(self, script: str): + """Run the `script` using `torchrun` command for multi-processing in a subprocess. Captures errors as necessary.""" + with tempfile.NamedTemporaryFile(mode="w+", suffix=".py") as tmp: + tmp.write(script) + tmp.flush() + tmp.seek(0) + cmd = ( + f"torchrun --nproc_per_node {torch.cuda.device_count()} --master_port {get_torch_dist_unique_port()} {tmp.name}" + ).split() + + # Note that the subprocess will be waited for here, and raise an error if not successful + try: + _ = subprocess.run(cmd, capture_output=True, env=self.get_env(), text=True, check=True) + except subprocess.CalledProcessError as e: + raise Exception(f"The following error was capture: {e.stderr}") + @require_torch_multi_gpu def test_tp(self): distributed_args = f"""--nproc_per_node={torch.cuda.device_count()} @@ -80,20 +96,7 @@ def test_loading_memory_consumption(self): torch.utils.distributed.destroy_process_group() """ ) - - with tempfile.NamedTemporaryFile(mode="w+", suffix=".py") as tmp: - tmp.write(script_to_run) - tmp.flush() - tmp.seek(0) - cmd = ( - f"torchrun --nproc_per_node {torch.cuda.device_count()} --master_port {get_torch_dist_unique_port()} {tmp.name}" - ).split() - - # Note that the subprocess will be waited for here - sub = subprocess.run(cmd, capture_output=True, env=self.get_env(), text=True) - print(sub.stdout) - print(sub.stderr) - # successful return here == success - any errors would have caused an error in the sub-call + self.torchrun(script_to_run) if __name__ == "__main__": From 0b57b5f041cb7f6a2238b5bc4e80e895744651de Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:40:58 +0100 Subject: [PATCH 10/11] Update
test_tp.py --- tests/tp/test_tp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 5775dac83414..1c3f0fc808c5 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -47,7 +47,7 @@ def torchrun(self, script: str): try: _ = subprocess.run(cmd, capture_output=True, env=self.get_env(), text=True, check=True) except subprocess.CalledProcessError as e: - raise Exception(f"The following error was capture: {e.stderr}") + raise Exception(f"The following error was captured: {e.stderr}") @require_torch_multi_gpu def test_tp(self): From daafd435a84e8f87291bd9d4870a94a0512b0b7e Mon Sep 17 00:00:00 2001 From: Cyril Vallez Date: Tue, 28 Jan 2025 12:46:58 +0100 Subject: [PATCH 11/11] Update test_tp.py --- tests/tp/test_tp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tp/test_tp.py b/tests/tp/test_tp.py index 1c3f0fc808c5..3df57c5f955c 100644 --- a/tests/tp/test_tp.py +++ b/tests/tp/test_tp.py @@ -93,7 +93,7 @@ def test_loading_memory_consumption(self): raise ValueError("Each model shard is larger than what is expected.") torch.distributed.barrier() - torch.utils.distributed.destroy_process_group() + torch.distributed.destroy_process_group() """ ) self.torchrun(script_to_run)