diff --git a/examples/pi0/README.md b/examples/pi0/README.md index 3dfa0aee1a..eaf93cddb4 100644 --- a/examples/pi0/README.md +++ b/examples/pi0/README.md @@ -159,9 +159,9 @@ Download test images: ```sh cd FlagScale/ -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_0_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_1_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_2_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_0_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_1_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_2_latest.jpg ``` Run client: diff --git a/examples/robobrain_x0/README.md b/examples/robobrain_x0/README.md index 6b8776e316..ebe6ab7867 100644 --- a/examples/robobrain_x0/README.md +++ b/examples/robobrain_x0/README.md @@ -16,6 +16,12 @@ cd FlagScale/ Install train and inference env according to [README](https://github.com/FlagOpen/FlagScale/blob/main/README.md) +Install transformers. Higher versions cause problems with image pre-processing. 
+ +```sh +pip install transformers==4.53.0 +``` + # Download Model ```sh @@ -70,9 +76,9 @@ Download test images: ```sh cd FlagScale/ -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_0_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_1_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_2_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_0_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_1_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_2_latest.jpg ``` Run client: diff --git a/examples/robobrain_x0_5/README.md b/examples/robobrain_x0_5/README.md index 9ebcc6fe22..866f827d75 100644 --- a/examples/robobrain_x0_5/README.md +++ b/examples/robobrain_x0_5/README.md @@ -17,6 +17,7 @@ cd FlagScale/ Install train and inference env according to [README](https://github.com/FlagOpen/FlagScale/blob/main/README.md) # Download Model + Checkpoint is not publish yet. Directory structure: @@ -40,6 +41,7 @@ Directory structure: `-- config.yaml ``` + # Serving ## Edit Config @@ -66,9 +68,9 @@ Download test images: ```sh cd FlagScale/ -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_0_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_1_latest.jpg -wget https://gitee.com/hchnr/flag-scale/blob/robotics_dataset/orbbec_2_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_0_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_1_latest.jpg +wget https://gitee.com/hchnr/flag-scale/raw/robotics_dataset/orbbec_2_latest.jpg ``` Run client: @@ -82,3 +84,66 @@ python examples/robobrain_x0_5/client_libero.py \ --right-wrist-img orbbec_2_latest.jpg \ --num-steps 20 ``` + + +# Training + +## Prepare Dataset + +FlagScale uses the WebDataset format and the Megatron-Energon data loader, so you need to process your data first. 
+ +For example, there is a dataset of 2 timesteps: [demo_0913_n2](https://gitee.com/hchnr/flag-scale/tree/robotics_dataset/demo_0913_n2/wds-2). + +Download demo_0913_n2: + +```sh +git archive --remote=git@gitee.com:hchnr/flag-scale.git robotics_dataset demo_0913_n2/ | tar -xv -C . +``` + +The directory structure of demo_0913_n2 is as follows: +- build_dep.sh: Copy .npy and .jpg files from production environment to ./deps +- demo_0913_n2.jsonl: A single timestep, including: task(str), images(.jpg), action(.npy), state(.npy) +- deps: .npy and .jpg files +- wds-2: Data in webdataset format (DP=2), generated by tools/datasets/vla/convert.py + +Generate Data in webdataset format (DP=2) to ./demo_0913_n2/wds-2: + +```sh +python tools/datasets/vla/convert.py \ + --dataset-root=./demo_0913_n2 \ + --output-root=./demo_0913_n2 \ + --json=demo_0913_n2.jsonl \ + --train-split 1 \ + --val-split 0 \ + --images-key=image \ + --videos-key=video \ + --vision-root='' \ + --shuffle-tars \ + --num-workers=1 \ + --max-samples-per-tar 100000 \ + --dp-size 2 +``` + +Move .jpg and .npy files from ./demo_0913_n2/deps to /: + +```sh +mkdir -p /share/ +cp -r ./demo_0913_n2/deps/* / +``` + +## Edit Config + +```sh +cd FlagScale/ +vim examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml +``` +Change 3 fields: +- checkpoint_dir: path to model checkpoint +- framework.qwenvl.base_vlm: path to backbone model (for example: qwenvl) checkpoint +- datasets.data_path: path to dataset, for example: ./demo_0913_n2/wds-2 + +## Start Training +```sh +cd FlagScale/ +python run.py --config-path ./examples/robobrain_x0_5/conf --config-name train action=run +``` diff --git a/examples/robotics/conf/train.yaml b/examples/robobrain_x0_5/conf/train.yaml similarity index 62% rename from examples/robotics/conf/train.yaml rename to examples/robobrain_x0_5/conf/train.yaml index 75d3189eca..ae9ca37c79 100644 --- a/examples/robotics/conf/train.yaml +++ b/examples/robobrain_x0_5/conf/train.yaml @@ -1,9 +1,9 @@ 
defaults: - _self_ - - train: 3_3b + - train: libero_qwengroot experiment: - exp_name: Robotics-3.3B + exp_name: libero_qwengroot seed: 42 save_steps: 10000 load: null @@ -12,7 +12,7 @@ experiment: task: type: train backend: robotics - entrypoint: flagscale/train/train_robotics.py + entrypoint: flagscale/train/train_robotics_qwengroot.py runner: per_node_task: false no_shared_fs: false @@ -22,15 +22,10 @@ experiment: before_start: echo "Starting Robotics Training" envs: LOGLEVEL: "INFO" - CUDA_VISIBLE_DEVICES: "1" + CUDA_VISIBLE_DEVICES: "0,1" CUDA_DEVICE_MAX_CONNECTIONS: 1 - # Set python paths for: robotics, lerobot, openpi-client, FlagScale, Megatron-LM - PYTHONPATH: python/paths - # Set lerobot data path - HF_LEROBOT_HOME: lerobot/data/path WANDB_MODE: offline - action: run hydra: diff --git a/examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml b/examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml new file mode 100644 index 0000000000..b548284784 --- /dev/null +++ b/examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml @@ -0,0 +1,102 @@ +config_path: examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml +seed: 42 +trackers: [jsonl, wandb] +wandb_entity: jinhuiye +wandb_project: StarVLA_Libero +is_debug: false + +batch_size: 2 +resume: false +checkpoint_dir: results/ckpt_in +exp_name: starvla +project_name: starvla +wandb_enabled: false +output_directory: results/ckpt_out +log_freq: 10 +train_steps: 100 + +framework: + name: QwenGR00T + qwenvl: + base_vlm: /repos/flagscale_new_robotics/FlagScale/results/ckpt_in/backbone + # attn_implementation: flash_attention_2 + attn_implementation: eager + vl_hidden_dim: 2048 + dino: + dino_backbone: dinov2_vits14 + action_model: + action_model_type: DiT-L + action_hidden_dim: 1024 + hidden_size: 1024 + add_pos_embed: true + max_seq_len: 1024 + action_dim: 14 + state_dim: 7 + future_action_window_size: 29 + action_horizon: 30 + past_action_window_size: 0 + repeated_diffusion_steps: 8 + noise_beta_alpha: 1.5 + 
noise_beta_beta: 1.0 + noise_s: 0.999 + num_timestep_buckets: 1000 + num_inference_timesteps: 4 + num_target_vision_tokens: 32 + diffusion_model_cfg: + cross_attention_dim: 2048 + dropout: 0.2 + final_dropout: true + interleave_self_attention: true + norm_type: ada_norm + num_layers: 16 + output_dim: 1024 + positional_embeddings: null + reduce_in_full_precision: true + +datasets: + task_encoder: + vision_root: "" + state_key: eepose + action_horizon: 7 + action_key: eepose + data_path: /repos/flagscale_new_robotics/FlagScale/demo_0913_n2/wds-1 + vlm_data: {} + vla_data: {} + +trainer: + epochs: 10 + max_train_steps: 36000 + num_warmup_steps: 3600 + save_interval: 3600 + eval_interval: 500 + learning_rate: + base: 3.0e-05 + qwen_vl_interface: 1.0e-05 + action_model: 1.0e-04 + lr_scheduler_type: cosine_with_min_lr + scheduler_specific_kwargs: + min_lr: 1.0e-06 + freeze_modules: true + loss_scale: + vla: 1.0 + vlm: 0.1 + max_grad_norm: 1.0 + warmup_ratio: 0.1 + weight_decay: 0.0 + logging_frequency: 10 + gradient_clipping: 1.0 + gradient_accumulation_steps: 1 + optimizer: + name: AdamW + betas: [0.9, 0.95] + eps: 1.0e-08 + weight_decay: 1.0e-08 + is_resume: false + resume_epoch: null + resume_step: null + enable_gradient_checkpointing: true + enable_mixed_precision_training: true + +system: {} +model: {} +data: {} diff --git a/examples/robotics/README.md b/examples/robotics/README.md deleted file mode 100644 index 54149310e0..0000000000 --- a/examples/robotics/README.md +++ /dev/null @@ -1,94 +0,0 @@ -Tips: robotics git repo should be added as a submodule, but it is not published yet. - -# Training -## Install FlagScale -Install training env according to README.md under project root path. -FlagScale training env include: -1. python deps -2. unpatching -3. Megatron-LM -4. Megatron-Energon - - -## Install Robotics Deps -1. Robotics, not publish yet -2. Lerobot, https://github.com/huggingface/lerobot -3. openpi-client -4. 
python deps, FlagScale/requirements/train/robotics/requirements.txt - - -``` sh -# install lerobot -git clone https://github.com/huggingface/lerobot.git -cd lerobot -pip install -e . - -# install openpi -git clone --recurse-submodules git@github.com:Physical-Intelligence/openpi.git -# Or if you already cloned the repo: -# git submodule update --init --recursive -cd openpi -GIT_LFS_SKIP_SMUDGE=1 uv sync -GIT_LFS_SKIP_SMUDGE=1 uv pip install -e . -cd packages/openpi-client -pip install -e . - -# install other python deps -pip install -r requirements/train/robotics/requirements.txt - -``` - - -## Prepare Dataset -Convert WebDataset to Energon: -``` -cd FlagScale/tools/datasets/qwenvl/ -python convert_robotics.py \ - --dataset-root="" \ - --output-root=/output/path \ - --json=/webdataset/path \ - --train-split 1 \ - --val-split 0 \ - --images-key=image \ - --videos-key=video \ - --actions-key=action \ - --state-key=state \ - --actions-qpos-key=qpos \ - --actions-eepose-key=eepose \ - --state-qpos-key=qpos \ - --state-eepose-key=eepose \ - --vision-root="" \ - --max-samples-per-tar 1000000 \ - --dp-size 1 -``` - -## Edit Config -Config files: -1. examples/robotics/conf/train.yaml -2. examples/robotics/conf/train/3_3b.yaml - -Attributes need to change: -1. PYTHONPATH, including Robotics and Lerobot -2. HF_LEROBOT_HOME -3. 
ENERGON_DATA_PATH, the ernergon dataset - -## Start Training -``` -cd FlagScale/ -python run.py --config-path ./examples/robotics/conf --config-name train action=start -``` - -# Serving - -## Install FlagScale -Install serving env according to [README](https://github.com/FlagOpen/FlagScale/blob/main/README.md) - -## Edit Config -Config files: examples/robotics/conf/serve/3_3b.yaml -Attributes need to change:: model path - -## Start Serving -``` -cd FlagScale/ -python run.py --config-path ./examples/robotics/conf --config-name serve action=start -``` \ No newline at end of file diff --git a/examples/robotics/client_robotics.py b/examples/robotics/client_robotics.py deleted file mode 100644 index 4a2f3374e0..0000000000 --- a/examples/robotics/client_robotics.py +++ /dev/null @@ -1,142 +0,0 @@ -import argparse -import base64 -import io -import json -import os -import random -import sys -import time - -from pathlib import Path -from typing import Any, Dict, List - -import numpy as np -import requests - -from PIL import Image - -IMG_WIDTH = 640 -IMG_HEIGHT = 480 - - -def encode_image(path: str) -> str: - """Read image as base64 string.""" - path = Path(path) - if not path.exists(): - print(f"[WARNING] Image not found: {path.resolve()}. 
Use fake images.") - image = Image.new('RGB', (IMG_WIDTH, IMG_HEIGHT)) - buffer = io.BytesIO() - image.save(buffer, format='JPEG', quality=50) - buffer.seek(0) - jpeg_binary = buffer.read() - return base64.b64encode(jpeg_binary).decode("utf-8") - return base64.b64encode(path.read_bytes()).decode("utf-8") - - -def check_health(base_url: str) -> None: - """Ping /health; raise RuntimeError if unhealthy.""" - try: - r = requests.get(f"{base_url}/health", timeout=5) - r.raise_for_status() - except Exception as e: - raise RuntimeError(f"Health-check request failed: {e}") from e - - data = r.json() - if not (data.get("status") == "healthy" and data.get("model_loaded")): - raise RuntimeError(f"Server not ready: {json.dumps(data, indent=2)}") - print(f"[√] Server healthy - GPU: {data['gpu_info']['device_name']}") - - -def build_payload(args) -> Dict[str, Any]: - """Construct JSON payload for /infer.""" - # 1. Dummy robot state (batch=1, dim=args.state_dim) - state = np.random.uniform(-1, 1, size=(1, args.state_dim)).tolist() - # 2. Encode images - img_sample = { - "base_0_rgb": encode_image(args.base_img), - "left_wrist_0_rgb": encode_image(args.left_wrist_img), - "right_wrist_0_rgb": encode_image(args.right_wrist_img), - } - # 3. 
Image masks (True: image is valid) - image_masks = {"base_0_rgb": True, "left_wrist_0_rgb": True, "right_wrist_0_rgb": True} - return { - "instruction": "Grab the orange and put it into the basket.", - "qpos": [[random.random() for _ in range(args.state_dim)]], - "eef_pose": [[random.random() for _ in range(args.state_dim)]], - "state": state, - "high_level_instruction": args.high_level_instruction, - "fine_grained_instruction": args.fine_grained_instruction, - "images": [img_sample], - "image_masks": [image_masks], - "num_steps": args.num_steps, - "temperature": args.temperature, - "top_p": args.top_p, - "max_new_tokens": args.max_new_tokens, - "do_sample": args.do_sample, - } - - -def pretty_print_resp(resp: requests.Response) -> None: - """Nicely print JSON or raw content.""" - try: - print(json.dumps(resp.json(), indent=2, ensure_ascii=False)) - except ValueError: - print(resp.text) - - -def main(): - parser = argparse.ArgumentParser(description="Client for RoboBrain-Robotics inference API") - parser.add_argument( - "--host", default="127.0.0.1", help="Host of local SSH tunnel (default: 127.0.0.1)" - ) - parser.add_argument( - "--port", type=int, default=15000, help="Port of local SSH tunnel (default: 15000)" - ) - parser.add_argument("--base-img", required=True, help="Path to base camera RGB image") - parser.add_argument( - "--left-wrist-img", required=True, help="Path to left wrist camera RGB image" - ) - parser.add_argument( - "--right-wrist-img", required=True, help="Path to right wrist camera RGB image" - ) - parser.add_argument( - "--state-dim", type=int, default=14, help="Dim of robot low-dim state vector (default: 14)" - ) - parser.add_argument("--num-steps", type=int, default=20) - parser.add_argument("--temperature", type=float, default=0.8) - parser.add_argument("--top-p", type=float, default=0.95) - parser.add_argument("--max-new-tokens", type=int, default=64) - parser.add_argument("--do-sample", action="store_true") - parser.add_argument( - 
"--high-level-instruction", default="Grab the orange and put it into the basket." - ) - parser.add_argument("--fine-grained-instruction", default=None) - args = parser.parse_args() - - base_url = f"http://{args.host}:{args.port}" - print(f"-> Using endpoint: {base_url}") - - # 1. Health-check - check_health(base_url) - # 2. Build payload - payload = build_payload(args) - # 3. POST /infer - try: - t0 = time.time() - resp = requests.post( - f"{base_url}/infer", - headers={"Content-Type": "application/json"}, - data=json.dumps(payload), - timeout=300, - ) - elapsed = (time.time() - t0) * 1000 - resp.raise_for_status() - except requests.RequestException as e: - print(f"[Error] HTTP request failed: {e}") - sys.exit(1) - print(f"[√] Response OK ({resp.status_code}) - {elapsed:.1f}ms") - pretty_print_resp(resp) - - -if __name__ == "__main__": - main() diff --git a/examples/robotics/conf/serve.yaml b/examples/robotics/conf/serve.yaml deleted file mode 100644 index 6cb96254c7..0000000000 --- a/examples/robotics/conf/serve.yaml +++ /dev/null @@ -1,23 +0,0 @@ -defaults: -- _self_ -- serve: 3_3b - -experiment: - exp_name: robotics-3.3b - exp_dir: outputs/${experiment.exp_name} - task: - type: serve - entrypoint: flagscale/serve/run_serve_robotics.py - runner: - hostfile: null - deploy: - use_fs_serve: false - envs: - CUDA_VISIBLE_DEVICES: 0 - CUDA_DEVICE_MAX_CONNECTIONS: 1 - -action: run - -hydra: - run: - dir: ${experiment.exp_dir}/hydra diff --git a/examples/robotics/conf/serve/3_3b.yaml b/examples/robotics/conf/serve/3_3b.yaml deleted file mode 100644 index 9ea75878e2..0000000000 --- a/examples/robotics/conf/serve/3_3b.yaml +++ /dev/null @@ -1,70 +0,0 @@ -- serve_id: vllm_model - engine_args: - model: local/model/path - host: 0.0.0.0 - uvicorn_log_level: warning - port: 5000 - replay_file: local/replay/file/path - robot_const: - action: - scale_: - - 106.4848318002756 - - 121.17540185642973 - - 91.61284029974587 - - 11.064118950366046 - - 21.47920562161597 - - 
10.969909525578773 - - 1.0 - - 106.3521239324965 - - 129.19598707443794 - - 96.1861863966185 - - 11.613623716892482 - - 24.058682506464063 - - 11.74695773541818 - - 1.0 - offset_: - - -0.04674754359100264 - - -0.09009609189717949 - - -0.181073005258242 - - 0.03138326981925532 - - 0.0597944555730352 - - -0.06069438290205942 - - 0.0 - - -0.047356633144646554 - - 0.020214122717063576 - - -0.17154876445894596 - - -0.005643775962900666 - - 0.01532585329409053 - - 0.08908289545186188 - - 0.0 - qpos: - scale_: - - 41.70323763777994 - - 21.168971360638828 - - 25.425799601620582 - - 21.417401277059486 - - 24.523985301844952 - - 21.31249098029433 - - 25.678940569162076 - - 44.430235251694164 - - 22.00498165608864 - - 27.680126656823465 - - 23.922713088264057 - - 24.805529859604945 - - 26.94501691721127 - - 37.675327380566195 - offset_: - - -0.17723109375803703 - - -0.07276127804902932 - - -0.008714227710960976 - - 0.01956853553975879 - - -0.028449304172882384 - - 0.01625876332732834 - - -0.08326199188186922 - - 0.10167133001713591 - - -0.05982476885373389 - - -0.033329971471014797 - - 0.0038598047225062437 - - 0.015997883893533293 - - -0.035970393893265395 - - -0.18677250657125954 diff --git a/examples/robotics/conf/train/3_3b.yaml b/examples/robotics/conf/train/3_3b.yaml deleted file mode 100644 index 07df394fe4..0000000000 --- a/examples/robotics/conf/train/3_3b.yaml +++ /dev/null @@ -1,12 +0,0 @@ -system: {} - -model: - batch_size: 4 - training_dtype: float32 - ckpt_overwrite: true - exp_name: Robotics-3.3B - assets_base_dir: local/assets/dir - -data: - # config_name: robotics_libero - config_name: test_single_data \ No newline at end of file diff --git a/flagscale/models/robotics/groot_action_header.py b/flagscale/models/robotics/groot_action_header.py index 23cce7e6eb..14bb4056e1 100644 --- a/flagscale/models/robotics/groot_action_header.py +++ b/flagscale/models/robotics/groot_action_header.py @@ -273,7 +273,6 @@ def forward(self, vl_embs: torch.Tensor, actions: 
torch.Tensor, state: torch.Ten # action_mask: for counting valid dimensions in the last axis, used for loss computation actual_action_dim = actions.shape[-1] - # Ensure actions last dim matches D_action, pad with zeros if needed D_action = self.config.action_dim if actions.shape[-1] < D_action: @@ -351,7 +350,6 @@ def predict_action(self, vl_embs: torch.Tensor, state: torch.Tensor = None) -> t num_steps = self.num_inference_timesteps dt = 1.0 / num_steps - state_features = self.state_encoder(state) if state is not None else None # Run denoising steps. diff --git a/flagscale/models/robotics/qwen_groot.py b/flagscale/models/robotics/qwen_groot.py index 5513d2abec..19c47135e3 100644 --- a/flagscale/models/robotics/qwen_groot.py +++ b/flagscale/models/robotics/qwen_groot.py @@ -90,9 +90,14 @@ def forward(self, examples: List[dict] = None, **kwargs) -> Tuple: # Step 4: Action Expert Forward and Loss with torch.autocast("cuda", dtype=torch.float32): # [B, T_full, action_dim] - actions = torch.tensor( - np.array(actions), device=last_hidden.device, dtype=last_hidden.dtype - ) + if isinstance(actions[0], torch.Tensor): + actions = torch.stack(actions, dim=0).to( + device=last_hidden.device, dtype=last_hidden.dtype + ) + else: + actions = torch.tensor( + np.array(actions), device=last_hidden.device, dtype=last_hidden.dtype + ) actions_target = actions[ :, -(self.future_action_window_size + 1) :, : ] # (B, chunk_len, action_dim) @@ -107,9 +112,15 @@ def forward(self, examples: List[dict] = None, **kwargs) -> Tuple: state_repeated = None if state is not None: - state = torch.tensor( - np.array(state), device=last_hidden.device, dtype=last_hidden.dtype - ) + + if isinstance(state[0], torch.Tensor): + state = torch.stack(state, dim=0).to( + device=last_hidden.device, dtype=last_hidden.dtype + ) + else: + state = torch.tensor( + np.array(state), device=last_hidden.device, dtype=last_hidden.dtype + ) state_repeated = state.repeat(repeated_diffusion_steps, 1, 1) action_loss = 
self.action_model( @@ -247,3 +258,97 @@ def resize_images(images, target_size=(224, 224)): return [resize_images(img, target_size) for img in images] else: raise ValueError("Unsupported image type or structure.") + + +def dryrun_with_random_sample(cfg): + """ + Test Qwen-GR00T model with fake data. + """ + # model: Qwen_GR00T = Qwen_GR00T(cfg) + model = Qwen_GR00T.from_pretrained(cfg.checkpoint_dir) + # fake sample + image = Image.fromarray(np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)) + # Create a sample + sample = { + "action": np.random.uniform(-1, 1, size=(16, 7)).astype( + np.float16 + ), # action_chunk, action_dim + "image": [image, image], # two views + "lang": "This is a fake for testing.", + "state": np.random.uniform(-1, 1, size=(1, 7)).astype(np.float16), # chunk, state_dim + } + + batch = [sample, sample] # batch size 2 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model = model.to(device) + + # test predict action + for _ in range(3): + predict_output = model.predict_action( + batch_images=[batch[0]["image"]], + instructions=[batch[0]["lang"]], + state=[batch[0]["state"]], + ) + normalized_actions = predict_output['normalized_actions'] + print(f"Unnormalized Action: {normalized_actions.shape}") + print(f"{normalized_actions[0,0,:]=}") + + forward_output = model(batch) + action_loss = forward_output['action_loss'] + print(f"Action Loss: {action_loss.item()}") + + model.save_pretrained() + + +def dryrun_with_dataloader(cfg): + model: Qwen_GR00T = Qwen_GR00T(cfg) + + from megatron.energon import WorkerConfig, get_loader, get_train_dataset + from tools.datasets.vla.data.dataset_helpers_np_pil import TaskEncoder + + ds = get_train_dataset( + cfg.datasets.data_path, + batch_size=1, + shuffle_buffer_size=100, + max_samples_per_sequence=100, + worker_config=WorkerConfig.default_worker_config(num_workers=1, data_parallel_group=None), + task_encoder=TaskEncoder(cfg.datasets.task_encoder), + repeat=True, + ) + 
vla_train_dataloader = get_loader(ds) + data_iter = iter(vla_train_dataloader) + batch = next(data_iter) + batch = get_batch(batch) + + # try get model + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model = model.to(device) + model(batch) + forward_output = model(batch) + action_loss = forward_output['action_loss'] + print(f"Action Loss: {action_loss.item()}") + + action = model.predict_action(batch_images=[batch[0]["image"]], instructions=[batch[0]["lang"]]) + print(f"Action inference: {action['normalized_actions'].shape}") + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config_yaml", + type=str, + default="./examples/robobrain_x0_5/conf/train/libero_qwengroot.yaml", + help="Path to YAML config", + ) + parser.add_argument("--dryrun-dataloader", action="store_true") + parser.add_argument("--dryrun-random", action="store_true") + + args, clipargs = parser.parse_known_args() + cfg = OmegaConf.load(args.config_yaml) + + if args.dryrun_dataloader: + dryrun_with_dataloader(cfg) + if args.dryrun_random: + dryrun_with_random_sample(cfg) diff --git a/flagscale/runner/runner_train.py b/flagscale/runner/runner_train.py index e8b9c30458..2bc09e6029 100644 --- a/flagscale/runner/runner_train.py +++ b/flagscale/runner/runner_train.py @@ -57,12 +57,8 @@ def _get_args_robotics(config: DictConfig): config_dict = OmegaConf.to_container(config, resolve=True) config_dict = config_dict["train"] - new_config_dict = {} - new_config_dict.update(config_dict["model"]) - ignore_keys = ["log_dir", "details_dir", "scripts_dir", "pids_dir"] - # Flatten the dictionary to a list of arguments - args = flatten_dict_to_args(new_config_dict, ignore_keys) - args = [config_dict["data"]["config_name"]] + args + new_config_dict = {"config_path": config_dict["config_path"]} + args = flatten_dict_to_args(new_config_dict) return args @@ -269,7 +265,7 @@ def _generate_run_script_train( f.write(f"\n") f.write(f"cd 
{root_dir}\n") f.write(f"\n") - f.write(f"export PYTHONPATH={megatron_dir}:{root_dir}:${{PYTHONPATH}}\n") + f.write(f"export PYTHONPATH={root_dir}:{megatron_dir}:${{PYTHONPATH}}\n") f.write(f"\n") f.write(f'cmd="{cmd}"\n') f.write(f"\n") diff --git a/flagscale/train/train_robotics_qwengroot.py b/flagscale/train/train_robotics_qwengroot.py new file mode 100644 index 0000000000..f9166c82f3 --- /dev/null +++ b/flagscale/train/train_robotics_qwengroot.py @@ -0,0 +1,201 @@ +# Adopted from starVLA/starVLA: +# https://github.com/starVLA/starVLA/blob/starVLA/starVLA/training/train_starvla.py +# Below is the original copyright: + +# Copyright 2025 starVLA community. All rights reserved. +# Licensed under the MIT License, Version 1.0 (the "License"); +# Implemented by [Jinhui YE / HKUST University] in [2025]. + +import argparse +import os +import pathlib +import platform +import random + +from typing import Tuple + +import epath +import numpy as np +import torch +import torch.distributed as dist + +from omegaconf import OmegaConf +from torch.nn.parallel import DistributedDataParallel as DDP +from transformers import get_scheduler + +import wandb + +from megatron.energon import WorkerConfig, get_loader, get_train_dataset +from tools.datasets.vla.data.dataset_helpers_np_pil import TaskEncoder + +from flagscale.logger import logger +from flagscale.models.robotics.qwen_groot import Qwen_GR00T, get_batch + +# Sane Defaults +os.environ["TOKENIZERS_PARALLELISM"] = "false" + + +def build_param_lr_groups(model, cfg): + lr_cfg = cfg.trainer.learning_rate + base_lr = lr_cfg.get("base", 1e-4) # default base learning rate + + used_params = set() + param_groups = [] + + for module_name, lr in lr_cfg.items(): + if module_name == "base": + continue + # try to find the module under vla by module_name (support nested paths) + module = model + try: + for attr in module_name.split("."): + module = getattr(module, attr) + params = list(module.parameters()) + param_groups.append({"params": 
params, "lr": lr, "name": module_name}) + used_params.update(id(p) for p in params) + except AttributeError: + ReferenceError(f"⚠️ module path `{module_name}` not found in vla") + + # assign base learning rate to the remaining unused parameters + other_params = [p for p in model.parameters() if id(p) not in used_params] + if other_params: + param_groups.append({"params": other_params, "lr": base_lr, "name": "base"}) + + return param_groups + + +def setup_optimizer_and_scheduler( + model, cfg +) -> Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler._LRScheduler]: + """set optimizer and scheduler""" + # initialize optimizer + param_groups = build_param_lr_groups(model=model, cfg=cfg) + optimizer = torch.optim.AdamW( + param_groups, + lr=cfg.trainer.learning_rate.base, + betas=tuple(cfg.trainer.optimizer.betas), + weight_decay=cfg.trainer.optimizer.weight_decay, + eps=cfg.trainer.optimizer.eps, + ) + + # print optimizer group info + if dist.is_initialized() and dist.get_rank() == 0: + for i, group in enumerate(optimizer.param_groups): + logger.info( + f"LR Group {group['name']}: lr={group['lr']}, num_params={len(group['params'])}" + ) + + # initialize learning rate scheduler + lr_scheduler = get_scheduler( + name=cfg.trainer.lr_scheduler_type, + optimizer=optimizer, + num_warmup_steps=cfg.trainer.num_warmup_steps, + num_training_steps=cfg.trainer.max_train_steps, + scheduler_specific_kwargs=cfg.trainer.scheduler_specific_kwargs, # minimum learning rate + ) + + return optimizer, lr_scheduler + + +def init_ddp(seed): + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + local_rank = int(os.environ["LOCAL_RANK"]) + torch.cuda.set_device(local_rank) + torch.distributed.init_process_group(backend='nccl', init_method='env://') + torch.backends.cudnn.enabled = True + torch.backends.cudnn.benchmark = True + torch.backends.cudnn.deterministic = True + return local_rank + + +def init_wandb(config, *, resuming: bool, log_code: bool = False, enabled: bool = 
True): + if not enabled: + wandb.init(mode="disabled") + return + + ckpt_dir = pathlib.Path(config.checkpoint_dir) + if not ckpt_dir.exists(): + raise FileNotFoundError(f"Checkpoint directory {ckpt_dir} does not exist.") + if resuming: + run_id = (ckpt_dir / "wandb_id.txt").read_text().strip() + wandb.init(id=run_id, resume="must", project=config.project_name) + else: + wandb.init(name=config.exp_name, config=vars(config), project=config.project_name) + (ckpt_dir / "wandb_id.txt").write_text(wandb.run.id) + + if log_code: + wandb.run.log_code(epath.Path(__file__).parent.parent) + + +def main(cfg) -> None: + # build model + vla = Qwen_GR00T(cfg) + # prepare data + ds = get_train_dataset( + cfg.datasets.data_path, + batch_size=cfg.batch_size, + shuffle_buffer_size=100, + max_samples_per_sequence=100, + worker_config=WorkerConfig.default_worker_config(num_workers=1, data_parallel_group=None), + task_encoder=TaskEncoder(cfg.datasets.task_encoder), + repeat=True, + ) + vla_train_dataloader = get_loader(ds) + data_iter = iter(vla_train_dataloader) + batch = next(data_iter) + + # set optimizer and scheduler + optimizer, lr_scheduler = setup_optimizer_and_scheduler(model=vla, cfg=cfg) + # Run VLA Training + local_rank = init_ddp(cfg.seed) + if dist.get_rank() == 0 and local_rank == 0: + logger.info(f"Running on: {platform.node()}") + if cfg.batch_size % torch.cuda.device_count() != 0: + raise ValueError( + f"Batch size {cfg.batch_size} must be divisible by the number of devices {torch.cuda.device_count()}." 
+ ) + resuming = cfg.resume + init_wandb(cfg, resuming=resuming, enabled=cfg.wandb_enabled) + + vla = vla.cuda() + vla = DDP(vla, device_ids=[int(os.environ["LOCAL_RANK"])], find_unused_parameters=True) + + step = 0 + done = False + while not done: + batch = next(data_iter) + batch = get_batch(batch) + output_dict = vla.forward(batch) + action_loss = output_dict["action_loss"] + action_loss.backward() + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + + if step % cfg.log_freq == 0: + logger.info(f"step: {step} loss: {action_loss.item():.3f}") + step += 1 + if step >= cfg.train_steps: + done = True + break + + if dist.get_rank() == 0 and local_rank == 0: + vla.module.save_pretrained() + + dist.barrier() + dist.destroy_process_group() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-path", + type=str, + default="examples/robotics/conf/train/libero_qwengroot.yaml", + help="Path to YAML config", + ) + args, clipargs = parser.parse_known_args() + cfg = OmegaConf.load(args.config_path) + main(cfg) diff --git a/tools/datasets/vla/data/dataset_helpers_np_pil.py b/tools/datasets/vla/data/dataset_helpers_np_pil.py new file mode 100644 index 0000000000..a8953b912c --- /dev/null +++ b/tools/datasets/vla/data/dataset_helpers_np_pil.py @@ -0,0 +1,66 @@ +import json +import logging + +from typing import List + +import numpy as np +import PIL + +from megatron.energon import DefaultTaskEncoder +from tools.datasets.vla.data.energon.chatml import ChatMLSample + +dataset_logger = logging.getLogger(__name__) + + +class TaskEncoder(DefaultTaskEncoder[ChatMLSample, ChatMLSample, ChatMLSample, ChatMLSample]): + def __init__(self, config): + super().__init__() + self.config = config + self.vision_root = config.vision_root + return + + def encode_sample(self, sample: ChatMLSample) -> dict: + conversation = ( + json.loads(sample.conversation) + if isinstance(sample.conversation, (str, bytes)) + else sample.conversation + 
) + # For PI0 token is useless, the position of image embeddings are fixed + task = conversation["conversations"][0]["value"].replace("", "") + + imgs = [] + for i in sample.imgs: + image = PIL.Image.open(i) + imgs.append(image) + + state_paths = sample.metadata['state'][self.config.state_key] + state = np.load(state_paths)[0] + if state.shape[0] < self.config.action_horizon: + pad_width = self.config.action_horizon - state.shape[0] + state = np.pad(state, (0, pad_width), mode='constant') + elif state.shape[0] > self.config.action_horizon: + state = state[: self.config.action_horizon] + + action_paths = sample.metadata['action'][self.config.action_key] + action = np.load(action_paths) + if action.shape[1] < self.config.action_horizon: + pad_width = self.config.action_horizon - action.shape[1] + action = np.pad(action, ((0, 0), (0, pad_width)), mode='constant') + elif action.shape[1] > self.config.action_horizon: + action = action[:, : self.config.action_horizon] + + batch = { + 'task': task, + 'observation.images.camera0': imgs[0], + 'observation.images.camera1': imgs[1], + 'observation.images.camera2': imgs[2], + 'observation.state': state.astype(np.float16), + 'action': action.astype(np.float16), + } + return batch + + def batch(self, samples: List[dict]) -> dict: + return samples + + def encode_batch(self, samples: dict) -> dict: + return samples