diff --git a/analysis/dashboard/app.py b/analysis/dashboard/app.py index 87920a15..5fca7eac 100644 --- a/analysis/dashboard/app.py +++ b/analysis/dashboard/app.py @@ -304,6 +304,14 @@ def render_sidebar(logs_dir, runs): if run.metadata.gpu_type: label += f" | {run.metadata.gpu_type}" + # Use folder name suffix (YAML file name) instead of YAML's internal name field + dirname = os.path.basename(run.metadata.path) + if "-" in dirname: + folder_suffix = dirname.split("-", 1)[1] + label += f" | {folder_suffix}" + elif run.metadata.job_name: + label += f" | {run.metadata.job_name}" + run_legend_labels[run_id] = label # Get dataframe diff --git a/analysis/dashboard/node_metrics_tab.py b/analysis/dashboard/node_metrics_tab.py index 176faa48..10304188 100644 --- a/analysis/dashboard/node_metrics_tab.py +++ b/analysis/dashboard/node_metrics_tab.py @@ -81,7 +81,7 @@ def render(filtered_runs: list, logs_dir: str): if not all_node_metrics: st.warning("No log files found for the selected runs.") st.info( - "Node metrics are extracted from files like `*_prefill_*.err`, `*_decode_*.err`, `*_prefill_*.out`, or `*_decode_*.out`" + "Node metrics are extracted from files like `*_prefill_*.out`, `*_decode_*.out`, or `*_agg_*.out`" ) return diff --git a/analysis/srtlog/log_parser.py b/analysis/srtlog/log_parser.py index efecd784..57858f70 100644 --- a/analysis/srtlog/log_parser.py +++ b/analysis/srtlog/log_parser.py @@ -66,7 +66,7 @@ def parse_run_logs(self, run_path: str, return_dicts: bool = False) -> list: parsed_successfully = 0 for file in os.listdir(run_path): - if (file.endswith(".err") or file.endswith(".out")) and ("prefill" in file or "decode" in file): + if (file.endswith(".err") or file.endswith(".out")) and ("prefill" in file or "decode" in file or "agg" in file): total_err_files += 1 filepath = os.path.join(run_path, file) node = self.parse_single_log(filepath) @@ -446,10 +446,11 @@ def _deserialize_node_metrics(self, df: pd.DataFrame) -> list: def _parse_dp_tp_ep_tag(self, line: str) -> tuple[int | None, int | None, int | None, str | None]: """Extract DP, TP, EP indices and timestamp from log line. - Supports three formats: + Supports four formats: - Full: [2025-11-04 05:31:43 DP0 TP0 EP0] - Simple TP: [2025-11-04 07:05:55 TP0] (defaults DP=0, EP=0) - Pipeline: [2025-12-08 14:34:44 PP0] (defaults DP=0, EP=0, TP=PP value) + - Agg/ANSI: [2m2026-02-23T22:33:43.664588Z[0m ... (ISO timestamp in ANSI codes, defaults DP=0, TP=0, EP=0) Args: line: Log line to parse @@ -475,6 +476,14 @@ def _parse_dp_tp_ep_tag(self, line: str) -> tuple[int | None, int | None, int | timestamp, pp = match.groups() return 0, int(pp), 0, timestamp # Map PP to TP slot, default DP=0, EP=0 + # Try ANSI-escaped ISO timestamp format (agg mode logs) + # Example: [2m2026-02-23T22:33:43.664588Z[0m + match = re.search(r"\[2m(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})", line) + if match: + date_part, time_part = match.groups() + timestamp = f"{date_part} {time_part}" + return 0, 0, 0, timestamp + return None, None, None, None def _parse_prefill_batch_line(self, line: str) -> dict | None: @@ -491,12 +500,12 @@ def _parse_prefill_batch_line(self, line: str) -> dict | None: metrics = {"timestamp": timestamp, "dp": dp, "tp": tp, "ep": ep, "type": "prefill"} - # Extract metrics using regex + # Extract metrics using regex (support both disagg and agg log formats) patterns = { "new_seq": r"#new-seq:\s*(\d+)", "new_token": r"#new-token:\s*(\d+)", "cached_token": r"#cached-token:\s*(\d+)", - "token_usage": r"token usage:\s*([\d.]+)", + "token_usage": r"(?:full )?token usage:\s*([\d.]+)", "running_req": r"#running-req:\s*(\d+)", "queue_req": r"#queue-req:\s*(\d+)", "prealloc_req": r"#prealloc-req:\s*(\d+)", @@ -526,11 +535,11 @@ def _parse_decode_batch_line(self, line: str) -> dict | None: metrics = {"timestamp": timestamp, "dp": dp, "tp": tp, "ep": ep, "type": "decode"} - # Extract metrics using regex + # Extract metrics using regex (support both disagg and agg log formats) patterns = { "running_req": r"#running-req:\s*(\d+)", - "num_tokens": r"#token:\s*(\d+)", - "token_usage": r"token usage:\s*([\d.]+)", + "num_tokens": r"#(?:full )?token:\s*(\d+)", + "token_usage": r"(?:full )?token usage:\s*([\d.]+)", "preallocated_usage": r"pre-allocated usage:\s*([\d.]+)", "prealloc_req": r"#prealloc-req:\s*(\d+)", "transfer_req": r"#transfer-req:\s*(\d+)", @@ -597,8 +606,8 @@ def _extract_node_info_from_filename(self, filename: str) -> dict | None: Example: watchtower-navy-cn01_prefill_w0.err or r02-p01-dgx-c11_prefill_w0.out Returns: {'node': 'watchtower-navy-cn01', 'worker_type': 'prefill', 'worker_id': 'w0'} """ - # Use greedy match for node name up to _(prefill|decode|frontend)_ - match = re.match(r"(.+)_(prefill|decode|frontend)_([^.]+)\.(err|out)", os.path.basename(filename)) + # Use greedy match for node name up to _(prefill|decode|frontend|agg)_ + match = re.match(r"(.+)_(prefill|decode|frontend|agg)_([^.]+)\.(err|out)", os.path.basename(filename)) if match: return { "node": match.group(1), diff --git a/analysis/srtlog/models.py b/analysis/srtlog/models.py index 26744184..796bcdbd 100644 --- a/analysis/srtlog/models.py +++ b/analysis/srtlog/models.py @@ -48,9 +48,43 @@ def from_json(cls, json_data: dict, run_path: str) -> "RunMetadata": Returns: RunMetadata instance """ - # Check if this is the old format (with run_metadata key) or new format (flat) + # Check if this is the new v2.0 format + if json_data.get("version") == "2.0": + resources = json_data.get("resources", {}) + model = json_data.get("model", {}) + agg_workers = resources.get("agg_workers") or 0 + + # Determine mode based on agg_workers + mode = "aggregated" if agg_workers > 0 else "disaggregated" + + # agg_nodes: fall back to agg_workers if not specified + agg_nodes = (resources.get("agg_nodes") or 0) if agg_workers > 0 else 0 + if agg_workers > 0 and agg_nodes == 0: + agg_nodes = agg_workers + + return cls( + job_id=json_data.get("job_id", ""), + path=run_path, + run_date=json_data.get("generated_at", ""), + container=model.get("container", ""), + prefill_nodes=resources.get("prefill_nodes") or 0, + decode_nodes=resources.get("decode_nodes") or 0, + prefill_workers=resources.get("prefill_workers") or 0, + decode_workers=resources.get("decode_workers") or 0, + mode=mode, + job_name=json_data.get("job_name", ""), + partition="", + model_dir=model.get("path", ""), + gpus_per_node=resources.get("gpus_per_node") or 0, + gpu_type=resources.get("gpu_type", ""), + enable_multiple_frontends=False, + num_additional_frontends=0, + agg_nodes=agg_nodes, + agg_workers=agg_workers, + ) + + # Old format with run_metadata key if "run_metadata" in json_data: - # Old format run_meta = json_data.get("run_metadata", {}) mode = run_meta.get("mode", "disaggregated") @@ -74,35 +108,37 @@ def from_json(cls, json_data: dict, run_path: str) -> "RunMetadata": agg_nodes=run_meta.get("agg_nodes", 0), agg_workers=run_meta.get("agg_workers", 0), ) - else: - # New format (flat structure) - model_data = json_data.get("model", {}) - resources_data = json_data.get("resources", {}) - agg_workers = resources_data.get("agg_workers", 0) - # Determine mode based on agg_workers - mode = "aggregated" if agg_workers > 0 else "disaggregated" + # New format (flat structure, no version field) + model_data = json_data.get("model", {}) + resources_data = json_data.get("resources", {}) + agg_workers = resources_data.get("agg_workers") or 0 + mode = "aggregated" if agg_workers > 0 else "disaggregated" - return cls( - job_id=json_data.get("job_id", ""), - path=run_path, - run_date=json_data.get("generated_at", ""), - container=model_data.get("container", ""), - prefill_nodes=resources_data.get("prefill_nodes", 0), - decode_nodes=resources_data.get("decode_nodes", 0), - prefill_workers=resources_data.get("prefill_workers", 0), - decode_workers=resources_data.get("decode_workers", 0), - mode=mode, - job_name=json_data.get("job_name", ""), - partition="", # Not present in new format - model_dir=model_data.get("path", ""), # Use model path as model_dir - gpus_per_node=resources_data.get("gpus_per_node", 0), - gpu_type=resources_data.get("gpu_type", ""), - enable_multiple_frontends=False, # Not present in new format - num_additional_frontends=0, # Not present in new format - agg_nodes=resources_data.get("agg_nodes", 0), # Not present in new format - agg_workers=agg_workers, - ) + agg_nodes = (resources_data.get("agg_nodes") or 0) if agg_workers > 0 else 0 + if agg_workers > 0 and agg_nodes == 0: + agg_nodes = agg_workers + + return cls( + job_id=json_data.get("job_id", ""), + path=run_path, + run_date=json_data.get("generated_at", ""), + container=model_data.get("container", ""), + prefill_nodes=resources_data.get("prefill_nodes") or 0, + decode_nodes=resources_data.get("decode_nodes") or 0, + prefill_workers=resources_data.get("prefill_workers") or 0, + decode_workers=resources_data.get("decode_workers") or 0, + mode=mode, + job_name=json_data.get("job_name", ""), + partition="", + model_dir=model_data.get("path", ""), + gpus_per_node=resources_data.get("gpus_per_node") or 0, + gpu_type=resources_data.get("gpu_type", ""), + enable_multiple_frontends=False, + num_additional_frontends=0, + agg_nodes=agg_nodes, + agg_workers=agg_workers, + ) @property def is_aggregated(self) -> bool: diff --git a/analysis/srtlog/run_loader.py b/analysis/srtlog/run_loader.py index f093b24a..5af1aa4c 100644 --- a/analysis/srtlog/run_loader.py +++ b/analysis/srtlog/run_loader.py @@ -226,8 +226,12 @@ def _load_benchmark_results(self, run: BenchmarkRun) -> None: profiler_type = run.profiler.profiler_type pattern_strs = [f"{profiler_type}_isl_{run.profiler.isl}_osl_{run.profiler.osl}"] - # Define source patterns for cache validation (check all possible patterns) - source_patterns = [f"{pattern}/*.json" for pattern in pattern_strs] + # Define source patterns for cache validation + # Include both direct and logs/ subdirectory paths since results could be in either + source_patterns = [] + for pattern in pattern_strs: + source_patterns.append(f"{pattern}/*.json") + source_patterns.append(f"logs/{pattern}/*.json") # Try to load from cache first if cache_mgr.is_cache_valid("benchmark_results", source_patterns): @@ -531,9 +535,10 @@ def to_dataframe(self, runs: list[BenchmarkRun] | None = None): rows = [] for run in runs: - run_id = ( - f"{run.job_id}_{run.metadata.prefill_workers}P_{run.metadata.decode_workers}D_{run.metadata.run_date}" - ) + if run.metadata.is_aggregated: + run_id = f"{run.job_id}_{run.metadata.agg_workers}A_{run.metadata.run_date}" + else: + run_id = f"{run.job_id}_{run.metadata.prefill_workers}P_{run.metadata.decode_workers}D_{run.metadata.run_date}" total_gpus = run.total_gpus # Create a row for each concurrency level