diff --git a/open_instruct/finetune.py b/open_instruct/finetune.py
index 6a998900e3..800e2b4474 100644
--- a/open_instruct/finetune.py
+++ b/open_instruct/finetune.py
@@ -40,6 +40,7 @@
 import torch
 import transformers
 from accelerate import Accelerator, DataLoaderConfiguration
+from accelerate.accelerator import GradientAccumulationPlugin
 from accelerate.logging import get_logger
 from accelerate.utils import InitProcessGroupKwargs, set_seed
 from huggingface_hub import HfApi
@@ -87,8 +88,6 @@ class FlatArguments:
 
     exp_name: str = os.path.basename(__file__)[: -len(".py")]
     """The name of this experiment"""
-    run_name: Optional[str] = None
-    """A unique name of this run"""
     do_not_randomize_output_dir: bool = False
     """By default the output directory will be randomized"""
     model_name_or_path: Optional[str] = field(
@@ -373,6 +372,18 @@ class FlatArguments:
             "help": "Whether to clean up all previous checkpoints at the end of the run.",
         },
     )
+    final_lr_ratio: Optional[float] = field(
+        default=None,
+        metadata={
+            "help": "Set the final lr value at the end of training to be final_lr_ratio * learning_rate."
+            " Only for linear schedulers, currently."
+        },
+    )
+    add_seed_and_date_to_exp_name: bool = True
+    additional_model_arguments: Optional[list[str]] = field(
+        default=None,
+        metadata={"help": "A list of key:val to be passed as additional model args."},
+    )
 
     def __post_init__(self):
         if self.reduce_loss not in ["mean", "sum"]:
@@ -387,6 +398,29 @@ def __post_init__(self):
             raise ValueError("Cannot provide two dataset selection mechanisms.")
         if self.try_launch_beaker_eval_jobs and not self.push_to_hub:
             raise ValueError("Cannot launch Beaker evaluation jobs without pushing to the Hub.")
+        if self.final_lr_ratio is not None:
+            if self.lr_scheduler_type != "linear":
+                raise NotImplementedError("final_lr_ratio only currently implemented for linear schedulers")
+            if not (1.0 >= self.final_lr_ratio >= 0.0):
+                raise ValueError(f"final_lr_ratio must be between 0 and 1, not {self.final_lr_ratio=}")
+
+        if self.additional_model_arguments is not None:
+            import re
+
+            def maybe_convert_(x):
+                return (
+                    float(x)
+                    if x.count(".") == 1 and re.sub(r"^-?.*\.", "", x, count=1).isnumeric()
+                    else (int(x) if x.count(".") == 0 and re.sub("^-?", "", x).isnumeric() else x)
+                )
+
+            try:
+                self.additional_model_arguments = [x.split(":") for x in self.additional_model_arguments]
+                self.additional_model_arguments = {k: maybe_convert_(v) for k, v in self.additional_model_arguments}
+            except ValueError:
+                raise ValueError("Malformed additional model arguments. Should be space-delimited list of key:val.")
+        else:
+            self.additional_model_arguments = {}
 
 
 def main(args: FlatArguments, tc: TokenizerConfig):
@@ -401,11 +435,15 @@ def main(args: FlatArguments, tc: TokenizerConfig):
     # if you get timeouts (e.g. due to long tokenization) increase this.
     timeout_kwargs = InitProcessGroupKwargs(timeout=timedelta(seconds=args.timeout))
     dataloader_config = DataLoaderConfiguration(use_seedable_sampler=True)
+
     accelerator = Accelerator(
-        gradient_accumulation_steps=args.gradient_accumulation_steps,
         dataloader_config=dataloader_config,
         **accelerator_log_kwargs,
         kwargs_handlers=[timeout_kwargs],
+        gradient_accumulation_plugin=GradientAccumulationPlugin(
+            num_steps=args.gradient_accumulation_steps,
+            sync_each_batch=True,
+        ),
     )
 
     # ------------------------------------------------------------
@@ -425,9 +463,13 @@ def main(args: FlatArguments, tc: TokenizerConfig):
 
     # ------------------------------------------------------------
     # Set up runtime variables
-    args.run_name = f"{args.exp_name}__{args.seed}__{int(time.time())}"
+
+    if args.add_seed_and_date_to_exp_name:
+        args.exp_name = f"{args.exp_name}__{args.seed}__{int(time.time())}"
+    else:
+        args.exp_name = args.exp_name
     if not args.do_not_randomize_output_dir:
-        args.output_dir = os.path.join(args.output_dir, args.run_name)
+        args.output_dir = os.path.join(args.output_dir, args.exp_name)
     logger.info("using the output directory: %s", args.output_dir)
     args.dataset_local_cache_dir = os.path.abspath(args.dataset_local_cache_dir)
     if is_beaker_job():
@@ -441,7 +483,7 @@ def main(args: FlatArguments, tc: TokenizerConfig):
             args.hf_entity = HfApi().whoami()["name"]
         args.hf_repo_id = f"{args.hf_entity}/{args.hf_repo_id}"
         if args.hf_repo_revision is None:
-            args.hf_repo_revision = args.run_name
+            args.hf_repo_revision = args.exp_name
         args.hf_repo_url = f"https://huggingface.co/{args.hf_repo_id}/tree/{args.hf_repo_revision}"
     if is_beaker_job():
         beaker_config = maybe_get_beaker_config()
@@ -465,7 +507,7 @@ def main(args: FlatArguments, tc: TokenizerConfig):
             experiment_config,
             init_kwargs={
                 "wandb": {
-                    "name": args.run_name,
+                    "name": args.exp_name,
                     "entity": args.wandb_entity,
                     "tags": [args.exp_name] + get_wandb_tags(),
                 }
@@ -534,12 +576,14 @@ def main(args: FlatArguments, tc: TokenizerConfig):
             args.config_name,
             revision=args.model_revision,
             trust_remote_code=tc.trust_remote_code,
+            **args.additional_model_arguments,
         )
     elif args.model_name_or_path:
         config = AutoConfig.from_pretrained(
             args.model_name_or_path,
             revision=args.model_revision,
             trust_remote_code=tc.trust_remote_code,
+            **args.additional_model_arguments,
        )
     else:
         raise ValueError(
@@ -708,11 +752,19 @@ def main(args: FlatArguments, tc: TokenizerConfig):
     num_training_steps_for_scheduler = (
         args.max_train_steps if overrode_max_train_steps else args.max_train_steps * accelerator.num_processes
     )
+
+    num_warmup_steps = int(num_training_steps_for_scheduler * args.warmup_ratio)
+    if args.final_lr_ratio is not None and args.lr_scheduler_type == "linear":
+        # Correct num_training_steps_for_scheduler to respect final_lr_ratio for a linear scheduler
+        num_training_steps_for_scheduler = (
+            num_training_steps_for_scheduler - args.final_lr_ratio * num_warmup_steps
+        ) / (1 - args.final_lr_ratio)
+
     lr_scheduler = get_scheduler(
         name=args.lr_scheduler_type,
         optimizer=optimizer,
         num_training_steps=num_training_steps_for_scheduler,
-        num_warmup_steps=int(num_training_steps_for_scheduler * args.warmup_ratio),
+        num_warmup_steps=num_warmup_steps,
     )
     # Prepare everything with `accelerator`.
     model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
@@ -835,7 +887,20 @@ def main(args: FlatArguments, tc: TokenizerConfig):
                     loss += aux_loss
                 # We keep track of the loss at each logged step
                 total_loss += loss.detach().float()
-                accelerator.backward(loss)
+
+                # [Pre-backwards scalings]
+                # accelerator.backward internally divides by `gradient_accumulation_steps`, which is
+                # only the right thing to do for `mean` losses. For "sum" losses, we counteract this
+                # by multiplying the loss by `gradient_accumulation_steps` before the backwards
+                # call. Additionally, DeepSpeed/FSDP average the gradients across processes, whereas
+                # we should be summing gradients for a "sum" loss, hence we also multiply by the
+                # world size in this latter case.
+                accelerator.backward(
+                    loss
+                    if args.reduce_loss == "mean"
+                    else loss * args.gradient_accumulation_steps * accelerator.num_processes
+                )
+
                 if args.load_balancing_loss:
                     total_aux_loss += aux_loss.detach().float()
                 # clip gradient norm. don't do this with deepspeed
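
A quick sanity check of the final_lr_ratio arithmetic in the scheduler hunk above. This is a standalone sketch: the linear_factor helper only mirrors the (N - step) / (N - W) decay that transformers' linear schedule with warmup applies after warmup, and the step counts below are illustrative, not values the patch requires. Stretching num_training_steps_for_scheduler to (T - r * W) / (1 - r) leaves the multiplier at exactly r on the last real optimizer step T.

def linear_factor(step: float, num_training_steps: float, num_warmup_steps: float) -> float:
    # Mirror of the linear-with-warmup multiplier: ramp up during warmup,
    # then decay as (N - step) / (N - W) until it reaches zero.
    if step < num_warmup_steps:
        return step / max(1.0, num_warmup_steps)
    return max(0.0, (num_training_steps - step) / max(1.0, num_training_steps - num_warmup_steps))

T = 10_000                      # actual optimizer steps (max_train_steps), illustrative
r = 0.1                         # final_lr_ratio
W = int(T * 0.03)               # num_warmup_steps from warmup_ratio = 0.03
N = (T - r * W) / (1 - r)       # the corrected num_training_steps_for_scheduler

print(linear_factor(T, T, W))   # 0.0  -> without the correction the LR decays all the way to zero
print(linear_factor(T, N, W))   # ~0.1 -> with it, the final LR is final_lr_ratio * learning_rate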
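
The key:val parsing added to __post_init__ can be exercised in isolation. The snippet below copies maybe_convert_ from the patch and shows what a command line such as --additional_model_arguments rope_theta:500000 attention_dropout:0.1 attn_implementation:sdpa turns into before being splatted into AutoConfig.from_pretrained; the argument names are only examples, not ones the patch mandates.

import re

def maybe_convert_(x):
    # "500000" -> int, "0.1" -> float, anything else stays a string.
    return (
        float(x)
        if x.count(".") == 1 and re.sub(r"^-?.*\.", "", x, count=1).isnumeric()
        else (int(x) if x.count(".") == 0 and re.sub("^-?", "", x).isnumeric() else x)
    )

raw = ["rope_theta:500000", "attention_dropout:0.1", "attn_implementation:sdpa"]
parsed = {k: maybe_convert_(v) for k, v in (x.split(":") for x in raw)}
print(parsed)  # {'rope_theta': 500000, 'attention_dropout': 0.1, 'attn_implementation': 'sdpa'}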
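
Why the "sum" branch multiplies the loss before accelerator.backward: with gradient accumulation the wrapper scales each micro-batch loss by 1 / gradient_accumulation_steps (and data-parallel backends average gradients across workers), which is correct for a mean loss but shrinks a summed loss. The sketch below emulates only the accumulation part in plain PyTorch, outside of accelerate, to show that multiplying the summed loss by gradient_accumulation_steps restores the full-batch summed gradient; the extra accelerator.num_processes factor in the patch cancels the analogous averaging across workers.

import torch

G = 4                                      # gradient_accumulation_steps (illustrative)
w = torch.tensor(1.0, requires_grad=True)
data = torch.arange(1.0, 9.0)              # 8 examples, split into G micro-batches

# Target: gradient of the summed loss over the whole batch -> data.sum() = 36.
ref = torch.autograd.grad((w * data).sum(), w)[0]

# Micro-batch losses divided by G (the accumulation scaling) lose a factor of G.
naive = sum(torch.autograd.grad((w * chunk).sum() / G, w)[0] for chunk in data.chunk(G))

# Multiplying the summed loss by G first, as the patch does, cancels that division.
fixed = sum(torch.autograd.grad(((w * chunk).sum() * G) / G, w)[0] for chunk in data.chunk(G))

print(ref.item(), naive.item(), fixed.item())  # 36.0, 9.0, 36.0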