# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import shlex


def list_of_strings(arg):
    """Split a comma-separated string into a list of substrings."""
    return arg.split(",")


def normalize_arg_name(arg_name: str) -> str:
    """
    Normalize a command-line argument name (e.g., '--model_family_name' or '-m')
    into a suitable environment variable name (e.g., 'MODEL_FAMILY_NAME').
    """
    return arg_name.lstrip("-").upper().replace("-", "_")


def build_cli_args_from_env_vars(parser: argparse.ArgumentParser) -> str:
    """
    Inspect an argparse.ArgumentParser, check for corresponding environment
    variables, and construct a CLI argument string from them.

    For every option, the last option string (the long form) is normalized to
    an environment-variable name; if that variable is set, the flag and its
    value are appended. Values are shell-quoted so the returned string is safe
    to pass through a shell even when a value contains spaces or
    metacharacters.

    Returns:
        A single space-joined CLI argument string (possibly empty).
    """
    cli_args = []

    # NOTE(review): parser._actions and argparse._StoreTrueAction are private
    # argparse APIs. They have been stable for many releases, but there is no
    # public equivalent for enumerating registered actions.
    for action in parser._actions:
        if not action.option_strings:
            # Positional arguments have no flag to map to an env var.
            continue

        long_arg_name = action.option_strings[-1]
        env_value = os.getenv(normalize_arg_name(long_arg_name))
        if env_value is None:
            continue

        if isinstance(action, argparse._StoreTrueAction):
            # Boolean flags: emit the bare flag only for truthy values.
            if env_value.lower() in ("true", "1", "yes", "on"):
                cli_args.append(long_arg_name)
        elif action.type is list_of_strings:
            # Comma-separated list options: skip entirely when the value is empty.
            if env_value:
                cli_args.append(long_arg_name)
                cli_args.append(shlex.quote(env_value))
        else:
            cli_args.append(long_arg_name)
            cli_args.append(shlex.quote(env_value))

    return " ".join(cli_args)


if __name__ == "__main__":
    # Imported lazily so this module stays importable as a library even when
    # argument_parser.py is not on sys.path.
    from argument_parser import parse_cli_args

    print(build_cli_args_from_env_vars(parse_cli_args()))
import argparse


def list_of_strings(arg):
    """Split a comma-separated string into a list of substrings."""
    return arg.split(",")


def to_dict(arg):
    """Split a comma-separated string into a dictionary of key-value pairs.

    Only the first '=' in each item separates key from value, so values may
    themselves contain '=' (e.g. 'FLAGS=a=b').
    """
    return dict(item.split("=", 1) for item in arg.split(","))


# Maps the user-facing endpoint name to the API route suffix.
ENDPOINT_TYPES = {"chat": "chat/completions/", "completions": "completions/"}


def parse_cli_args():
    """Parse command line arguments for launching Megatron-Bridge Evaluation.

    Returns:
        The configured argparse.ArgumentParser (callers invoke .parse_args()).
    """
    parser = argparse.ArgumentParser(description="Launch Megatron-Bridge Evaluation")
    parser.add_argument(
        "--dryrun",
        action="store_true",
        help="Dry run the experiment.",
        default=False,
    )

    # Deployment args
    deployment_args = parser.add_argument_group("Deployment arguments")
    deployment_args.add_argument("--megatron_checkpoint", type=str, help="Megatron checkpoint to evaluate")
    deployment_args.add_argument(
        "--host",
        type=str,
        help="Server address to use for evaluation",
        default="0.0.0.0",
    )
    deployment_args.add_argument("--port", type=int, help="Server port to use for evaluation", default=8000)
    deployment_args.add_argument("--gpus_per_node", type=int, help="Number of GPUs per node", default=8)
    # Help text fixed: this is the total GPU count, not a node count.
    deployment_args.add_argument(
        "--num_gpus", type=int, help="Total number of GPUs to use for evaluation", default=8
    )
    deployment_args.add_argument("--num_replicas", type=int, default=1, help="Num of replicas for Ray server")
    deployment_args.add_argument(
        "--tensor_model_parallel_size",
        type=int,
        help="Tensor model parallel size to use for evaluation",
        default=1,
    )
    deployment_args.add_argument(
        "--pipeline_model_parallel_size",
        type=int,
        help="Pipeline model parallel size to use for evaluation",
        default=1,
    )
    deployment_args.add_argument(
        "--context_model_parallel_size",
        type=int,
        help="Context model parallel size to use for evaluation",
        default=1,
    )

    # Evaluation args
    evaluation_args = parser.add_argument_group("Evaluation arguments")
    evaluation_args.add_argument(
        "--endpoint_type",
        type=str,
        default="completions",
        help="Whether to use completions or chat endpoint. Refer to the docs for details on tasks that are completions"
        "v/s chat.",
        choices=list(ENDPOINT_TYPES),
    )
    evaluation_args.add_argument(
        "--limit_samples",
        type=float,
        default=None,
        help="Limit evaluation to `limit` samples. Default: use all samples.",
    )
    evaluation_args.add_argument(
        "--parallelism",
        type=int,
        default=8,
        help="Number of parallel requests to send to server. Default: use default for the task.",
    )
    evaluation_args.add_argument(
        "--request_timeout",
        type=int,
        default=1000,
        help="Time in seconds for the eval client. Default: 1000s",
    )
    evaluation_args.add_argument(
        "--temperature",
        type=float,
        default=None,
        help="Sampling temperature for generation. Higher values = more random. Default: use task default.",
    )
    evaluation_args.add_argument(
        "--top_p",
        type=float,
        default=None,
        help="Top-p (nucleus) sampling threshold. Default: use task default.",
    )
    evaluation_args.add_argument(
        "--top_k",
        type=int,
        default=None,
        help="Top-k sampling threshold. Default: use task default.",
    )
    evaluation_args.add_argument(
        "--eval_task",
        type=str,
        default="mmlu",
        help="Evaluation benchmark to run. Refer to the docs for more details on the tasks/benchmarks.",
    )

    # Slurm args
    slurm_args = parser.add_argument_group("Slurm arguments")
    slurm_args.add_argument(
        "--custom_mounts", type=list_of_strings, help="Comma separated string of mounts", default=[], required=False
    )
    slurm_args.add_argument(
        "--custom_env_vars",
        type=to_dict,
        help="Comma separated string of environment variables",
        default={},
        required=False,
    )
    slurm_args.add_argument("--account", type=str, help="Cluster account to run test")
    slurm_args.add_argument("--partition", type=str, help="Cluster partition to run test")
    slurm_args.add_argument("--time_limit", type=str, default="04:00:00", help="Time limit of run")
    slurm_args.add_argument("--container_image", type=str, default="", help="Container image to run")

    # Logging args
    logging_args = parser.add_argument_group("Logging arguments")
    logging_args.add_argument(
        "--output_dir",
        type=str,
        help="Output directory to save the results",
        required=False,
    )
    logging_args.add_argument(
        "--experiment_name",
        type=str,
        help="wandb job name",
        required=False,
    )
    logging_args.add_argument(
        "--wandb_key",
        type=str,
        help="wandb key. Needed for wandb logger projection to server",
        required=False,
    )
    logging_args.add_argument(
        "--wandb_project_name",
        type=str,
        help="wandb project name",
        required=False,
    )
    logging_args.add_argument(
        "--wandb_entity_name",
        type=str,
        help="wandb entity name",
        required=False,
    )
    logging_args.add_argument(
        "--wandb_experiment_name",
        type=str,
        help="wandb job name",
        required=False,
    )

    # Tokenizer args
    tokenizer_args = parser.add_argument_group("Tokenizer arguments")
    tokenizer_args.add_argument(
        "-hf",
        "--hf_token",
        type=str,
        help="HuggingFace token. Defaults to None. Required for accessing tokenizers and checkpoints.",
    )

    # DGXCloud
    dgxc_args = parser.add_argument_group("DGXCloud arguments")
    dgxc_args.add_argument(
        "--dgxc_cluster",
        type=str,
        help="DGXCloud cluster to use for experiment",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_base_url",
        type=str,
        help="DGXCloud base url",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_kube_apiserver_url",
        type=str,
        help="DGXCloud kube apiserver url",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_app_id",
        type=str,
        help="DGXCloud app id",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_app_secret",
        type=str,
        help="DGXCloud app secret",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_project_name",
        type=str,
        help="DGXCloud project name",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_pvc_claim_name",
        type=str,
        help="DGXCloud pvc claim name",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_pvc_mount_path",
        type=str,
        help="DGXCloud pvc mount path",
        required=False,
    )
    dgxc_args.add_argument(
        "--dgxc_namespace",
        type=str,
        help="DGXCloud namespace",
        required=False,
    )

    return parser
# Unset SLURM/PMI/PMIX env vars to prevent MPI initialization issues
for i in $(env | grep ^SLURM_ | cut -d"=" -f 1); do unset -v $i; done
for i in $(env | grep ^PMI_ | cut -d"=" -f 1); do unset -v $i; done
for i in $(env | grep ^PMIX_ | cut -d"=" -f 1); do unset -v $i; done

OUTPUT_DIR=$1
# Default keeps the heredoc below syntactically valid Python when $2 is omitted.
PARALLELISM=${2:-8}

# Install missing dependency for lm-evaluation-harness
uv pip install math_verify --quiet

uv run --active --no-sync python << EVAL_EOF
import subprocess
import time

from nemo_evaluator.api.api_dataclasses import (
    ApiEndpoint,
    ConfigParams,
    EvaluationConfig,
    EvaluationTarget,
)
from nemo_evaluator.api import check_endpoint, evaluate

# Configuration
endpoint_url = "http://0.0.0.0:8000/v1/completions/"
endpoint_type = "completions"
model_id = "megatron_model"
eval_task = "mmlu"
limit_samples = 100
parallelism = $PARALLELISM
request_timeout = 1000
temperature = None
top_p = None
top_k = None
# Fixed: no hard-coded leading "/" — it corrupted absolute OUTPUT_DIR paths
# (e.g. "/lustre/out" became "//lustre/out") and silently rooted relative ones.
output_dir = "$OUTPUT_DIR/results/"

# Check server readiness
server_ready = check_endpoint(
    endpoint_url=endpoint_url,
    endpoint_type=endpoint_type,
    model_name=model_id,
)
if not server_ready:
    raise RuntimeError(
        "Server is not ready to accept requests. Check the deployment logs for errors."
    )

# Build configs
api_endpoint = ApiEndpoint(
    url=endpoint_url,
    type=endpoint_type,
    model_id=model_id,
)
target_cfg = EvaluationTarget(api_endpoint=api_endpoint)
eval_params = ConfigParams(
    limit_samples=limit_samples,
    parallelism=parallelism,
    request_timeout=request_timeout,
    temperature=temperature,
    top_p=top_p,
    top_k=top_k,
)
eval_cfg = EvaluationConfig(
    type=eval_task,
    params=eval_params,
    output_dir=output_dir,
)

# Run evaluation
result = evaluate(target_cfg=target_cfg, eval_cfg=eval_cfg)

# Shutdown Ray server
print("Evaluation completed. Shutting down Ray server...")
subprocess.run(["ray", "stop", "--force"], check=False, timeout=30)
print("Ray server shutdown command sent.")
time.sleep(5)
EVAL_EOF
+""" + +import logging +import os +import signal +import sys +import time +from dataclasses import dataclass + +import yaml +from nemo_run.core.execution.slurm import SlurmJobDetails +from nemo_run.run.ray.job import RayJob + + +try: + import wandb + + HAVE_WANDB = True +except (ImportError, ModuleNotFoundError): + HAVE_WANDB = False + wandb = None + +try: + from argument_parser import parse_cli_args + from utils.executors import kuberay_executor, slurm_executor +except (ImportError, ModuleNotFoundError): + from .argument_parser import parse_cli_args + from .utils.executors import kuberay_executor, slurm_executor + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +def register_pipeline_terminator(job: RayJob): + """Register a signal handler to terminate the job.""" + + def sigterm_handler(_signo, _stack_frame): + logger.info(f"Trying to terminate job {job.name}") + job.stop() + logger.info(f"Job {job.name} terminated") + sys.exit(0) + + signal.signal(signal.SIGINT, sigterm_handler) + signal.signal(signal.SIGTERM, sigterm_handler) + + +@dataclass(kw_only=True) +class CustomJobDetailsRay(SlurmJobDetails): + """Custom job details for Ray jobs.""" + + @property + def ls_term(self) -> str: + """This term will be used to fetch the logs. 
+ + The command used to list the files is ls -1 {ls_term} 2> /dev/null + """ + assert self.folder + return os.path.join(self.folder, "ray-job.log") + + +def main(args): + """Deploys the inference and evaluation server with NemoRun.""" + + if not args.dgxc_cluster: + executor = slurm_executor( + account=args.account, + partition=args.partition, + nodes=-(args.num_gpus // -args.gpus_per_node), + num_gpus_per_node=args.gpus_per_node, + time_limit=args.time_limit, + container_image=args.container_image, + custom_mounts=args.custom_mounts, + custom_env_vars=args.custom_env_vars, + hf_token=args.hf_token, + ) + else: + executor = kuberay_executor( + nodes=-(args.num_gpus // -args.gpus_per_node), + num_gpus_per_node=args.gpus_per_node, + dgxc_pvc_claim_name=args.dgxc_pvc_claim_name, + dgxc_pvc_mount_path=args.dgxc_pvc_mount_path, + custom_env_vars=args.custom_env_vars, + container_image=args.container_image, + namespace=args.dgxc_namespace, + hf_token=args.hf_token, + ) + + executor.job_details = CustomJobDetailsRay() + + job = RayJob( + name="demo-slurm-ray-deploy", + executor=executor, + ) + job.start( + command=f"bash /opt/Megatron-Bridge/examples/evaluation/deploy.sh {args.megatron_checkpoint} {args.num_replicas} {args.num_gpus} | tee -a deploy.log & sleep 120; bash /opt/Megatron-Bridge/examples/evaluation/eval.sh {args.output_dir} {args.parallelism} | tee -a eval.log", + workdir=None, + ) + + register_pipeline_terminator(job=job) + + job_deployment_status = "Initializing" + job_status = "UNKNOWN" + while job_deployment_status != "Running" or job_status != "RUNNING": + status = job.status(display=False) + job_deployment_status = status["jobDeploymentStatus"] + job_status = status["jobStatus"] + time.sleep(1) + if job_deployment_status == "Failed": + raise RuntimeError("Job failed") + + job.logs(follow=True, timeout=10 * 60 * 60) + job.stop() + + with open(os.path.join(args.output_dir, "results", "results.yml"), "r") as f: + results = yaml.safe_load(f) + + 
logger.info("Results: %s", results) + + if HAVE_WANDB and args.wandb_key: + wandb.login(key=args.wandb_key) + api = wandb.Api() + runs = api.runs( + path=f"{args.wandb_entity_name}/{args.wandb_project_name}", + filters={"display_name": args.wandb_experiment_name}, + ) + + if runs: + run_id = runs[0].id + print(f"Found run with ID: {run_id}") + + wandb_run = wandb.init( + project=args.wandb_project_name, + entity=args.wandb_entity_name, + id=run_id, + resume="allow", + ) + artifact = wandb.Artifact(name="evaluation_results", type="evaluation_results") + artifact.add_file( + local_path=os.path.join(args.output_dir, "results", "results.yml"), + name="results.yml", + ) + wandb_run.log_artifact(artifact) + + for category in ["tasks", "groups"]: + for task_or_group_name, result in results["results"][category].items(): + for metric_name, metric_result in result["metrics"].items(): + field_key = f"{category.rstrip('s')}/{task_or_group_name}/{metric_name}" + wandb_run.log( + { + f"{field_key}/value": metric_result["scores"][metric_name]["value"], + f"{field_key}/stderr": metric_result["scores"][metric_name]["stats"]["stderr"], + } + ) + + wandb_run.finish() + + +if __name__ == "__main__": + main(args=parse_cli_args().parse_args()) diff --git a/examples/evaluation/utils/executors.py b/examples/evaluation/utils/executors.py new file mode 100644 index 0000000000..361c38ec8a --- /dev/null +++ b/examples/evaluation/utils/executors.py @@ -0,0 +1,155 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import Dict, List, Optional

import nemo_run as run
from nemo_run.config import get_nemorun_home
from nemo_run.core.execution.kuberay import KubeRayExecutor, KubeRayWorkerGroup


def slurm_executor(
    account: str,
    partition: str,
    nodes: int,
    num_gpus_per_node: int,
    time_limit: str = "00:30:00",
    container_image: str = "nvcr.io/nvidia/nemo:dev",
    custom_mounts: Optional[List[str]] = None,
    custom_env_vars: Optional[Dict[str, str]] = None,
    hf_token: Optional[str] = None,
) -> run.SlurmExecutor:
    """
    Slurm cluster definition with appropriate cluster params and NeMo container params needed for pre-training
    and fine-tuning experiments.

    Args:
        account: Slurm account to charge.
        partition: Slurm partition to submit to.
        nodes: Number of nodes to request.
        num_gpus_per_node: Tasks (one per GPU) per node.
        time_limit: Slurm walltime limit.
        container_image: Container image to run inside.
        custom_mounts: Extra container mounts; defaults to none.
        custom_env_vars: Extra env vars merged over the defaults.
        hf_token: HuggingFace token exported as HF_TOKEN.
    """
    # None sentinels instead of mutable ([]/{}) defaults, which are shared
    # across calls in Python.
    custom_mounts = [] if custom_mounts is None else custom_mounts

    env_vars = {
        "HF_TOKEN": hf_token,
        "HF_DATASETS_TRUST_REMOTE_CODE": "1",
        "TRANSFORMERS_OFFLINE": "0",
    }
    if custom_env_vars:
        env_vars.update(custom_env_vars)

    executor = run.SlurmExecutor(
        account=account,
        partition=partition,
        tunnel=run.LocalTunnel(job_dir=os.path.join(get_nemorun_home(), "experiments")),
        nodes=nodes,
        ntasks_per_node=num_gpus_per_node,
        container_image=container_image,
        container_mounts=custom_mounts,
        env_vars=env_vars,
        srun_args=[
            "--mpi=pmix",
            "--no-container-mount-home",
        ],
        time=time_limit,
        mem="0",
        exclusive=True,
        packager=run.GitArchivePackager(),
    )

    return executor


def kuberay_executor(
    nodes: int,
    num_gpus_per_node: int,
    dgxc_pvc_mount_path: str,
    dgxc_pvc_claim_name: str,
    namespace: str = "default",
    ray_version: str = "2.43.0",
    container_image: str = "",  # Will be set in __post_init__ if empty
    head_cpu: str = "8",
    head_memory: str = "32Gi",
    hf_token: Optional[str] = None,
    custom_env_vars: Optional[Dict[str, str]] = None,
):
    """
    Kuberay cluster definition with appropriate cluster params and NeMo container params needed for pre-training
    and fine-tuning experiments.

    Args:
        nodes: Number of Ray worker replicas.
        num_gpus_per_node: GPUs requested per worker.
        dgxc_pvc_mount_path: Mount path of the workspace PVC inside the pod.
        dgxc_pvc_claim_name: Name of the PersistentVolumeClaim to mount.
        namespace: Kubernetes namespace for the Ray cluster.
        ray_version: Ray version for the KubeRay cluster spec.
        container_image: Container image; filled in later if empty.
        head_cpu: CPU request for the Ray head pod.
        head_memory: Memory request for the Ray head pod.
        hf_token: HuggingFace token exported as HF_TOKEN.
        custom_env_vars: Extra env vars merged over the defaults.
    """

    env_vars = {
        "TORCH_HOME": "/nemo-workspace/.cache",
        "FI_EFA_USE_HUGE_PAGE": "0",
        "NCCL_BUFFSIZE": "8388608",
        "NCCL_P2P_NET_CHUNKSIZE": "524288",
        "NCCL_TUNER_PLUGIN": "/opt/gcp-ofi-nccl/install/lib/libnccl-ofi-tuner.so",
        "HF_TOKEN": hf_token,
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
        "TOKENIZERS_PARALLELISM": "False",
        "TRANSFORMERS_OFFLINE": "1",
        # TODO(review): user-specific path baked into a shared example —
        # parameterize or derive from the PVC mount instead.
        "HF_HOME": "/nemo-workspace/pagaray/hf_cache",
        "RAY_enable_infeasible_task_early_exit": "true",
        "NCCL_IB_DISABLE": "1",
        "NCCL_IB_HCA": "^openib",  # Ignore OpenIB devices
        "NCCL_NET": "Socket",
        "NCCL_NET_GDR_LEVEL": "0",
        "FI_PROVIDER": "tcp",
    }
    if custom_env_vars:
        env_vars.update(custom_env_vars)

    executor = KubeRayExecutor(
        namespace=namespace,
        ray_version=ray_version,
        image=container_image,
        head_cpu=head_cpu,
        head_memory=head_memory,
        # Head pod schedules no work; all GPUs/CPUs live on the workers.
        ray_head_start_params={"num-gpus": "0", "num-cpus": "0"},
        ray_worker_start_params={"num-gpus": "8", "num-cpus": "128"},
        worker_groups=[
            KubeRayWorkerGroup(
                group_name="worker",
                min_replicas=nodes,
                max_replicas=nodes,
                replicas=nodes,
                gpus_per_worker=num_gpus_per_node,
                cpu_requests="128",
                cpu_limits="128",
                memory_requests="512Gi",
                memory_limits="512Gi",
            )
        ],
        # TODO(review): cluster-specific scheduler and a user-specific image
        # pull secret hard-coded into a shared example — make configurable.
        spec_kwargs={
            "schedulerName": "runai-scheduler",
            "image_pull_secrets": ["dockerregistry-dockerregistry-pagaray-ngc"],
        },  # e.g. Run:ai
        volume_mounts=[{"name": "workspace", "mountPath": dgxc_pvc_mount_path}],
        volumes=[
            {
                "name": "workspace",
                "persistentVolumeClaim": {"claimName": dgxc_pvc_claim_name},
            },
        ],
        env_vars=env_vars,
        container_kwargs={
            "securityContext": {
                "allowPrivilegeEscalation": False,
                "runAsUser": 0,
            },
        },
    )

    # Shared-memory volume for NCCL/PyTorch dataloader workers.
    executor.volumes.append({"name": "dshm", "emptyDir": {"medium": "Memory"}})
    executor.volume_mounts.append({"name": "dshm", "mountPath": "/dev/shm"})

    return executor