[Elastic] add performance monitor #892
Conversation
Summary of Changes
Hello @wanglei19991004, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances FlagScale's training capabilities by introducing a dedicated performance monitoring system. The new module allows users to gain deep insights into their model's training efficiency by tracking crucial metrics like TFLOPS, throughput, and GPU memory consumption. It offers flexible logging, detailed breakdowns of computational operations, and supports a wide range of modern large language models, making it easier to optimize and understand training performance.
Highlights
Code Review
This pull request introduces a comprehensive performance monitoring module to FlagScale, which is a valuable addition for tracking and optimizing model training. The module is well-structured, with separate components for FLOPS calculation, logging, and integration. The inclusion of examples and unit tests is also commendable. My review focuses on improving the accuracy of some FLOPS formulas, enhancing maintainability by addressing potential issues like global state and code duplication, and refining error handling. Overall, this is a strong contribution that significantly enhances the project's monitoring capabilities.
def layernorm_flops(batch_size: int, seq_length: int, hidden_size: int) -> float:
    """
    Calculate FLOPS for LayerNorm.
    LayerNorm involves:
    1. Computing mean and variance
    2. Normalization
    3. Scale and shift
    Args:
        batch_size: Batch size
        seq_length: Sequence length
        hidden_size: Hidden dimension
    Returns:
        Total FLOPS for LayerNorm
    """
    elements = batch_size * seq_length * hidden_size

    # Mean computation: hidden_size - 1 additions per element
    mean_flops = elements

    # Variance computation: 2 * hidden_size operations per element
    variance_flops = 2 * elements

    # Normalization: 2 operations per element (subtract mean, divide by std)
    norm_flops = 2 * elements

    # Scale and shift: 2 operations per element
    affine_flops = 2 * elements

    return mean_flops + variance_flops + norm_flops + affine_flops
The calculation for layernorm_flops appears to be a rough approximation, particularly for mean_flops and variance_flops. For instance, mean_flops is set to elements, but a more precise calculation would be (batch_size * seq_length) * (hidden_size - 1). Similarly, variance_flops is 2 * elements, but it involves subtractions, squares, and a summation, which would be closer to 3 * elements. Using more accurate formulas will improve the fidelity of the overall FLOPS estimation.
def layernorm_flops(batch_size: int, seq_length: int, hidden_size: int) -> float:
    """
    Calculate FLOPS for LayerNorm.
    LayerNorm involves:
    1. Computing mean and variance
    2. Normalization
    3. Scale and shift
    Args:
        batch_size: Batch size
        seq_length: Sequence length
        hidden_size: Hidden dimension
    Returns:
        Total FLOPS for LayerNorm
    """
    elements = batch_size * seq_length * hidden_size
    tokens = batch_size * seq_length
    # Mean: (H-1) adds per token.
    mean_flops = tokens * (hidden_size - 1)
    # Variance: H subs, H muls, (H-1) adds per token.
    variance_flops = tokens * (hidden_size + hidden_size + hidden_size - 1)
    # Normalization: H subs, H divs.
    norm_flops = elements * 2
    # Scale and shift: H muls, H adds.
    affine_flops = elements * 2
    return mean_flops + variance_flops + norm_flops + affine_flops

def gradient_accumulation_factor(micro_batch_size: int, global_batch_size: int) -> float:
    """
    Calculate the gradient accumulation factor.
    This doesn't add FLOPS but affects memory usage.
    Args:
        micro_batch_size: Micro-batch size per GPU
        global_batch_size: Global batch size
    Returns:
        Number of accumulation steps
    """
    return global_batch_size / micro_batch_size
The function name gradient_accumulation_factor is slightly misleading, as it calculates the number of accumulation steps, not a factor. Renaming it to gradient_accumulation_steps would improve clarity. Additionally, using float division / could produce a non-integer result if global_batch_size is not perfectly divisible by micro_batch_size. It would be more robust to use integer division // and return an int.
Suggested change:

def gradient_accumulation_steps(micro_batch_size: int, global_batch_size: int) -> int:
    """
    Calculate the number of gradient accumulation steps.
    This doesn't add FLOPS but affects memory usage.
    Args:
        micro_batch_size: Micro-batch size per GPU
        global_batch_size: Global batch size
    Returns:
        Number of accumulation steps
    """
    if micro_batch_size <= 0:
        return 1
    return global_batch_size // micro_batch_size
from flagscale.runner.monitor.perf_metrics import FLOPSMeasurementCallback

# Global variable to store the performance monitor callback
_perf_monitor_callback: Optional[FLOPSMeasurementCallback] = None
The use of a global variable _perf_monitor_callback introduces global state, which can make the code harder to maintain, reason about, and test. A more robust approach would be to pass the performance monitor instance explicitly where it's needed, for instance, through a context object or as a function argument. This would improve modularity and make dependencies clearer. The integration.py file already provides a good example of this pattern.
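As one possible shape for that refactor, the sketch below threads the callback through a small context object instead of module-level state. TrainContext and the training_log signature are hypothetical names used only for illustration; only FLOPSMeasurementCallback, calculate_metrics, and tflops_per_gpu come from this PR.

from dataclasses import dataclass
from typing import Optional

from flagscale.runner.monitor.perf_metrics import FLOPSMeasurementCallback


@dataclass
class TrainContext:
    """Hypothetical container that replaces the module-level _perf_monitor_callback."""

    perf_monitor: Optional[FLOPSMeasurementCallback] = None


def training_log(iteration, ctx: TrainContext, log_string=""):
    # The monitor is passed in explicitly, so the dependency is visible at the
    # call site and the function is easy to test with a stub context.
    if ctx.perf_monitor is not None:
        metrics = ctx.perf_monitor.calculate_metrics(iteration)
        log_string += f" TFLOPS/GPU (monitored): {metrics.tflops_per_gpu:.2f} |"
    return log_string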
        )
        if json_file.exists():
            json_file.unlink()
    except Exception as e:
Catching a broad Exception can hide unexpected errors and mask bugs. It's better to catch more specific exceptions that you anticipate might occur during file operations, such as OSError. This will make your error handling more robust and predictable.
Suggested change:

    except OSError as e:
def get_flops_breakdown(self) -> Dict[str, float]:
    """
    Get detailed FLOPS breakdown by component.
    Returns:
        Dictionary with FLOPS for different components
    """
    if get_num_microbatches is not None:
        num_micro_batches = get_num_microbatches()
    else:
        num_micro_batches = getattr(
            self.args, 'num_micro_batches', getattr(self.args, 'gradient_accumulation_steps', 1)
        )
    micro_batch_size = getattr(self.args, 'micro_batch_size', 1)
    batch_size = micro_batch_size * num_micro_batches if micro_batch_size else num_micro_batches

    if self.model_type in ['gpt', 'llama', 'qwen', 'aquila']:
        args = self.args
        # Extract configuration with safe access
        seq_length = getattr(args, 'seq_length', 512)
        hidden_size = getattr(args, 'hidden_size', 768)
        num_layers = getattr(args, 'num_layers', 12)
        num_attention_heads = getattr(args, 'num_attention_heads', 12)

        # Calculate component FLOPS
        attention_flops = (
            self.formulas.attention_flops(
                batch_size=batch_size,
                seq_length=seq_length,
                hidden_size=hidden_size,
                num_attention_heads=num_attention_heads,
            )
            * num_layers
        )

        ffn_hidden_size = getattr(args, 'ffn_hidden_size', 4 * hidden_size)
        use_swiglu = getattr(args, 'swiglu', False)
        ffn_flops = (
            self.formulas.ffn_flops(
                batch_size=batch_size,
                seq_length=seq_length,
                hidden_size=hidden_size,
                ffn_hidden_size=ffn_hidden_size,
                use_swiglu=use_swiglu,
            )
            * num_layers
        )

        total_forward = attention_flops + ffn_flops

        return {
            'attention': attention_flops,
            'ffn': ffn_flops,
            'forward': total_forward,
            'backward': total_forward * 2,  # Backward is approximately 2x forward
            'total': total_forward * 3,
        }

    return {}
The get_flops_breakdown method provides a breakdown of FLOPS by component, but it omits the FLOPS from the embedding and output projection layers. These can contribute significantly to the total computation, especially for models with large vocabularies. Including them in the breakdown would offer a more complete performance picture.
def get_flops_breakdown(self) -> Dict[str, float]:
    """
    Get detailed FLOPS breakdown by component.
    Returns:
        Dictionary with FLOPS for different components
    """
    if get_num_microbatches is not None:
        num_micro_batches = get_num_microbatches()
    else:
        num_micro_batches = getattr(
            self.args, 'num_micro_batches', getattr(self.args, 'gradient_accumulation_steps', 1)
        )
    micro_batch_size = getattr(self.args, 'micro_batch_size', 1)
    batch_size = micro_batch_size * num_micro_batches if micro_batch_size else num_micro_batches
    if self.model_type in ['gpt', 'llama', 'qwen', 'aquila']:
        args = self.args
        # Extract configuration with safe access
        seq_length = getattr(args, 'seq_length', 512)
        hidden_size = getattr(args, 'hidden_size', 768)
        num_layers = getattr(args, 'num_layers', 12)
        num_attention_heads = getattr(args, 'num_attention_heads', 12)
        vocab_size = getattr(args, 'vocab_size', getattr(args, 'padded_vocab_size', 50257))
        # Calculate component FLOPS
        attention_flops = (
            self.formulas.attention_flops(
                batch_size=batch_size,
                seq_length=seq_length,
                hidden_size=hidden_size,
                num_attention_heads=num_attention_heads,
            )
            * num_layers
        )
        ffn_hidden_size = getattr(args, 'ffn_hidden_size', 4 * hidden_size)
        use_swiglu = getattr(args, 'swiglu', False)
        ffn_flops = (
            self.formulas.ffn_flops(
                batch_size=batch_size,
                seq_length=seq_length,
                hidden_size=hidden_size,
                ffn_hidden_size=ffn_hidden_size,
                use_swiglu=use_swiglu,
            )
            * num_layers
        )
        embedding_flops = self.formulas.embedding_flops(
            batch_size=batch_size,
            seq_length=seq_length,
            vocab_size=vocab_size,
            hidden_size=hidden_size,
        )
        total_forward = attention_flops + ffn_flops + embedding_flops
        return {
            'attention': attention_flops,
            'ffn': ffn_flops,
            'embedding': embedding_flops,
            'forward': total_forward,
            'backward': total_forward * 2,  # Backward is approximately 2x forward
            'total': total_forward * 3,
        }
    return {}
flagscale/train/train.py (Outdated)
if perf_monitor is not None:
    try:
        metrics = perf_monitor.calculate_metrics(iteration)
        if metrics.tflops_per_gpu > 0:
            log_string += f' TFLOPS/GPU (monitored): {metrics.tflops_per_gpu:.2f} |'
        if metrics.tokens_per_second > 0:
            log_string += f' tokens/sec: {metrics.tokens_per_second:.0f} |'

        # Log to TensorBoard/WandB
        if args.log_timers_to_tensorboard:
            if writer:
                writer.add_scalar('performance/tflops_per_gpu', metrics.tflops_per_gpu, iteration)
                writer.add_scalar('performance/tokens_per_second', metrics.tokens_per_second, iteration)
                if perf_monitor.peak_memory_gb > 0:
                    writer.add_scalar('memory/peak_gb', perf_monitor.peak_memory_gb, iteration)
            if wandb_writer:
                wandb_writer.log({
                    'performance/tflops_per_gpu': metrics.tflops_per_gpu,
                    'performance/tokens_per_second': metrics.tokens_per_second,
                    'memory/peak_gb': perf_monitor.peak_memory_gb
                }, iteration)
    except Exception as e:
        # Don't let monitoring errors affect training
        if torch.distributed.get_rank() == 0:
            print(f"Warning: Performance monitor error: {e}")
The logic for logging performance metrics to TensorBoard and WandB appears to be duplicated here and within the PerformanceMonitor.log_metrics method. This redundancy can lead to inconsistencies if one implementation is updated but the other is not. To improve maintainability, consider centralizing the logging logic by calling perf_monitor.log_metrics(iteration, writer, wandb_writer) and allowing the PerformanceMonitor class to handle all logging aspects (file, console, TensorBoard, and WandB).
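To illustrate the suggestion, here is a sketch of the training-loop side once logging is centralized; it assumes PerformanceMonitor.log_metrics accepts the iteration and both writers as proposed above, which is an assumed signature rather than the current API.

if perf_monitor is not None:
    try:
        metrics = perf_monitor.calculate_metrics(iteration)
        if metrics.tflops_per_gpu > 0:
            log_string += f' TFLOPS/GPU (monitored): {metrics.tflops_per_gpu:.2f} |'
        if metrics.tokens_per_second > 0:
            log_string += f' tokens/sec: {metrics.tokens_per_second:.0f} |'
        # Single entry point for file, console, TensorBoard, and WandB output;
        # no duplicated writer code remains in train.py. (Assumed signature.)
        perf_monitor.log_metrics(iteration, writer, wandb_writer)
    except Exception as e:
        # Don't let monitoring errors affect training
        if torch.distributed.get_rank() == 0:
            print(f"Warning: Performance monitor error: {e}")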
def add_extra_args(parser):
    """Add extra arguments including performance monitoring and ModelOpt."""
    # Add performance monitoring args
    parser = add_performance_args(parser)

    # Chain with ModelOpt args if available
    if has_nvidia_modelopt:
        parser = add_modelopt_args(parser)

    return parser
The function add_extra_args is defined but not used anywhere in the codebase. Its implementation is very similar to combine_extra_args_providers, but it calls a different argument-adding function (add_performance_args vs. add_perf_monitor_args). This is confusing and creates redundant code. To improve clarity and maintainability, this unused function should be removed.
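For reference, a consolidated provider might look roughly like the sketch below, keeping a single chain of argument providers. The body is assembled from names mentioned in the diff and this comment (add_perf_monitor_args, add_modelopt_args, has_nvidia_modelopt) and is not the PR's actual implementation.

def combine_extra_args_providers(parser):
    """Single extra-args provider: perf-monitor args plus optional ModelOpt args (sketch)."""
    parser = add_perf_monitor_args(parser)
    if has_nvidia_modelopt:
        parser = add_modelopt_args(parser)
    return parser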
# Initialize distributed if needed (mock for example)
if not dist.is_initialized():
    import os

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    os.environ['RANK'] = '0'
    os.environ['WORLD_SIZE'] = '1'

    if torch.cuda.is_available():
        dist.init_process_group(backend='nccl', rank=0, world_size=1)
    else:
        dist.init_process_group(backend='gloo', rank=0, world_size=1)
The setup for torch.distributed in this example is a bit confusing. It sets environment variables like RANK and WORLD_SIZE, but then calls dist.init_process_group with hardcoded values (rank=0, world_size=1), effectively ignoring the environment variables. For better clarity and correctness, it would be best to either use the environment variables consistently or remove them if they are not needed for this single-process example.
Suggested change:

if not dist.is_initialized():
    import os

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # For this single-process example, we can hardcode rank and world_size.
    rank = 0
    world_size = 1
    if torch.cuda.is_available():
        dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    else:
        dist.init_process_group(backend='gloo', rank=rank, world_size=world_size)
Overview
Performance Monitor is the performance monitoring module of FlagScale, designed to track and record key performance metrics in real time during the training process.
Key Features
Supported Models
Quick Start
1. Launch using the run.py script: enable via command-line arguments (add new parameters with the + prefix)
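For illustration only, a launch sketch in the spirit of this section, assuming a Hydra-style run.py entry point; the config path, config name, and key placement are placeholders, and only the option names and the + prefix convention come from this PR.

# Placeholder paths and keys; only the "+" convention and the perf-monitor
# option names are taken from this PR's documentation.
python run.py --config-path ./examples/llama/conf --config-name train \
    +enable_perf_monitor=true \
    +perf_log_interval=10 \
    +perf_log_dir=./outputs/perf_logs \
    +perf_console_output=true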
2. Enable in the YAML configuration file
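As a sketch of the equivalent YAML, assuming the options map to snake_case keys under the training system section; the section placement and the values shown are assumptions, only the option names come from this PR.

# Section placement and values are illustrative assumptions.
system:
  enable_perf_monitor: true
  perf_log_interval: 10
  perf_log_dir: ./outputs/perf_logs
  perf_console_output: true
  perf_memory_tracking: true
  perf_breakdown: true
  perf_max_log_files: 5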
Command-line Arguments
--enable-perf-monitor
--perf-log-interval N
--perf-log-dir PATH
--perf-console-output
--perf-memory-tracking
--perf-breakdown
--perf-max-log-files N
Log File Description
The performance monitor generates the following files:
- Text log (perf_metrics_*.log)
- JSON summary (perf_summary_*.json)
Log Format Example
Text Log (perf_metrics_*.log):
JSON summary (perf_summary_*.json):
{
  "session_info": {
    "start_time": "20240129_103000",
    "end_time": "2024-01-29T11:30:00",
    "total_iterations": 100
  },
  "final_statistics": {
    "avg_tflops_per_gpu": 127.5,
    "max_tflops_per_gpu": 135.2,
    "min_tflops_per_gpu": 120.1,
    "avg_throughput_tokens": 1050000,
    "peak_memory_gb": 45.2
  },
  "iteration_logs": [...]
}