Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions flagscale/runner/auto_tuner/simulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Environment
Begin at the root path of `FlagScale` repository:
```
cd flagscale/runner/auto_tuner/simulator/custom_backend/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The path flagscale/flagscale/... seems to contain a duplicate flagscale directory. Based on the project structure, the command should likely be cd flagscale/runner/auto_tuner/simulator/custom_backend/ if run from the repository root. Please verify and correct the path to avoid confusion for users.

python setup.py develop
```

# Setup
Set necessary parameters in `config_gen.py`. For example:
```
device_type_list = ["A", "B"]
device_num_list = [4, 4]
global_batch_size = 32
num_micro_batches = 8
num_layers = 4
```
# Run a Task
Start the auto-tuning:
```
export PYTHONPATH=/****/FlagScale:$PYTHONPATH
export PYTHONPATH=$PYTHONPATH:/***/FlagScale/third_party/Megatron-LM

python flagscale/runner/auto_tuner/simulator/config_gen.py
225 changes: 225 additions & 0 deletions flagscale/runner/auto_tuner/simulator/analylize_pipeline_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import os
import re
import subprocess
import time

# from megatron.training import get_args


def kill_other_python_processes():
    """Terminate stray python processes (e.g. leftover megatron workers).

    Uses ``pkill`` so the simulator starts from a clean slate; the current
    process id is excluded so this script does not kill itself.

    NOTE(review): ``--ignore`` is not a standard procps pkill option — verify
    the intended flag on the target system.
    """
    current_pid = os.getpid()
    # Fix: the original command embedded a literal `$` before the pid
    # (`"${current_pid}"`), so the shell expanded `$<pid>` as a positional
    # parameter instead of passing the actual pid to pkill.
    clear_cmd = f"pkill -f python -o --signal TERM --ignore \"{current_pid}\""
    subprocess.run(clear_cmd, text=True, shell=True)


def compute_pipeline_parallelism_cost(
    scheme: str = '1F1B',
    # num_stages: int=1,
    num_micro_batches: int = 1,
    process_mesh: list = None,
    pp_layers_split: list = None,
    fwd_time_per_stage_chunk: list = None,
    bwd_time_per_stage_chunk: list = None,
    comm_time_between_stages: list = None,
    # TODO: add fine-grained recomputation
):
    """Analytically estimate the time of one pipeline-parallel iteration.

    Args:
        scheme: Pipeline schedule, '1F1B' or 'AFAB' (both use the same
            steady-state cost model here).
        num_micro_batches: Number of micro batches per iteration.
        process_mesh: Flat hetero mesh [tp0, cp0, ep0, dp0, pp0, (tp1, ...)];
            every 5th entry (index 4, 9, ...) is a pp degree, and their sum is
            the total number of pipeline stages.
        pp_layers_split: Layers assigned to each stage, one entry per stage.
        fwd_time_per_stage_chunk: Per-micro-batch forward time of each stage.
        bwd_time_per_stage_chunk: Per-micro-batch backward time of each stage.
        comm_time_between_stages: comm_time_between_stages[i] is the P2P comm
            time between stage i-1 and stage i.

    Returns:
        Estimated pipeline iteration cost, in the same time unit as the
        per-stage inputs.

    Raises:
        ValueError: If `scheme` is not '1F1B' or 'AFAB'.
    """
    print(f"--- Compute Pipeline Cost ---")

    # process_mesh: [tp0,cp0,ep0,dp0,pp0,(tp1,cp1,...)]
    # comm_time_between_stages[i] means the comm time between stage i-1 and stage i
    num_pp_stages = sum(process_mesh[4::5])
    # Fix: these messages were literal strings starting with a form-feed
    # ("\f...") and referenced undefined placeholder names; they are now real
    # f-strings reporting the actual lengths.
    assert (
        len(pp_layers_split) == num_pp_stages
    ), f"length of list pp_layers_split {len(pp_layers_split)} should match num_pp_stages {num_pp_stages}"
    assert (
        len(fwd_time_per_stage_chunk) == num_pp_stages
    ), f"length of list fwd_time_per_stage_chunk {len(fwd_time_per_stage_chunk)} should match num_pp_stages {num_pp_stages}"
    assert (
        len(bwd_time_per_stage_chunk) == num_pp_stages
    ), f"length of list bwd_time_per_stage_chunk {len(bwd_time_per_stage_chunk)} should match num_pp_stages {num_pp_stages}"
    assert (
        len(comm_time_between_stages) == num_pp_stages
    ), f"length of list comm_time_between_stages {len(comm_time_between_stages)} should match num_pp_stages {num_pp_stages}"

    pp_last_stage_time = num_micro_batches * (
        fwd_time_per_stage_chunk[num_pp_stages - 1] + bwd_time_per_stage_chunk[num_pp_stages - 1]
    )
    if num_pp_stages == 1:
        # No pipelining: the single stage simply processes every micro batch.
        return num_micro_batches * (
            fwd_time_per_stage_chunk[num_pp_stages - 1]
            + bwd_time_per_stage_chunk[num_pp_stages - 1]
        )

    pipeline_cost = 0
    # TODO: consider when comm time > comp time
    # each stage only depends on its next stage
    if scheme == '1F1B' or scheme == 'AFAB':
        pipeline_cost = pp_last_stage_time
        # Walk backwards from the second-to-last stage to the first stage.
        # Fix: the upper bound was `num_pp_stages`, which skipped stage 0;
        # `num_pp_stages + 1` covers all remaining stages.
        for stage_from_last in range(2, num_pp_stages + 1):
            # Steady-state work of the last stage that overlaps this stage.
            pp_this_stage_overlapped_time = (num_micro_batches - 1) * (
                fwd_time_per_stage_chunk[num_pp_stages - 1]
                + bwd_time_per_stage_chunk[num_pp_stages - 1]
            )
            pp_this_stage_compute_time = (
                fwd_time_per_stage_chunk[num_pp_stages - stage_from_last]
                + bwd_time_per_stage_chunk[num_pp_stages - stage_from_last]
            )
            # Downstream critical path plus the fwd/bwd P2P hops to it.
            pp_last_stage_overall_time = (
                pipeline_cost + 2 * comm_time_between_stages[num_pp_stages - stage_from_last + 1]
            )
            # not consider the situation that comm stucks the comp
            # which means the comm time should no more than the comp time(fwd time)
            pipeline_cost = pp_this_stage_compute_time + max(
                pp_last_stage_overall_time, pp_this_stage_overlapped_time
            )
    else:
        raise (ValueError("Scheme must be '1F1B' or 'AFAB'."))

    return pipeline_cost


import random


def simulator(
    process_mesh: list = None,
    stage: int = 0,
    num_layers: int = None,
    simulated_rank: int = None,
    pp_layers_split: list = None,
):
    """Launch one single-rank simulated training run and parse its timings.

    Builds a megatron-style training command line for the given pipeline
    configuration, runs it in a subprocess, and extracts the forward and
    backward times from the process stdout.

    Args:
        process_mesh: Flat hetero mesh [tp0, cp0, ep0, dp0, pp0, ...].
        stage: Pipeline stage index being simulated (not used in the command
            yet — presumably reserved for per-stage rank mapping).
        num_layers: Layer count of this stage (not used in the command yet).
        simulated_rank: Rank exported as RANK/LOCAL_RANK for the subprocess.
        pp_layers_split: Per-stage layer split forwarded to the trainer.

    Returns:
        Tuple (fwd_time, bwd_time, comm_time). comm_time is currently a fixed
        placeholder (0.01) because the trainer does not report it.

    Raises:
        ValueError: If the simulator output cannot be parsed.
    """
    # TODO(review): make these paths configurable (CLI argument or config
    # file) instead of hardcoding a machine-specific layout.
    os.environ["PYTHONPATH"] = (
        "/workspace/20251010/new/FlagScale:"
        "/workspace/20251010/new/FlagScale/third_party/Megatron-LM"
    )
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
    os.environ["RANK"] = str(simulated_rank)
    os.environ["LOCAL_RANK"] = str(simulated_rank)
    # os.environ["WORLD_SIZE"] = args.world_size
    os.environ["WORLD_SIZE"] = "8"
    # Randomize the rendezvous port so repeated runs do not collide.
    # Fix: variable was misspelled `rdav_endpoint`.
    rdzv_endpoint = random.randint(0, 40000)
    os.environ["RDZV_ENDPOINT"] = "localhost:" + str(rdzv_endpoint)
    os.environ["RDZV_BACKEND"] = "c10d"
    os.environ["MASTER_ADDR"] = "localhost"

    program_entry = " ./flagscale/train/train_aquila_sft.py "
    simulation_arguments = " --enable-hetero --enable-simulator --distributed-backend dummy "
    fine_grained_recomputation_args = ""

    pp_layer_split_args = " --hetero-pipeline-layer-split "
    for layers in pp_layers_split:
        pp_layer_split_args = pp_layer_split_args + str(layers) + " "

    process_mesh_str = " --hetero-process-meshes "
    for dim in process_mesh:
        process_mesh_str = process_mesh_str + str(dim) + " "

    num_pp_stages = sum(process_mesh[4::5])
    pp_size_args = " --pipeline-model-parallel-size " + str(num_pp_stages) + " "

    # TODO: too ugly to show this command in the code; re-organize these
    # parameters as an argv list (which would also allow shell=False below).
    train_command = (
        "python "
        + program_entry
        + "--tensor-model-parallel-size 1 --timing-log-level 2 --disable-bias-linear --use-flash-attn --sequence-parallel --use-distributed-optimizer --use-mcore-models --transformer-impl transformer_engine --hetero-device-types A800 BI150 --hetero-current-device-type A800 --recompute-granularity full --recompute-method uniform --recompute-num-layers 1 --bf16 --attention-softmax-in-fp32 --accumulate-allreduce-grads-in-fp32 --log-interval 1 --log-throughput --tensorboard-log-interval 1 --wandb-project aquila2 --wandb-exp-name test --tensorboard-dir /share/project/heyongzhe/FlagScale/outputs/tensorboard --wandb-save-dir /share/project/heyongzhe/FlagScale/outputs/wandb --num-layers 32 --hidden-size 4096 --num-attention-heads 32 --seq-length 2048 --max-position-embeddings 2048 --norm-epsilon 1e-05 --use-rotary-position-embeddings --no-position-embedding --swiglu --multiple-of 256 --normalization RMSNorm --untie-embeddings-and-output-weights --init-method-std 0.0165 --attention-dropout 0.0 --hidden-dropout 0.0 --weight-decay 0.1 --clip-grad 1.0 --train-samples 128 --global-batch-size 64 --micro-batch-size 1 --seed 42 --lr 0.0002 --weight-decay 0.01 --adam-beta1 0.9 --adam-beta2 0.95 --lr 0.00015 --min-lr 1.5e-05 --lr-warmup-samples 0 --lr-decay-style cosine --data-path /workspace/FlagScale/datapath/pile_wikipedia_demo --split 1 --tokenizer-type AquilaTokenizerFS --vocab-file ./examples/aquila/tokenizer/vocab.json --merge-file ./examples/aquila/tokenizer/merges.txt --special-tokens-file ./examples/aquila/tokenizer/special_tokens.txt --vocab-size 100008 "
        + process_mesh_str
        + simulation_arguments
        + pp_layer_split_args
        + fine_grained_recomputation_args
        + pp_size_args
    )

    # enough sleeping time is needed to really kill the survival megatron process
    # as least 5 sec before & after killing can not succeed every time
    print("sleeping...")
    kill_other_python_processes()
    print("start...")
    # NOTE(review): shell=True on a string built from variables risks shell
    # injection; switch to an argv list once the command is restructured.
    result = subprocess.run(train_command, capture_output=True, text=True, shell=True)
    output = result.stdout.strip()
    print(train_command)
    print(output)
    # example output: "[simulatior output] forward: 12.34, backward: 56.78, communication: 90.12"
    match = re.search(r"forward:\s*([\d.]+),\s*backward:\s*([\d.]+)", output)
    if not match:
        # Fix: the original silently fell back to hardcoded dummy timings,
        # producing bogus simulation results; fail loudly instead.
        raise ValueError(
            f"Results not found in simulator output: {output}. Example output: "
            "\"[simulatior output] forward: 12.34, backward: 56.78, communication: 90.12\""
        )
    fwd_time = float(match.group(1))
    bwd_time = float(match.group(2))
    # Communication time is not reported by the trainer yet; use a small
    # fixed placeholder.
    comm_time = 0.01
    print("forward:", fwd_time)
    print("backward:", bwd_time)
    print("communication:", comm_time)

    return fwd_time, bwd_time, comm_time


# call simulator to obtain the execution of each stage
def simulate_pipeline_parallelism_per_stage_time(
    process_mesh: list = None,
    pp_layers_split: list = None,
    fwd_time_per_stage_chunk: list = None,
    bwd_time_per_stage_chunk: list = None,
    comm_time_between_stages: list = None,
):
    """Run the simulator once per pipeline stage and collect per-stage timings.

    Results are appended in place to the three caller-provided lists
    (forward time, backward time, inter-stage communication time).
    """
    print(f"--- Simulation Begin ---")
    print(f"Process Mesh: {process_mesh}")
    print(f"PP Layer Split: {pp_layers_split}")
    stage = 0
    for num_layers in pp_layers_split:
        # TODO: confirm simulated_rank for different stage
        print(f"Stage: {stage}; Num Layers: {num_layers}")
        simulated_rank = 0
        stage_times = simulator(
            process_mesh, stage, num_layers, simulated_rank, pp_layers_split
        )
        fwd_time_per_stage_chunk.append(stage_times[0])
        bwd_time_per_stage_chunk.append(stage_times[1])
        comm_time_between_stages.append(stage_times[2])
        stage += 1
    print(f"--- Simulation End ---")


def analyze_pp_time(
    scheme: str = '1F1B',
    num_micro_batches: int = 1,
    process_mesh: list = None,
    pp_layers_split: list = None,
):
    """Simulate per-stage timings, then return the estimated pipeline cost.

    Drives the two-step flow: gather forward/backward/communication times for
    every stage via the simulator, then feed them to the analytical pipeline
    cost model.
    """
    fwd_times = []
    bwd_times = []
    comm_times = []

    simulate_pipeline_parallelism_per_stage_time(
        process_mesh=process_mesh,
        pp_layers_split=pp_layers_split,
        fwd_time_per_stage_chunk=fwd_times,
        bwd_time_per_stage_chunk=bwd_times,
        comm_time_between_stages=comm_times,
    )

    return compute_pipeline_parallelism_cost(
        scheme=scheme,
        num_micro_batches=num_micro_batches,
        process_mesh=process_mesh,
        pp_layers_split=pp_layers_split,
        fwd_time_per_stage_chunk=fwd_times,
        bwd_time_per_stage_chunk=bwd_times,
        comm_time_between_stages=comm_times,
    )
Loading
Loading