diff --git a/paddlenlp/trainer/trainer.py b/paddlenlp/trainer/trainer.py
index 75e643c0caa3..e9e3e24e47b0 100644
--- a/paddlenlp/trainer/trainer.py
+++ b/paddlenlp/trainer/trainer.py
@@ -2155,16 +2155,7 @@ def save_model(self, output_dir: Optional[str] = None, merge_tensor_parallel: Op
             self.model_wrapped.get_all_parameters(convert2cpu=True)
 
         if self.args.should_save_model_state:
-            unified_checkpoint_config_backup = self.args.unified_checkpoint_config
-            # backup and remove unified_checkpoint_config for not trine stage
-            if not self.is_in_train:
-                self.args.unified_checkpoint_config = []
-
             self._save(output_dir=output_dir, merge_tensor_parallel=merge_tensor_parallel)
-
-            # recover unified_checkpoint_config for not trine stage
-            if not self.is_in_train:
-                self.args.unified_checkpoint_config = unified_checkpoint_config_backup
         else:
             if self.args.unified_checkpoint and "async_save" in self.args.unified_checkpoint_config:
                 os.makedirs(output_dir, exist_ok=True)
@@ -2427,10 +2418,9 @@ def _save(self, output_dir: Optional[str] = None, state_dict=None, merge_tensor_
 
         # Save a trained model and configuration using `save_pretrained()`.
         # They can then be reloaded using `from_pretrained()`
-        local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0))
         if (
             strtobool(os.getenv("FLAG_LLM_PDC", "False"))
-            and local_rank == 0
+            and paddle.distributed.get_rank() == 0
             and self.args.unified_checkpoint
             and "async_save" in self.args.unified_checkpoint_config
         ):
@@ -2441,9 +2431,10 @@ def _save(self, output_dir: Optional[str] = None, state_dict=None, merge_tensor_
                 "ignore_save_lr_and_optim": self.args.ignore_save_lr_and_optim,
                 "skip_save_model_weight": "skip_save_model_weight" in self.args.unified_checkpoint_config,
             }
-            if not os.path.exists(os.path.join(self.args.logging_dir, "async_save_info.json")):
-                with open(os.path.join(self.args.logging_dir, "async_save_info.json"), "w") as f:
-                    json.dump(save_info, f)
+            if os.path.exists(os.path.join(self.args.logging_dir, "async_save_info.json")):  # afs cannot overwrite
+                os.remove(os.path.join(self.args.logging_dir, "async_save_info.json"))
+            with open(os.path.join(self.args.logging_dir, "async_save_info.json"), "w") as f:
+                json.dump(save_info, f)
 
         if self.args.should_save:
             if self.tokenizer is not None:
@@ -2452,7 +2443,17 @@ def _save(self, output_dir: Optional[str] = None, state_dict=None, merge_tensor_
             paddle.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
 
         if self.args.unified_checkpoint:
+            unified_checkpoint_config_backup = self.args.unified_checkpoint_config
+            # backup and remove unified_checkpoint_config for not trine stage
+            if not self.is_in_train:
+                self.args.unified_checkpoint_config = []
+
             self.unified_checkpoint_handler.save_unified_checkpoint(self.model, self.optimizer, output_dir)
+
+            # recover unified_checkpoint_config for not trine stage
+            if not self.is_in_train:
+                self.args.unified_checkpoint_config = unified_checkpoint_config_backup
+
             return
 
         merge_tensor_parallel = merge_tensor_parallel and self.args.use_hybrid_parallel